1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-24 16:19:12 -05:00

feat: add experimental WebSocketStream API (#10365)

This commit adds the experimental WebSocketStream API when
using the --unstable flag.

The explainer for the API can be found here:
https://github.com/ricea/websocketstream-explainer
This commit is contained in:
Leo K 2021-08-10 00:28:17 +02:00 committed by GitHub
parent 7600a456df
commit 2db381eba9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 682 additions and 12 deletions

View file

@ -1152,3 +1152,28 @@ declare interface WorkerOptions {
};
};
}
declare interface WebSocketStreamOptions {
protocols?: string[];
signal?: AbortSignal;
}
declare interface WebSocketConnection {
readable: ReadableStream<string | Uint8Array>;
writable: WritableStream<string | Uint8Array>;
extensions: string;
protocol: string;
}
declare interface WebSocketCloseInfo {
code?: number;
reason?: string;
}
declare class WebSocketStream {
constructor(url: string, options?: WebSocketStreamOptions);
url: string;
connection: Promise<WebSocketConnection>;
closed: Promise<WebSocketCloseInfo>;
close(closeInfo?: WebSocketCloseInfo): void;
}

View file

@ -643,6 +643,27 @@ fn websocket() {
assert!(status.success());
}
#[test]
fn websocketstream() {
let _g = util::http_server();
let script = util::tests_path().join("websocketstream_test.ts");
let root_ca = util::tests_path().join("tls/RootCA.pem");
let status = util::deno_cmd()
.arg("test")
.arg("--unstable")
.arg("--allow-net")
.arg("--cert")
.arg(root_ca)
.arg(script)
.spawn()
.unwrap()
.wait()
.unwrap();
assert!(status.success());
}
#[cfg(not(windows))]
#[test]
fn set_raw_should_not_panic_on_no_tty() {

View file

@ -0,0 +1,82 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
import {
assertEquals,
assertThrows,
assertThrowsAsync,
} from "../../test_util/std/testing/asserts.ts";
Deno.test("fragment", () => {
assertThrows(() => new WebSocketStream("ws://localhost:4242/#"));
assertThrows(() => new WebSocketStream("ws://localhost:4242/#foo"));
});
Deno.test("duplicate protocols", () => {
assertThrows(() =>
new WebSocketStream("ws://localhost:4242", {
protocols: ["foo", "foo"],
})
);
});
Deno.test("connect & close custom valid code", async () => {
const ws = new WebSocketStream("ws://localhost:4242");
await ws.connection;
ws.close({ code: 1000 });
await ws.closed;
});
Deno.test("connect & close custom invalid reason", async () => {
const ws = new WebSocketStream("ws://localhost:4242");
await ws.connection;
assertThrows(() => ws.close({ code: 1000, reason: "".padEnd(124, "o") }));
ws.close();
await ws.closed;
});
Deno.test("echo string", async () => {
const ws = new WebSocketStream("ws://localhost:4242");
const { readable, writable } = await ws.connection;
await writable.getWriter().write("foo");
const res = await readable.getReader().read();
assertEquals(res.value, "foo");
ws.close();
await ws.closed;
});
Deno.test("echo string tls", async () => {
const ws = new WebSocketStream("wss://localhost:4243");
const { readable, writable } = await ws.connection;
await writable.getWriter().write("foo");
const res = await readable.getReader().read();
assertEquals(res.value, "foo");
ws.close();
await ws.closed;
});
Deno.test("websocket error", async () => {
const ws = new WebSocketStream("wss://localhost:4242");
await Promise.all([
assertThrowsAsync(
() => ws.connection,
Deno.errors.UnexpectedEof,
"tls handshake eof",
),
assertThrowsAsync(
() => ws.closed,
Deno.errors.UnexpectedEof,
"tls handshake eof",
),
]);
});
Deno.test("echo uint8array", async () => {
const ws = new WebSocketStream("ws://localhost:4242");
const { readable, writable } = await ws.connection;
const uint = new Uint8Array([102, 111, 111]);
await writable.getWriter().write(uint);
const res = await readable.getReader().read();
assertEquals(res.value, uint);
ws.close();
await ws.closed;
});

View file

@ -4455,6 +4455,8 @@
isReadableStreamDisturbed,
errorReadableStream,
createProxy,
writableStreamClose,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,
CountQueuingStrategy,

View file

@ -234,7 +234,11 @@
this[_url] = wsURL.href;
core.opSync("op_ws_check_permission", this[_url]);
core.opSync(
"op_ws_check_permission_and_cancel_handle",
this[_url],
false,
);
if (typeof protocols === "string") {
protocols = [protocols];

View file

@ -0,0 +1,412 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
/// <reference path="../../core/internal.d.ts" />
((window) => {
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { writableStreamClose, Deferred } = window.__bootstrap.streams;
const { DOMException } = window.__bootstrap.domException;
const { add, remove } = window.__bootstrap.abortSignal;
const {
StringPrototypeEndsWith,
StringPrototypeToLowerCase,
Symbol,
SymbolFor,
Set,
ArrayPrototypeMap,
ArrayPrototypeJoin,
PromisePrototypeThen,
PromisePrototypeCatch,
Uint8Array,
TypeError,
} = window.__bootstrap.primordials;
webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
"WebSocketStreamOptions",
[
{
key: "protocols",
converter: webidl.converters["sequence<USVString>"],
get defaultValue() {
return [];
},
},
{
key: "signal",
converter: webidl.converters.AbortSignal,
},
],
);
webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter(
"WebSocketCloseInfo",
[
{
key: "code",
converter: webidl.converters["unsigned short"],
},
{
key: "reason",
converter: webidl.converters.USVString,
defaultValue: "",
},
],
);
/**
* Tries to close the resource (and ignores BadResource errors).
* @param {number} rid
*/
function tryClose(rid) {
try {
core.close(rid);
} catch (err) {
// Ignore error if the socket has already been closed.
if (!(err instanceof Deno.errors.BadResource)) throw err;
}
}
const _rid = Symbol("[[rid]]");
const _url = Symbol("[[url]]");
const _connection = Symbol("[[connection]]");
const _closed = Symbol("[[closed]]");
const _closing = Symbol("[[closing]]");
const _earlyClose = Symbol("[[earlyClose]]");
class WebSocketStream {
[_rid];
[_url];
get url() {
webidl.assertBranded(this, WebSocketStream);
return this[_url];
}
constructor(url, options) {
this[webidl.brand] = webidl.brand;
const prefix = "Failed to construct 'WebSocketStream'";
webidl.requiredArguments(arguments.length, 1, { prefix });
url = webidl.converters.USVString(url, {
prefix,
context: "Argument 1",
});
options = webidl.converters.WebSocketStreamOptions(options, {
prefix,
context: "Argument 2",
});
const wsURL = new URL(url);
if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
throw new DOMException(
"Only ws & wss schemes are allowed in a WebSocket URL.",
"SyntaxError",
);
}
if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
throw new DOMException(
"Fragments are not allowed in a WebSocket URL.",
"SyntaxError",
);
}
this[_url] = wsURL.href;
if (
options.protocols.length !==
new Set(
ArrayPrototypeMap(
options.protocols,
(p) => StringPrototypeToLowerCase(p),
),
).size
) {
throw new DOMException(
"Can't supply multiple times the same protocol.",
"SyntaxError",
);
}
const cancelRid = core.opSync(
"op_ws_check_permission_and_cancel_handle",
this[_url],
true,
);
if (options.signal?.aborted) {
core.close(cancelRid);
const err = new DOMException(
"This operation was aborted",
"AbortError",
);
this[_connection].reject(err);
this[_closed].reject(err);
} else {
const abort = () => {
core.close(cancelRid);
};
options.signal?.[add](abort);
PromisePrototypeThen(
core.opAsync("op_ws_create", {
url: this[_url],
protocols: options.protocols
? ArrayPrototypeJoin(options.protocols, ", ")
: "",
cancelHandle: cancelRid,
}),
(create) => {
options.signal?.[remove](abort);
if (this[_earlyClose]) {
PromisePrototypeThen(
core.opAsync("op_ws_close", {
rid: create.rid,
}),
() => {
PromisePrototypeThen(
(async () => {
while (true) {
const { kind } = await core.opAsync(
"op_ws_next_event",
create.rid,
);
if (kind === "close") {
break;
}
}
})(),
() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
this[_connection].reject(err);
this[_closed].reject(err);
},
);
},
() => {
const err = new DOMException(
"Closed while connecting",
"NetworkError",
);
this[_connection].reject(err);
this[_closed].reject(err);
},
);
} else {
this[_rid] = create.rid;
const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "text",
text: chunk,
});
} else if (chunk instanceof Uint8Array) {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "binary",
}, chunk);
} else {
throw new TypeError(
"A chunk may only be either a string or an Uint8Array",
);
}
},
close: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
abort: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
});
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
try {
controller.close();
} catch (_) {
// needed to ignore warnings & assertions
}
try {
PromisePrototypeCatch(
writableStreamClose(writable),
() => {},
);
} catch (_) {
// needed to ignore warnings & assertions
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", {
rid: this[_rid],
kind: "pong",
});
break;
}
case "close": {
if (this[_closing]) {
this[_closed].resolve(value);
tryClose(this[_rid]);
} else {
PromisePrototypeThen(
core.opAsync("op_ws_close", {
rid: this[_rid],
...value,
}),
() => {
this[_closed].resolve(value);
tryClose(this[_rid]);
},
(err) => {
this[_closed].reject(err);
controller.error(err);
tryClose(this[_rid]);
},
);
}
break;
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
tryClose(this[_rid]);
break;
}
}
},
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});
} catch (_) {
this.close();
}
await this.closed;
},
});
this[_connection].resolve({
readable,
writable,
extensions: create.extensions ?? "",
protocol: create.protocol ?? "",
});
}
},
(err) => {
tryClose(cancelRid);
this[_connection].reject(err);
this[_closed].reject(err);
},
);
}
}
[_connection] = new Deferred();
get connection() {
webidl.assertBranded(this, WebSocketStream);
return this[_connection].promise;
}
[_earlyClose] = false;
[_closing] = false;
[_closed] = new Deferred();
get closed() {
webidl.assertBranded(this, WebSocketStream);
return this[_closed].promise;
}
close(closeInfo) {
webidl.assertBranded(this, WebSocketStream);
closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo, {
prefix: "Failed to execute 'close' on 'WebSocketStream'",
context: "Argument 1",
});
if (
closeInfo.code &&
!(closeInfo.code === 1000 ||
(3000 <= closeInfo.code && closeInfo.code < 5000))
) {
throw new DOMException(
"The close code must be either 1000 or in the range of 3000 to 4999.",
"InvalidAccessError",
);
}
const encoder = new TextEncoder();
if (
closeInfo.reason && encoder.encode(closeInfo.reason).byteLength > 123
) {
throw new DOMException(
"The close reason may not be longer than 123 bytes.",
"SyntaxError",
);
}
let code = closeInfo.code;
if (closeInfo.reason && code === undefined) {
code = 1000;
}
if (this[_connection].state === "pending") {
this[_earlyClose] = true;
} else if (this[_closed].state === "pending") {
this[_closing] = true;
PromisePrototypeCatch(
core.opAsync("op_ws_close", {
rid: this[_rid],
code,
reason: closeInfo.reason,
}),
(err) => {
this[_rid] && tryClose(this[_rid]);
this[_closed].reject(err);
},
);
}
}
[SymbolFor("Deno.customInspect")](inspect) {
return `${this.constructor.name} ${
inspect({
url: this.url,
})
}`;
}
}
window.__bootstrap.webSocket.WebSocketStream = WebSocketStream;
})(this);

View file

@ -3,7 +3,6 @@
use deno_core::error::bad_resource_id;
use deno_core::error::invalid_hostname;
use deno_core::error::null_opbuf;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::stream::SplitSink;
use deno_core::futures::stream::SplitStream;
@ -30,6 +29,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fmt;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
@ -153,14 +153,26 @@ impl Resource for WsStreamResource {
}
}
pub struct WsCancelResource(Rc<CancelHandle>);
impl Resource for WsCancelResource {
fn name(&self) -> Cow<str> {
"webSocketCancel".into()
}
fn close(self: Rc<Self>) {
self.0.cancel()
}
}
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
pub fn op_ws_check_permission<WP>(
pub fn op_ws_check_permission_and_cancel_handle<WP>(
state: &mut OpState,
url: String,
_: (),
) -> Result<(), AnyError>
cancel_handle: bool,
) -> Result<Option<ResourceId>, AnyError>
where
WP: WebSocketPermissions + 'static,
{
@ -168,7 +180,14 @@ where
.borrow_mut::<WP>()
.check_net_url(&url::Url::parse(&url)?)?;
Ok(())
if cancel_handle {
let rid = state
.resource_table
.add(WsCancelResource(CancelHandle::new_rc()));
Ok(Some(rid))
} else {
Ok(None)
}
}
#[derive(Deserialize)]
@ -176,6 +195,7 @@ where
pub struct CreateArgs {
url: String,
protocols: String,
cancel_handle: Option<ResourceId>,
}
#[derive(Serialize)]
@ -246,14 +266,32 @@ where
_ => unreachable!(),
};
let client = client_async(request, socket);
let (stream, response): (WsStream, Response) =
client_async(request, socket).await.map_err(|err| {
type_error(format!(
if let Some(cancel_rid) = args.cancel_handle {
let r = state
.borrow_mut()
.resource_table
.get::<WsCancelResource>(cancel_rid)
.ok_or_else(bad_resource_id)?;
client
.or_cancel(r.0.to_owned())
.await
.map_err(|_| DomExceptionAbortError::new("connection was aborted"))?
} else {
client.await
}
.map_err(|err| {
DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {}",
err.to_string()
))
})?;
if let Some(cancel_rid) = args.cancel_handle {
state.borrow_mut().resource_table.close(cancel_rid);
}
let (ws_tx, ws_rx) = stream.split();
let resource = WsStreamResource {
stream: WebSocketStreamType::Client {
@ -398,11 +436,12 @@ pub fn init<P: WebSocketPermissions + 'static>(
.js(include_js_files!(
prefix "deno:extensions/websocket",
"01_websocket.js",
"02_websocketstream.js",
))
.ops(vec![
(
"op_ws_check_permission",
op_sync(op_ws_check_permission::<P>),
"op_ws_check_permission_and_cancel_handle",
op_sync(op_ws_check_permission_and_cancel_handle::<P>),
),
("op_ws_create", op_async(op_ws_create::<P>)),
("op_ws_send", op_async(op_ws_send)),
@ -423,3 +462,55 @@ pub fn init<P: WebSocketPermissions + 'static>(
pub fn get_declaration() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts")
}
#[derive(Debug)]
pub struct DomExceptionNetworkError {
pub msg: String,
}
impl DomExceptionNetworkError {
pub fn new(msg: &str) -> Self {
DomExceptionNetworkError {
msg: msg.to_string(),
}
}
}
impl fmt::Display for DomExceptionNetworkError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad(&self.msg)
}
}
impl std::error::Error for DomExceptionNetworkError {}
pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
e.downcast_ref::<DomExceptionNetworkError>()
.map(|_| "DOMExceptionNetworkError")
}
#[derive(Debug)]
pub struct DomExceptionAbortError {
pub msg: String,
}
impl DomExceptionAbortError {
pub fn new(msg: &str) -> Self {
DomExceptionAbortError {
msg: msg.to_string(),
}
}
}
impl fmt::Display for DomExceptionAbortError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad(&self.msg)
}
}
impl std::error::Error for DomExceptionAbortError {}
pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> {
e.downcast_ref::<DomExceptionAbortError>()
.map(|_| "DOMExceptionAbortError")
}

View file

@ -159,6 +159,8 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.or_else(|| deno_webgpu::error::get_error_class_name(e))
.or_else(|| deno_web::get_error_class_name(e))
.or_else(|| deno_webstorage::get_not_supported_error_class_name(e))
.or_else(|| deno_websocket::get_network_error_class_name(e))
.or_else(|| deno_websocket::get_abort_error_class_name(e))
.or_else(|| {
e.downcast_ref::<dlopen::Error>()
.map(get_dlopen_error_class)

View file

@ -230,6 +230,18 @@ delete Object.prototype.__proto__;
return new domException.DOMException(msg, "NotSupported");
},
);
core.registerErrorBuilder(
"DOMExceptionNetworkError",
function DOMExceptionNetworkError(msg) {
return new domException.DOMException(msg, "NetworkError");
},
);
core.registerErrorBuilder(
"DOMExceptionAbortError",
function DOMExceptionAbortError(msg) {
return new domException.DOMException(msg, "AbortError");
},
);
core.registerErrorBuilder(
"DOMExceptionInvalidCharacterError",
function DOMExceptionInvalidCharacterError(msg) {
@ -342,7 +354,6 @@ delete Object.prototype.__proto__;
URL: util.nonEnumerable(url.URL),
URLSearchParams: util.nonEnumerable(url.URLSearchParams),
WebSocket: util.nonEnumerable(webSocket.WebSocket),
BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel),
MessageChannel: util.nonEnumerable(messagePort.MessageChannel),
MessagePort: util.nonEnumerable(messagePort.MessagePort),
Worker: util.nonEnumerable(worker.Worker),
@ -377,6 +388,11 @@ delete Object.prototype.__proto__;
setInterval: util.writable(timers.setInterval),
setTimeout: util.writable(timers.setTimeout),
structuredClone: util.writable(messagePort.structuredClone),
};
const unstableWindowOrWorkerGlobalScope = {
WebSocketStream: util.nonEnumerable(webSocket.WebSocketStream),
BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel),
GPU: util.nonEnumerable(webgpu.GPU),
GPUAdapter: util.nonEnumerable(webgpu.GPUAdapter),
@ -485,6 +501,9 @@ delete Object.prototype.__proto__;
util.log("bootstrapMainRuntime");
hasBootstrapped = true;
ObjectDefineProperties(globalThis, windowOrWorkerGlobalScope);
if (runtimeOptions.unstableFlag) {
ObjectDefineProperties(globalThis, unstableWindowOrWorkerGlobalScope);
}
ObjectDefineProperties(globalThis, mainRuntimeGlobalProperties);
ObjectSetPrototypeOf(globalThis, Window.prototype);
@ -573,6 +592,9 @@ delete Object.prototype.__proto__;
util.log("bootstrapWorkerRuntime");
hasBootstrapped = true;
ObjectDefineProperties(globalThis, windowOrWorkerGlobalScope);
if (runtimeOptions.unstableFlag) {
ObjectDefineProperties(globalThis, unstableWindowOrWorkerGlobalScope);
}
ObjectDefineProperties(globalThis, workerRuntimeGlobalProperties);
ObjectDefineProperties(globalThis, { name: util.readOnly(name) });
ObjectSetPrototypeOf(globalThis, DedicatedWorkerGlobalScope.prototype);

View file

@ -16765,6 +16765,15 @@
"referrer.any.html": true,
"Close-delayed.any.html": false,
"Close-delayed.any.html?wpt_flags=h2": false,
"Close-delayed.any.html?wss": false
"Close-delayed.any.html?wss": false,
"stream": {
"tentative": {
"abort.any.html?wss": true,
"backpressure-receive.any.html?wss": true,
"backpressure-send.any.html?wss": true,
"close.any.html?wss": true,
"constructor.any.html?wss": true
}
}
}
}