diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts index ab4a637292..b83309bc9d 100644 --- a/cli/dts/lib.deno.unstable.d.ts +++ b/cli/dts/lib.deno.unstable.d.ts @@ -1152,3 +1152,28 @@ declare interface WorkerOptions { }; }; } + +declare interface WebSocketStreamOptions { + protocols?: string[]; + signal?: AbortSignal; +} + +declare interface WebSocketConnection { + readable: ReadableStream; + writable: WritableStream; + extensions: string; + protocol: string; +} + +declare interface WebSocketCloseInfo { + code?: number; + reason?: string; +} + +declare class WebSocketStream { + constructor(url: string, options?: WebSocketStreamOptions); + url: string; + connection: Promise; + closed: Promise; + close(closeInfo?: WebSocketCloseInfo): void; +} diff --git a/cli/tests/integration/mod.rs b/cli/tests/integration/mod.rs index 938d40125b..76bda70a74 100644 --- a/cli/tests/integration/mod.rs +++ b/cli/tests/integration/mod.rs @@ -643,6 +643,27 @@ fn websocket() { assert!(status.success()); } +#[test] +fn websocketstream() { + let _g = util::http_server(); + + let script = util::tests_path().join("websocketstream_test.ts"); + let root_ca = util::tests_path().join("tls/RootCA.pem"); + let status = util::deno_cmd() + .arg("test") + .arg("--unstable") + .arg("--allow-net") + .arg("--cert") + .arg(root_ca) + .arg(script) + .spawn() + .unwrap() + .wait() + .unwrap(); + + assert!(status.success()); +} + #[cfg(not(windows))] #[test] fn set_raw_should_not_panic_on_no_tty() { diff --git a/cli/tests/websocketstream_test.ts b/cli/tests/websocketstream_test.ts new file mode 100644 index 0000000000..5b4d19f6e8 --- /dev/null +++ b/cli/tests/websocketstream_test.ts @@ -0,0 +1,82 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +import { + assertEquals, + assertThrows, + assertThrowsAsync, +} from "../../test_util/std/testing/asserts.ts"; + +Deno.test("fragment", () => { + assertThrows(() => new WebSocketStream("ws://localhost:4242/#")); + assertThrows(() => new WebSocketStream("ws://localhost:4242/#foo")); +}); + +Deno.test("duplicate protocols", () => { + assertThrows(() => + new WebSocketStream("ws://localhost:4242", { + protocols: ["foo", "foo"], + }) + ); +}); + +Deno.test("connect & close custom valid code", async () => { + const ws = new WebSocketStream("ws://localhost:4242"); + await ws.connection; + ws.close({ code: 1000 }); + await ws.closed; +}); + +Deno.test("connect & close custom invalid reason", async () => { + const ws = new WebSocketStream("ws://localhost:4242"); + await ws.connection; + assertThrows(() => ws.close({ code: 1000, reason: "".padEnd(124, "o") })); + ws.close(); + await ws.closed; +}); + +Deno.test("echo string", async () => { + const ws = new WebSocketStream("ws://localhost:4242"); + const { readable, writable } = await ws.connection; + await writable.getWriter().write("foo"); + const res = await readable.getReader().read(); + assertEquals(res.value, "foo"); + ws.close(); + await ws.closed; +}); + +Deno.test("echo string tls", async () => { + const ws = new WebSocketStream("wss://localhost:4243"); + const { readable, writable } = await ws.connection; + await writable.getWriter().write("foo"); + const res = await readable.getReader().read(); + assertEquals(res.value, "foo"); + ws.close(); + await ws.closed; +}); + +Deno.test("websocket error", async () => { + const ws = new WebSocketStream("wss://localhost:4242"); + await Promise.all([ + assertThrowsAsync( + () => ws.connection, + Deno.errors.UnexpectedEof, + "tls handshake eof", + ), + assertThrowsAsync( + () => ws.closed, + Deno.errors.UnexpectedEof, + "tls handshake eof", + ), + ]); +}); + +Deno.test("echo uint8array", async () => { + const ws = new WebSocketStream("ws://localhost:4242"); + const { readable, writable } = await ws.connection; + const uint = new Uint8Array([102, 111, 111]); + await writable.getWriter().write(uint); + const res = await readable.getReader().read(); + assertEquals(res.value, uint); + ws.close(); + await ws.closed; +}); diff --git a/extensions/web/06_streams.js b/extensions/web/06_streams.js index a3dc9439a7..c4bfad0c8d 100644 --- a/extensions/web/06_streams.js +++ b/extensions/web/06_streams.js @@ -4455,6 +4455,8 @@ isReadableStreamDisturbed, errorReadableStream, createProxy, + writableStreamClose, + Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, CountQueuingStrategy, diff --git a/extensions/websocket/01_websocket.js b/extensions/websocket/01_websocket.js index 7af7951975..df8063d210 100644 --- a/extensions/websocket/01_websocket.js +++ b/extensions/websocket/01_websocket.js @@ -234,7 +234,11 @@ this[_url] = wsURL.href; - core.opSync("op_ws_check_permission", this[_url]); + core.opSync( + "op_ws_check_permission_and_cancel_handle", + this[_url], + false, + ); if (typeof protocols === "string") { protocols = [protocols]; diff --git a/extensions/websocket/02_websocketstream.js b/extensions/websocket/02_websocketstream.js new file mode 100644 index 0000000000..6290d94a02 --- /dev/null +++ b/extensions/websocket/02_websocketstream.js @@ -0,0 +1,412 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +/// + +((window) => { + const core = window.Deno.core; + const webidl = window.__bootstrap.webidl; + const { writableStreamClose, Deferred } = window.__bootstrap.streams; + const { DOMException } = window.__bootstrap.domException; + const { add, remove } = window.__bootstrap.abortSignal; + + const { + StringPrototypeEndsWith, + StringPrototypeToLowerCase, + Symbol, + SymbolFor, + Set, + ArrayPrototypeMap, + ArrayPrototypeJoin, + PromisePrototypeThen, + PromisePrototypeCatch, + Uint8Array, + TypeError, + } = window.__bootstrap.primordials; + + webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter( + "WebSocketStreamOptions", + [ + { + key: "protocols", + converter: webidl.converters["sequence"], + get defaultValue() { + return []; + }, + }, + { + key: "signal", + converter: webidl.converters.AbortSignal, + }, + ], + ); + webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter( + "WebSocketCloseInfo", + [ + { + key: "code", + converter: webidl.converters["unsigned short"], + }, + { + key: "reason", + converter: webidl.converters.USVString, + defaultValue: "", + }, + ], + ); + + /** + * Tries to close the resource (and ignores BadResource errors). + * @param {number} rid + */ + function tryClose(rid) { + try { + core.close(rid); + } catch (err) { + // Ignore error if the socket has already been closed. + if (!(err instanceof Deno.errors.BadResource)) throw err; + } + } + + const _rid = Symbol("[[rid]]"); + const _url = Symbol("[[url]]"); + const _connection = Symbol("[[connection]]"); + const _closed = Symbol("[[closed]]"); + const _closing = Symbol("[[closing]]"); + const _earlyClose = Symbol("[[earlyClose]]"); + class WebSocketStream { + [_rid]; + + [_url]; + get url() { + webidl.assertBranded(this, WebSocketStream); + return this[_url]; + } + + constructor(url, options) { + this[webidl.brand] = webidl.brand; + const prefix = "Failed to construct 'WebSocketStream'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + url = webidl.converters.USVString(url, { + prefix, + context: "Argument 1", + }); + options = webidl.converters.WebSocketStreamOptions(options, { + prefix, + context: "Argument 2", + }); + + const wsURL = new URL(url); + + if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") { + throw new DOMException( + "Only ws & wss schemes are allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) { + throw new DOMException( + "Fragments are not allowed in a WebSocket URL.", + "SyntaxError", + ); + } + + this[_url] = wsURL.href; + + if ( + options.protocols.length !== + new Set( + ArrayPrototypeMap( + options.protocols, + (p) => StringPrototypeToLowerCase(p), + ), + ).size + ) { + throw new DOMException( + "Can't supply multiple times the same protocol.", + "SyntaxError", + ); + } + + const cancelRid = core.opSync( + "op_ws_check_permission_and_cancel_handle", + this[_url], + true, + ); + + if (options.signal?.aborted) { + core.close(cancelRid); + const err = new DOMException( + "This operation was aborted", + "AbortError", + ); + this[_connection].reject(err); + this[_closed].reject(err); + } else { + const abort = () => { + core.close(cancelRid); + }; + options.signal?.[add](abort); + PromisePrototypeThen( + core.opAsync("op_ws_create", { + url: this[_url], + protocols: options.protocols + ? ArrayPrototypeJoin(options.protocols, ", ") + : "", + cancelHandle: cancelRid, + }), + (create) => { + options.signal?.[remove](abort); + if (this[_earlyClose]) { + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: create.rid, + }), + () => { + PromisePrototypeThen( + (async () => { + while (true) { + const { kind } = await core.opAsync( + "op_ws_next_event", + create.rid, + ); + + if (kind === "close") { + break; + } + } + })(), + () => { + const err = new DOMException( + "Closed while connecting", + "NetworkError", + ); + this[_connection].reject(err); + this[_closed].reject(err); + }, + ); + }, + () => { + const err = new DOMException( + "Closed while connecting", + "NetworkError", + ); + this[_connection].reject(err); + this[_closed].reject(err); + }, + ); + } else { + this[_rid] = create.rid; + + const writable = new WritableStream({ + write: async (chunk) => { + if (typeof chunk === "string") { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "text", + text: chunk, + }); + } else if (chunk instanceof Uint8Array) { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "binary", + }, chunk); + } else { + throw new TypeError( + "A chunk may only be either a string or an Uint8Array", + ); + } + }, + close: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); + } + await this.closed; + }, + abort: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); + } + await this.closed; + }, + }); + const readable = new ReadableStream({ + start: (controller) => { + PromisePrototypeThen(this.closed, () => { + try { + controller.close(); + } catch (_) { + // needed to ignore warnings & assertions + } + try { + PromisePrototypeCatch( + writableStreamClose(writable), + () => {}, + ); + } catch (_) { + // needed to ignore warnings & assertions + } + }); + }, + pull: async (controller) => { + const { kind, value } = await core.opAsync( + "op_ws_next_event", + this[_rid], + ); + + switch (kind) { + case "string": { + controller.enqueue(value); + break; + } + case "binary": { + controller.enqueue(value); + break; + } + case "ping": { + await core.opAsync("op_ws_send", { + rid: this[_rid], + kind: "pong", + }); + break; + } + case "close": { + if (this[_closing]) { + this[_closed].resolve(value); + tryClose(this[_rid]); + } else { + PromisePrototypeThen( + core.opAsync("op_ws_close", { + rid: this[_rid], + ...value, + }), + () => { + this[_closed].resolve(value); + tryClose(this[_rid]); + }, + (err) => { + this[_closed].reject(err); + controller.error(err); + tryClose(this[_rid]); + }, + ); + } + break; + } + case "error": { + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + tryClose(this[_rid]); + break; + } + } + }, + cancel: async (reason) => { + try { + this.close(reason?.code !== undefined ? reason : {}); + } catch (_) { + this.close(); + } + await this.closed; + }, + }); + + this[_connection].resolve({ + readable, + writable, + extensions: create.extensions ?? "", + protocol: create.protocol ?? "", + }); + } + }, + (err) => { + tryClose(cancelRid); + this[_connection].reject(err); + this[_closed].reject(err); + }, + ); + } + } + + [_connection] = new Deferred(); + get connection() { + webidl.assertBranded(this, WebSocketStream); + return this[_connection].promise; + } + + [_earlyClose] = false; + [_closing] = false; + [_closed] = new Deferred(); + get closed() { + webidl.assertBranded(this, WebSocketStream); + return this[_closed].promise; + } + + close(closeInfo) { + webidl.assertBranded(this, WebSocketStream); + closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo, { + prefix: "Failed to execute 'close' on 'WebSocketStream'", + context: "Argument 1", + }); + + if ( + closeInfo.code && + !(closeInfo.code === 1000 || + (3000 <= closeInfo.code && closeInfo.code < 5000)) + ) { + throw new DOMException( + "The close code must be either 1000 or in the range of 3000 to 4999.", + "InvalidAccessError", + ); + } + + const encoder = new TextEncoder(); + if ( + closeInfo.reason && encoder.encode(closeInfo.reason).byteLength > 123 + ) { + throw new DOMException( + "The close reason may not be longer than 123 bytes.", + "SyntaxError", + ); + } + + let code = closeInfo.code; + if (closeInfo.reason && code === undefined) { + code = 1000; + } + + if (this[_connection].state === "pending") { + this[_earlyClose] = true; + } else if (this[_closed].state === "pending") { + this[_closing] = true; + PromisePrototypeCatch( + core.opAsync("op_ws_close", { + rid: this[_rid], + code, + reason: closeInfo.reason, + }), + (err) => { + this[_rid] && tryClose(this[_rid]); + this[_closed].reject(err); + }, + ); + } + } + + [SymbolFor("Deno.customInspect")](inspect) { + return `${this.constructor.name} ${ + inspect({ + url: this.url, + }) + }`; + } + } + + window.__bootstrap.webSocket.WebSocketStream = WebSocketStream; +})(this); diff --git a/extensions/websocket/lib.rs b/extensions/websocket/lib.rs index 896a5f2e26..97e970e855 100644 --- a/extensions/websocket/lib.rs +++ b/extensions/websocket/lib.rs @@ -3,7 +3,6 @@ use deno_core::error::bad_resource_id; use deno_core::error::invalid_hostname; use deno_core::error::null_opbuf; -use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::stream::SplitSink; use deno_core::futures::stream::SplitStream; @@ -30,6 +29,7 @@ use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::fmt; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; @@ -153,14 +153,26 @@ impl Resource for WsStreamResource { } } +pub struct WsCancelResource(Rc); + +impl Resource for WsCancelResource { + fn name(&self) -> Cow { + "webSocketCancel".into() + } + + fn close(self: Rc) { + self.0.cancel() + } +} + // This op is needed because creating a WS instance in JavaScript is a sync // operation and should throw error when permissions are not fulfilled, // but actual op that connects WS is async. -pub fn op_ws_check_permission( +pub fn op_ws_check_permission_and_cancel_handle( state: &mut OpState, url: String, - _: (), -) -> Result<(), AnyError> + cancel_handle: bool, +) -> Result, AnyError> where WP: WebSocketPermissions + 'static, { @@ -168,7 +180,14 @@ where .borrow_mut::() .check_net_url(&url::Url::parse(&url)?)?; - Ok(()) + if cancel_handle { + let rid = state + .resource_table + .add(WsCancelResource(CancelHandle::new_rc())); + Ok(Some(rid)) + } else { + Ok(None) + } } #[derive(Deserialize)] @@ -176,6 +195,7 @@ where pub struct CreateArgs { url: String, protocols: String, + cancel_handle: Option, } #[derive(Serialize)] @@ -246,14 +266,32 @@ where _ => unreachable!(), }; + let client = client_async(request, socket); let (stream, response): (WsStream, Response) = - client_async(request, socket).await.map_err(|err| { - type_error(format!( + if let Some(cancel_rid) = args.cancel_handle { + let r = state + .borrow_mut() + .resource_table + .get::(cancel_rid) + .ok_or_else(bad_resource_id)?; + client + .or_cancel(r.0.to_owned()) + .await + .map_err(|_| DomExceptionAbortError::new("connection was aborted"))? + } else { + client.await + } + .map_err(|err| { + DomExceptionNetworkError::new(&format!( "failed to connect to WebSocket: {}", err.to_string() )) })?; + if let Some(cancel_rid) = args.cancel_handle { + state.borrow_mut().resource_table.close(cancel_rid); + } + let (ws_tx, ws_rx) = stream.split(); let resource = WsStreamResource { stream: WebSocketStreamType::Client { @@ -398,11 +436,12 @@ pub fn init( .js(include_js_files!( prefix "deno:extensions/websocket", "01_websocket.js", + "02_websocketstream.js", )) .ops(vec![ ( - "op_ws_check_permission", - op_sync(op_ws_check_permission::

), + "op_ws_check_permission_and_cancel_handle", + op_sync(op_ws_check_permission_and_cancel_handle::

), ), ("op_ws_create", op_async(op_ws_create::

)), ("op_ws_send", op_async(op_ws_send)), @@ -423,3 +462,55 @@ pub fn init( pub fn get_declaration() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") } + +#[derive(Debug)] +pub struct DomExceptionNetworkError { + pub msg: String, +} + +impl DomExceptionNetworkError { + pub fn new(msg: &str) -> Self { + DomExceptionNetworkError { + msg: msg.to_string(), + } + } +} + +impl fmt::Display for DomExceptionNetworkError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(&self.msg) + } +} + +impl std::error::Error for DomExceptionNetworkError {} + +pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> { + e.downcast_ref::() + .map(|_| "DOMExceptionNetworkError") +} + +#[derive(Debug)] +pub struct DomExceptionAbortError { + pub msg: String, +} + +impl DomExceptionAbortError { + pub fn new(msg: &str) -> Self { + DomExceptionAbortError { + msg: msg.to_string(), + } + } +} + +impl fmt::Display for DomExceptionAbortError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(&self.msg) + } +} + +impl std::error::Error for DomExceptionAbortError {} + +pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> { + e.downcast_ref::() + .map(|_| "DOMExceptionAbortError") +} diff --git a/runtime/errors.rs b/runtime/errors.rs index f773be58d6..db14bd8bfb 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -159,6 +159,8 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { .or_else(|| deno_webgpu::error::get_error_class_name(e)) .or_else(|| deno_web::get_error_class_name(e)) .or_else(|| deno_webstorage::get_not_supported_error_class_name(e)) + .or_else(|| deno_websocket::get_network_error_class_name(e)) + .or_else(|| deno_websocket::get_abort_error_class_name(e)) .or_else(|| { e.downcast_ref::() .map(get_dlopen_error_class) diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index a85c40eb98..6d5599e71a 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -230,6 +230,18 @@ delete Object.prototype.__proto__; return new domException.DOMException(msg, "NotSupported"); }, ); + core.registerErrorBuilder( + "DOMExceptionNetworkError", + function DOMExceptionNetworkError(msg) { + return new domException.DOMException(msg, "NetworkError"); + }, + ); + core.registerErrorBuilder( + "DOMExceptionAbortError", + function DOMExceptionAbortError(msg) { + return new domException.DOMException(msg, "AbortError"); + }, + ); core.registerErrorBuilder( "DOMExceptionInvalidCharacterError", function DOMExceptionInvalidCharacterError(msg) { @@ -342,7 +354,6 @@ delete Object.prototype.__proto__; URL: util.nonEnumerable(url.URL), URLSearchParams: util.nonEnumerable(url.URLSearchParams), WebSocket: util.nonEnumerable(webSocket.WebSocket), - BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel), MessageChannel: util.nonEnumerable(messagePort.MessageChannel), MessagePort: util.nonEnumerable(messagePort.MessagePort), Worker: util.nonEnumerable(worker.Worker), @@ -377,6 +388,11 @@ delete Object.prototype.__proto__; setInterval: util.writable(timers.setInterval), setTimeout: util.writable(timers.setTimeout), structuredClone: util.writable(messagePort.structuredClone), + }; + + const unstableWindowOrWorkerGlobalScope = { + WebSocketStream: util.nonEnumerable(webSocket.WebSocketStream), + BroadcastChannel: util.nonEnumerable(broadcastChannel.BroadcastChannel), GPU: util.nonEnumerable(webgpu.GPU), GPUAdapter: util.nonEnumerable(webgpu.GPUAdapter), @@ -485,6 +501,9 @@ delete Object.prototype.__proto__; util.log("bootstrapMainRuntime"); hasBootstrapped = true; ObjectDefineProperties(globalThis, windowOrWorkerGlobalScope); + if (runtimeOptions.unstableFlag) { + ObjectDefineProperties(globalThis, unstableWindowOrWorkerGlobalScope); + } ObjectDefineProperties(globalThis, mainRuntimeGlobalProperties); ObjectSetPrototypeOf(globalThis, Window.prototype); @@ -573,6 +592,9 @@ delete Object.prototype.__proto__; util.log("bootstrapWorkerRuntime"); hasBootstrapped = true; ObjectDefineProperties(globalThis, windowOrWorkerGlobalScope); + if (runtimeOptions.unstableFlag) { + ObjectDefineProperties(globalThis, unstableWindowOrWorkerGlobalScope); + } ObjectDefineProperties(globalThis, workerRuntimeGlobalProperties); ObjectDefineProperties(globalThis, { name: util.readOnly(name) }); ObjectSetPrototypeOf(globalThis, DedicatedWorkerGlobalScope.prototype); diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json index 7470b2ad25..d2c5a31948 100644 --- a/tools/wpt/expectation.json +++ b/tools/wpt/expectation.json @@ -16765,6 +16765,15 @@ "referrer.any.html": true, "Close-delayed.any.html": false, "Close-delayed.any.html?wpt_flags=h2": false, - "Close-delayed.any.html?wss": false + "Close-delayed.any.html?wss": false, + "stream": { + "tentative": { + "abort.any.html?wss": true, + "backpressure-receive.any.html?wss": true, + "backpressure-send.any.html?wss": true, + "close.any.html?wss": true, + "constructor.any.html?wss": true + } + } } } \ No newline at end of file