From 2e18fcebcc2ee931ee952ac2fe2175d6ec7acf69 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Wed, 6 Jan 2021 16:57:28 +0100 Subject: [PATCH] refactor: move WebSocket API to an op_crate (#9026) --- Cargo.lock | 18 +- cli/Cargo.toml | 2 +- cli/build.rs | 5 + cli/dts/lib.deno.shared_globals.d.ts | 107 +----- cli/main.rs | 3 +- cli/tests/integration_tests.rs | 1 + cli/tsc.rs | 2 + .../websocket/01_websocket.js | 59 ++- op_crates/websocket/Cargo.toml | 24 ++ op_crates/websocket/README.md | 5 + op_crates/websocket/lib.deno_websocket.d.ts | 112 ++++++ op_crates/websocket/lib.rs | 347 ++++++++++++++++++ runtime/Cargo.toml | 3 +- runtime/build.rs | 1 + runtime/inspector.rs | 19 +- runtime/lib.rs | 1 + runtime/ops/websocket.rs | 327 +---------------- runtime/permissions.rs | 6 + runtime/web_worker.rs | 2 +- runtime/worker.rs | 2 +- 20 files changed, 609 insertions(+), 437 deletions(-) rename runtime/js/27_websocket.js => op_crates/websocket/01_websocket.js (84%) create mode 100644 op_crates/websocket/Cargo.toml create mode 100644 op_crates/websocket/README.md create mode 100644 op_crates/websocket/lib.deno_websocket.d.ts create mode 100644 op_crates/websocket/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 09a8e39ddb..f22d9a6fdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -417,6 +417,7 @@ dependencies = [ "deno_lint", "deno_runtime", "deno_web", + "deno_websocket", "dissimilar", "dprint-plugin-typescript", "encoding_rs", @@ -451,7 +452,6 @@ dependencies = [ "test_util", "tokio 0.2.22", "tokio-rustls", - "tokio-tungstenite", "tower-test", "uuid", "walkdir", @@ -538,6 +538,7 @@ dependencies = [ "deno_crypto", "deno_fetch", "deno_web", + "deno_websocket", "dlopen", "encoding_rs", "env_logger", @@ -563,7 +564,6 @@ dependencies = [ "test_util", "tokio 0.2.22", "tokio-rustls", - "tokio-tungstenite", "uuid", "webpki", "webpki-roots", @@ -581,6 +581,20 @@ dependencies = [ "serde", ] +[[package]] +name = "deno_websocket" +version = "0.1.0" +dependencies = [ + "deno_core", + "http", + "serde", + "tokio 0.2.22", + "tokio-rustls", + "tokio-tungstenite", + "webpki", + "webpki-roots", +] + [[package]] name = "derive_more" version = "0.99.11" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 7c433e5143..99760023a5 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -23,6 +23,7 @@ path = "./bench/main.rs" deno_core = { path = "../core", version = "0.75.0" } deno_fetch = { path = "../op_crates/fetch", version = "0.18.0" } deno_web = { path = "../op_crates/web", version = "0.26.0" } +deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" } regex = "1.3.9" serde = { version = "1.0.116", features = ["derive"] } @@ -84,7 +85,6 @@ nix = "0.19.0" chrono = "0.4.15" os_pipe = "0.9.2" test_util = { path = "../test_util" } -tokio-tungstenite = "0.11.0" tower-test = "0.3.0" [target.'cfg(unix)'.dev-dependencies] diff --git a/cli/build.rs b/cli/build.rs index 45221281ff..71abfbd835 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -57,6 +57,7 @@ fn create_compiler_snapshot( let mut op_crate_libs = HashMap::new(); op_crate_libs.insert("deno.web", deno_web::get_declaration()); op_crate_libs.insert("deno.fetch", deno_fetch::get_declaration()); + op_crate_libs.insert("deno.websocket", deno_websocket::get_declaration()); // ensure we invalidate the build properly. for (_, path) in op_crate_libs.iter() { @@ -245,6 +246,10 @@ fn main() { "cargo:rustc-env=DENO_FETCH_LIB_PATH={}", deno_fetch::get_declaration().display() ); + println!( + "cargo:rustc-env=DENO_WEBSOCKET_LIB_PATH={}", + deno_websocket::get_declaration().display() + ); println!("cargo:rustc-env=TARGET={}", env::var("TARGET").unwrap()); println!("cargo:rustc-env=PROFILE={}", env::var("PROFILE").unwrap()); diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts index 433e90113a..e4d763ec0f 100644 --- a/cli/dts/lib.deno.shared_globals.d.ts +++ b/cli/dts/lib.deno.shared_globals.d.ts @@ -7,6 +7,7 @@ /// /// /// +/// declare namespace WebAssembly { /** @@ -852,109 +853,3 @@ interface ErrorConstructor { // TODO(nayeemrmn): Support `Error.prepareStackTrace()`. We currently use this // internally in a way that makes it unavailable for users. } - -interface CloseEventInit extends EventInit { - code?: number; - reason?: string; - wasClean?: boolean; -} - -declare class CloseEvent extends Event { - constructor(type: string, eventInitDict?: CloseEventInit); - /** - * Returns the WebSocket connection close code provided by the server. - */ - readonly code: number; - /** - * Returns the WebSocket connection close reason provided by the server. - */ - readonly reason: string; - /** - * Returns true if the connection closed cleanly; false otherwise. - */ - readonly wasClean: boolean; -} - -interface WebSocketEventMap { - close: CloseEvent; - error: Event; - message: MessageEvent; - open: Event; -} - -/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ -declare class WebSocket extends EventTarget { - constructor(url: string, protocols?: string | string[]); - - static readonly CLOSED: number; - static readonly CLOSING: number; - static readonly CONNECTING: number; - static readonly OPEN: number; - - /** - * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: - * - * Can be set, to change how binary data is returned. The default is "blob". - */ - binaryType: BinaryType; - /** - * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. - * - * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) - */ - readonly bufferedAmount: number; - /** - * Returns the extensions selected by the server, if any. - */ - readonly extensions: string; - onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; - onerror: ((this: WebSocket, ev: Event | ErrorEvent) => any) | null; - onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; - onopen: ((this: WebSocket, ev: Event) => any) | null; - /** - * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. - */ - readonly protocol: string; - /** - * Returns the state of the WebSocket object's connection. It can have the values described below. - */ - readonly readyState: number; - /** - * Returns the URL that was used to establish the WebSocket connection. - */ - readonly url: string; - /** - * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. - */ - close(code?: number, reason?: string): void; - /** - * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView. - */ - send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; - readonly CLOSED: number; - readonly CLOSING: number; - readonly CONNECTING: number; - readonly OPEN: number; - addEventListener( - type: K, - listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, - options?: boolean | AddEventListenerOptions, - ): void; - addEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | AddEventListenerOptions, - ): void; - removeEventListener( - type: K, - listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, - options?: boolean | EventListenerOptions, - ): void; - removeEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | EventListenerOptions, - ): void; -} - -type BinaryType = "arraybuffer" | "blob"; diff --git a/cli/main.rs b/cli/main.rs index 7f6a80f30e..b622cccf3b 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -275,10 +275,11 @@ fn print_cache_info( fn get_types(unstable: bool) -> String { let mut types = format!( - "{}\n{}\n{}\n{}\n{}", + "{}\n{}\n{}\n{}\n{}\n{}", crate::tsc::DENO_NS_LIB, crate::tsc::DENO_WEB_LIB, crate::tsc::DENO_FETCH_LIB, + crate::tsc::DENO_WEBSOCKET_LIB, crate::tsc::SHARED_GLOBALS_LIB, crate::tsc::WINDOW_LIB, ); diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 624f17df13..b42315e1ab 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -4,6 +4,7 @@ use deno_core::futures::prelude::*; use deno_core::serde_json; use deno_core::url; use deno_runtime::deno_fetch::reqwest; +use deno_runtime::deno_websocket::tokio_tungstenite; use std::io::{BufRead, Write}; use std::path::Path; use std::path::PathBuf; diff --git a/cli/tsc.rs b/cli/tsc.rs index 29ea45140b..558f93a480 100644 --- a/cli/tsc.rs +++ b/cli/tsc.rs @@ -30,6 +30,8 @@ use std::sync::Mutex; pub static DENO_NS_LIB: &str = include_str!("dts/lib.deno.ns.d.ts"); pub static DENO_WEB_LIB: &str = include_str!(env!("DENO_WEB_LIB_PATH")); pub static DENO_FETCH_LIB: &str = include_str!(env!("DENO_FETCH_LIB_PATH")); +pub static DENO_WEBSOCKET_LIB: &str = + include_str!(env!("DENO_WEBSOCKET_LIB_PATH")); pub static SHARED_GLOBALS_LIB: &str = include_str!("dts/lib.deno.shared_globals.d.ts"); pub static WINDOW_LIB: &str = include_str!("dts/lib.deno.window.d.ts"); diff --git a/runtime/js/27_websocket.js b/op_crates/websocket/01_websocket.js similarity index 84% rename from runtime/js/27_websocket.js rename to op_crates/websocket/01_websocket.js index 9f86bdbed0..6cc23eb953 100644 --- a/runtime/js/27_websocket.js +++ b/op_crates/websocket/01_websocket.js @@ -2,12 +2,64 @@ ((window) => { const core = window.Deno.core; - const { requiredArguments, defineEventHandler } = window.__bootstrap.webUtil; + + // provided by "deno_web" + const { URL } = window.__bootstrap.url; + const CONNECTING = 0; const OPEN = 1; const CLOSING = 2; const CLOSED = 3; + function requiredArguments( + name, + length, + required, + ) { + if (length < required) { + const errMsg = `${name} requires at least ${required} argument${ + required === 1 ? "" : "s" + }, but only ${length} present`; + throw new TypeError(errMsg); + } + } + + const handlerSymbol = Symbol("eventHandlers"); + function makeWrappedHandler(handler) { + function wrappedHandler(...args) { + if (typeof wrappedHandler.handler !== "function") { + return; + } + return wrappedHandler.handler.call(this, ...args); + } + wrappedHandler.handler = handler; + return wrappedHandler; + } + // TODO(lucacasonato) reuse when we can reuse code between web crates + function defineEventHandler(emitter, name) { + // HTML specification section 8.1.5.1 + Object.defineProperty(emitter, `on${name}`, { + get() { + return this[handlerSymbol]?.get(name)?.handler; + }, + set(value) { + if (!this[handlerSymbol]) { + this[handlerSymbol] = new Map(); + } + let handlerWrapper = this[handlerSymbol]?.get(name); + if (handlerWrapper) { + handlerWrapper.handler = value; + } else { + handlerWrapper = makeWrappedHandler(value); + this.addEventListener(name, handlerWrapper); + } + this[handlerSymbol].set(name, handlerWrapper); + }, + configurable: true, + enumerable: true, + }); + } + class WebSocket extends EventTarget { #readyState = CONNECTING; @@ -319,7 +371,6 @@ defineEventHandler(WebSocket.prototype, "error"); defineEventHandler(WebSocket.prototype, "close"); defineEventHandler(WebSocket.prototype, "open"); - window.__bootstrap.webSocket = { - WebSocket, - }; + + window.__bootstrap.webSocket = { WebSocket }; })(this); diff --git a/op_crates/websocket/Cargo.toml b/op_crates/websocket/Cargo.toml new file mode 100644 index 0000000000..20fba9804e --- /dev/null +++ b/op_crates/websocket/Cargo.toml @@ -0,0 +1,24 @@ +# Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_websocket" +version = "0.1.0" +edition = "2018" +description = "Implementation of WebSocket API for Deno" +authors = ["the Deno authors"] +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { version = "0.75.0", path = "../../core" } +http = "0.2.1" +tokio = { version = "0.2.22", features = ["full"] } +tokio-rustls = "0.14.1" +tokio-tungstenite = "0.11.0" +serde = { version = "1.0.116", features = ["derive"] } +webpki = "0.21.3" +webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'. diff --git a/op_crates/websocket/README.md b/op_crates/websocket/README.md new file mode 100644 index 0000000000..d6495f3974 --- /dev/null +++ b/op_crates/websocket/README.md @@ -0,0 +1,5 @@ +# deno_websocket + +This op crate implements the websocket functions of Deno. + +Spec: https://html.spec.whatwg.org/multipage/web-sockets.html diff --git a/op_crates/websocket/lib.deno_websocket.d.ts b/op_crates/websocket/lib.deno_websocket.d.ts new file mode 100644 index 0000000000..d47665c8bb --- /dev/null +++ b/op_crates/websocket/lib.deno_websocket.d.ts @@ -0,0 +1,112 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +// deno-lint-ignore-file no-explicit-any + +/// +/// + +interface CloseEventInit extends EventInit { + code?: number; + reason?: string; + wasClean?: boolean; +} + +declare class CloseEvent extends Event { + constructor(type: string, eventInitDict?: CloseEventInit); + /** + * Returns the WebSocket connection close code provided by the server. + */ + readonly code: number; + /** + * Returns the WebSocket connection close reason provided by the server. + */ + readonly reason: string; + /** + * Returns true if the connection closed cleanly; false otherwise. + */ + readonly wasClean: boolean; +} + +interface WebSocketEventMap { + close: CloseEvent; + error: Event; + message: MessageEvent; + open: Event; +} + +/** Provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection. */ +declare class WebSocket extends EventTarget { + constructor(url: string, protocols?: string | string[]); + + static readonly CLOSED: number; + static readonly CLOSING: number; + static readonly CONNECTING: number; + static readonly OPEN: number; + + /** + * Returns a string that indicates how binary data from the WebSocket object is exposed to scripts: + * + * Can be set, to change how binary data is returned. The default is "blob". + */ + binaryType: BinaryType; + /** + * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. + * + * If the WebSocket connection is closed, this attribute's value will only increase with each call to the send() method. (The number does not reset to zero once the connection closes.) + */ + readonly bufferedAmount: number; + /** + * Returns the extensions selected by the server, if any. + */ + readonly extensions: string; + onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; + onerror: ((this: WebSocket, ev: Event | ErrorEvent) => any) | null; + onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; + onopen: ((this: WebSocket, ev: Event) => any) | null; + /** + * Returns the subprotocol selected by the server, if any. It can be used in conjunction with the array form of the constructor's second argument to perform subprotocol negotiation. + */ + readonly protocol: string; + /** + * Returns the state of the WebSocket object's connection. It can have the values described below. + */ + readonly readyState: number; + /** + * Returns the URL that was used to establish the WebSocket connection. + */ + readonly url: string; + /** + * Closes the WebSocket connection, optionally using code as the the WebSocket connection close code and reason as the the WebSocket connection close reason. + */ + close(code?: number, reason?: string): void; + /** + * Transmits data using the WebSocket connection. data can be a string, a Blob, an ArrayBuffer, or an ArrayBufferView. + */ + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; + readonly CLOSED: number; + readonly CLOSING: number; + readonly CONNECTING: number; + readonly OPEN: number; + addEventListener( + type: K, + listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + options?: boolean | AddEventListenerOptions, + ): void; + addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void; + removeEventListener( + type: K, + listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, + options?: boolean | EventListenerOptions, + ): void; + removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions, + ): void; +} + +type BinaryType = "arraybuffer" | "blob"; diff --git a/op_crates/websocket/lib.rs b/op_crates/websocket/lib.rs new file mode 100644 index 0000000000..b688fe9fdb --- /dev/null +++ b/op_crates/websocket/lib.rs @@ -0,0 +1,347 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::stream::SplitSink; +use deno_core::futures::stream::SplitStream; +use deno_core::futures::SinkExt; +use deno_core::futures::StreamExt; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::url; +use deno_core::AsyncRefCell; +use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::JsRuntime; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::{serde_json, ZeroCopyBuf}; + +use http::{Method, Request, Uri}; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::io::BufReader; +use std::io::Cursor; +use std::path::PathBuf; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use tokio_tungstenite::stream::Stream as StreamSwitcher; +use tokio_tungstenite::tungstenite::Error as TungsteniteError; +use tokio_tungstenite::tungstenite::{ + handshake::client::Response, protocol::frame::coding::CloseCode, + protocol::CloseFrame, Message, +}; +use tokio_tungstenite::{client_async, WebSocketStream}; +use webpki::DNSNameRef; + +pub use tokio_tungstenite; // Re-export tokio_tungstenite + +#[derive(Clone)] +pub struct WsCaData(pub Vec); +#[derive(Clone)] +pub struct WsUserAgent(pub String); + +pub trait WebSocketPermissions { + fn check_net_url(&self, _url: &url::Url) -> Result<(), AnyError>; +} + +/// For use with `op_websocket_*` when the user does not want permissions. +pub struct NoWebSocketPermissions; + +impl WebSocketPermissions for NoWebSocketPermissions { + fn check_net_url(&self, _url: &url::Url) -> Result<(), AnyError> { + Ok(()) + } +} + +type MaybeTlsStream = + StreamSwitcher>; + +type WsStream = WebSocketStream; +struct WsStreamResource { + tx: AsyncRefCell>, + rx: AsyncRefCell>, + // When a `WsStreamResource` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, +} + +impl Resource for WsStreamResource { + fn name(&self) -> Cow { + "webSocketStream".into() + } +} + +impl WsStreamResource {} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CheckPermissionArgs { + url: String, +} + +// This op is needed because creating a WS instance in JavaScript is a sync +// operation and should throw error when permissions are not fullfiled, +// but actual op that connects WS is async. +pub fn op_ws_check_permission( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result +where + WP: WebSocketPermissions + 'static, +{ + let args: CheckPermissionArgs = serde_json::from_value(args)?; + + state + .borrow::() + .check_net_url(&url::Url::parse(&args.url)?)?; + + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateArgs { + url: String, + protocols: String, +} + +pub async fn op_ws_create( + state: Rc>, + args: Value, + _bufs: BufVec, +) -> Result +where + WP: WebSocketPermissions + 'static, +{ + let args: CreateArgs = serde_json::from_value(args)?; + + { + let s = state.borrow(); + s.borrow::() + .check_net_url(&url::Url::parse(&args.url)?) + .expect( + "Permission check should have been done in op_ws_check_permission", + ); + } + + let ws_ca_data = state.borrow().try_borrow::().cloned(); + let user_agent = state.borrow().borrow::().0.clone(); + let uri: Uri = args.url.parse()?; + let mut request = Request::builder().method(Method::GET).uri(&uri); + + request = request.header("User-Agent", user_agent); + + if !args.protocols.is_empty() { + request = request.header("Sec-WebSocket-Protocol", args.protocols); + } + + let request = request.body(())?; + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let try_socket = TcpStream::connect(addr).await; + let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { + Ok(socket) => socket, + Err(_) => return Ok(json!({"success": false})), + }; + + let socket: MaybeTlsStream = match uri.scheme_str() { + Some("ws") => StreamSwitcher::Plain(tcp_socket), + Some("wss") => { + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + + if let Some(ws_ca_data) = ws_ca_data { + let reader = &mut BufReader::new(Cursor::new(ws_ca_data.0)); + config.root_store.add_pem_file(reader).unwrap(); + } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + StreamSwitcher::Tls(tls_socket) + } + _ => unreachable!(), + }; + + let (stream, response): (WsStream, Response) = + client_async(request, socket).await.map_err(|err| { + type_error(format!( + "failed to connect to WebSocket: {}", + err.to_string() + )) + })?; + + let (ws_tx, ws_rx) = stream.split(); + let resource = WsStreamResource { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + cancel: Default::default(), + }; + let mut state = state.borrow_mut(); + let rid = state.resource_table.add(resource); + + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::(); + Ok(json!({ + "success": true, + "rid": rid, + "protocol": protocol, + "extensions": extensions + })) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendArgs { + rid: u32, + kind: String, + text: Option, +} + +pub async fn op_ws_send( + state: Rc>, + args: Value, + bufs: BufVec, +) -> Result { + let args: SendArgs = serde_json::from_value(args)?; + + let msg = match args.kind.as_str() { + "text" => Message::Text(args.text.unwrap()), + "binary" => Message::Binary(bufs[0].to_vec()), + "pong" => Message::Pong(vec![]), + _ => unreachable!(), + }; + let rid = args.rid; + + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CloseArgs { + rid: u32, + code: Option, + reason: Option, +} + +pub async fn op_ws_close( + state: Rc>, + args: Value, + _bufs: BufVec, +) -> Result { + let args: CloseArgs = serde_json::from_value(args)?; + let rid = args.rid; + let msg = Message::Close(args.code.map(|c| CloseFrame { + code: CloseCode::from(c), + reason: match args.reason { + Some(reason) => Cow::from(reason), + None => Default::default(), + }, + })); + + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct NextEventArgs { + rid: u32, +} + +pub async fn op_ws_next_event( + state: Rc>, + args: Value, + _bufs: BufVec, +) -> Result { + let args: NextEventArgs = serde_json::from_value(args)?; + + let resource = state + .borrow_mut() + .resource_table + .get::(args.rid) + .ok_or_else(bad_resource_id)?; + + let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let val = rx.next().or_cancel(cancel).await?; + let res = match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data + }) + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.borrow_mut().resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + }; + Ok(res) +} + +/// Load and execute the javascript code. +pub fn init(isolate: &mut JsRuntime) { + isolate + .execute( + "deno:op_crates/websocket/01_websocket.js", + include_str!("01_websocket.js"), + ) + .unwrap(); +} + +pub fn get_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 8632019b63..f641dcb426 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -22,6 +22,7 @@ deno_core = { path = "../core", version = "0.75.0" } deno_crypto = { path = "../op_crates/crypto", version = "0.9.0" } deno_fetch = { path = "../op_crates/fetch", version = "0.18.0" } deno_web = { path = "../op_crates/web", version = "0.26.0" } +deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" } [target.'cfg(windows)'.build-dependencies] winres = "0.1.11" @@ -32,6 +33,7 @@ deno_core = { path = "../core", version = "0.75.0" } deno_crypto = { path = "../op_crates/crypto", version = "0.9.0" } deno_fetch = { path = "../op_crates/fetch", version = "0.18.0" } deno_web = { path = "../op_crates/web", version = "0.26.0" } +deno_websocket = { path = "../op_crates/websocket", version = "0.1.0" } atty = "0.2.14" dlopen = "0.1.8" @@ -55,7 +57,6 @@ sys-info = "0.7.0" termcolor = "1.1.0" tokio = { version = "0.2.22", features = ["full"] } tokio-rustls = "0.14.1" -tokio-tungstenite = "0.11.0" uuid = { version = "0.8.1", features = ["v4"] } hyper = "0.13.9" webpki = "0.21.3" diff --git a/runtime/build.rs b/runtime/build.rs index ca4a77c314..a34e9ffa83 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -15,6 +15,7 @@ fn create_snapshot( ) { deno_web::init(&mut js_runtime); deno_fetch::init(&mut js_runtime); + deno_websocket::init(&mut js_runtime); deno_crypto::init(&mut js_runtime); // TODO(nayeemrmn): https://github.com/rust-lang/cargo/issues/3946 to get the // workspace root. diff --git a/runtime/inspector.rs b/runtime/inspector.rs index befaaebeee..ae90e0fae5 100644 --- a/runtime/inspector.rs +++ b/runtime/inspector.rs @@ -23,6 +23,7 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::v8; +use deno_websocket::tokio_tungstenite::tungstenite; use std::collections::HashMap; use std::ffi::c_void; use std::mem::replace; @@ -40,7 +41,6 @@ use std::sync::Mutex; use std::thread; use std::{cell::BorrowMutError, convert::Infallible}; use std::{cell::RefCell, rc::Rc}; -use tokio_tungstenite::tungstenite; use uuid::Uuid; pub struct InspectorServer { @@ -185,12 +185,13 @@ fn handle_ws_request( if resp.is_ok() { tokio::task::spawn_local(async move { let upgraded = body.on_upgrade().await.unwrap(); - let websocket = tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; + let websocket = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; let (proxy, pump) = create_websocket_proxy(websocket); let _ = new_websocket_tx.unbounded_send(proxy); @@ -353,7 +354,9 @@ impl WebSocketProxy { /// be used to send/receive messages on the websocket, and the second element /// is a future that does the forwarding. fn create_websocket_proxy( - websocket: tokio_tungstenite::WebSocketStream, + websocket: deno_websocket::tokio_tungstenite::WebSocketStream< + hyper::upgrade::Upgraded, + >, ) -> (WebSocketProxy, impl Future + Send) { // The 'outbound' channel carries messages sent to the websocket. let (outbound_tx, outbound_rx) = mpsc::unbounded(); diff --git a/runtime/lib.rs b/runtime/lib.rs index 6745f3ec81..cc6d02681d 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -10,6 +10,7 @@ extern crate log; pub use deno_crypto; pub use deno_fetch; pub use deno_web; +pub use deno_websocket; pub mod colors; pub mod errors; diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs index b220655ae6..3ecdae0888 100644 --- a/runtime/ops/websocket.rs +++ b/runtime/ops/websocket.rs @@ -1,330 +1,33 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - use crate::permissions::Permissions; -use deno_core::error::bad_resource_id; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::stream::SplitSink; -use deno_core::futures::stream::SplitStream; -use deno_core::futures::SinkExt; -use deno_core::futures::StreamExt; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::url; -use deno_core::AsyncRefCell; -use deno_core::BufVec; -use deno_core::CancelFuture; -use deno_core::CancelHandle; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::{serde_json, ZeroCopyBuf}; -use http::{Method, Request, Uri}; -use serde::Deserialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::io::BufReader; -use std::io::Cursor; -use std::rc::Rc; -use std::sync::Arc; -use tokio::net::TcpStream; -use tokio_rustls::{rustls::ClientConfig, TlsConnector}; -use tokio_tungstenite::stream::Stream as StreamSwitcher; -use tokio_tungstenite::tungstenite::Error as TungsteniteError; -use tokio_tungstenite::tungstenite::{ - handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, Message, -}; -use tokio_tungstenite::{client_async, WebSocketStream}; -use webpki::DNSNameRef; - -#[derive(Clone)] -struct WsCaData(Vec); -#[derive(Clone)] -struct WsUserAgent(String); +use deno_websocket::op_ws_check_permission; +use deno_websocket::op_ws_close; +use deno_websocket::op_ws_create; +use deno_websocket::op_ws_next_event; +use deno_websocket::op_ws_send; +use deno_websocket::WsCaData; +use deno_websocket::WsUserAgent; pub fn init( rt: &mut deno_core::JsRuntime, - ca_data: Option>, user_agent: String, + ca_data: Option>, ) { { let op_state = rt.op_state(); let mut state = op_state.borrow_mut(); + state.put::(WsUserAgent(user_agent)); if let Some(ca_data) = ca_data { state.put::(WsCaData(ca_data)); } - state.put::(WsUserAgent(user_agent)); } - super::reg_json_sync(rt, "op_ws_check_permission", op_ws_check_permission); - super::reg_json_async(rt, "op_ws_create", op_ws_create); + super::reg_json_sync( + rt, + "op_ws_check_permission", + op_ws_check_permission::, + ); + super::reg_json_async(rt, "op_ws_create", op_ws_create::); super::reg_json_async(rt, "op_ws_send", op_ws_send); super::reg_json_async(rt, "op_ws_close", op_ws_close); super::reg_json_async(rt, "op_ws_next_event", op_ws_next_event); } - -type MaybeTlsStream = - StreamSwitcher>; - -type WsStream = WebSocketStream; -struct WsStreamResource { - tx: AsyncRefCell>, - rx: AsyncRefCell>, - // When a `WsStreamResource` resource is closed, all pending 'read' ops are - // canceled, while 'write' ops are allowed to complete. Therefore only - // 'read' futures are attached to this cancel handle. - cancel: CancelHandle, -} - -impl Resource for WsStreamResource { - fn name(&self) -> Cow { - "webSocketStream".into() - } -} - -impl WsStreamResource {} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CheckPermissionArgs { - url: String, -} - -// This op is needed because creating a WS instance in JavaScript is a sync -// operation and should throw error when permissions are not fullfiled, -// but actual op that connects WS is async. -pub fn op_ws_check_permission( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result { - let args: CheckPermissionArgs = serde_json::from_value(args)?; - - state - .borrow::() - .check_net_url(&url::Url::parse(&args.url)?)?; - - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateArgs { - url: String, - protocols: String, -} - -pub async fn op_ws_create( - state: Rc>, - args: Value, - _bufs: BufVec, -) -> Result { - let args: CreateArgs = serde_json::from_value(args)?; - - { - let s = state.borrow(); - s.borrow::() - .check_net_url(&url::Url::parse(&args.url)?) - .expect( - "Permission check should have been done in op_ws_check_permission", - ); - } - - let ws_ca_data = state.borrow().try_borrow::().cloned(); - let user_agent = state.borrow().borrow::().0.clone(); - let uri: Uri = args.url.parse()?; - let mut request = Request::builder().method(Method::GET).uri(&uri); - - request = request.header("User-Agent", user_agent); - - if !args.protocols.is_empty() { - request = request.header("Sec-WebSocket-Protocol", args.protocols); - } - - let request = request.body(())?; - let domain = &uri.host().unwrap().to_string(); - let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { - Some("wss") => 443, - Some("ws") => 80, - _ => unreachable!(), - }); - let addr = format!("{}:{}", domain, port); - let try_socket = TcpStream::connect(addr).await; - let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { - Ok(socket) => socket, - Err(_) => return Ok(json!({"success": false})), - }; - - let socket: MaybeTlsStream = match uri.scheme_str() { - Some("ws") => StreamSwitcher::Plain(tcp_socket), - Some("wss") => { - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - - if let Some(ws_ca_data) = ws_ca_data { - let reader = &mut BufReader::new(Cursor::new(ws_ca_data.0)); - config.root_store.add_pem_file(reader).unwrap(); - } - - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; - StreamSwitcher::Tls(tls_socket) - } - _ => unreachable!(), - }; - - let (stream, response): (WsStream, Response) = - client_async(request, socket).await.map_err(|err| { - type_error(format!( - "failed to connect to WebSocket: {}", - err.to_string() - )) - })?; - - let (ws_tx, ws_rx) = stream.split(); - let resource = WsStreamResource { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - cancel: Default::default(), - }; - let mut state = state.borrow_mut(); - let rid = state.resource_table.add(resource); - - let protocol = match response.headers().get("Sec-WebSocket-Protocol") { - Some(header) => header.to_str().unwrap(), - None => "", - }; - let extensions = response - .headers() - .get_all("Sec-WebSocket-Extensions") - .iter() - .map(|header| header.to_str().unwrap()) - .collect::(); - Ok(json!({ - "success": true, - "rid": rid, - "protocol": protocol, - "extensions": extensions - })) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct SendArgs { - rid: u32, - kind: String, - text: Option, -} - -pub async fn op_ws_send( - state: Rc>, - args: Value, - bufs: BufVec, -) -> Result { - let args: SendArgs = serde_json::from_value(args)?; - - let msg = match args.kind.as_str() { - "text" => Message::Text(args.text.unwrap()), - "binary" => Message::Binary(bufs[0].to_vec()), - "pong" => Message::Pong(vec![]), - _ => unreachable!(), - }; - let rid = args.rid; - - let resource = state - .borrow_mut() - .resource_table - .get::(rid) - .ok_or_else(bad_resource_id)?; - let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; - tx.send(msg).await?; - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CloseArgs { - rid: u32, - code: Option, - reason: Option, -} - -pub async fn op_ws_close( - state: Rc>, - args: Value, - _bufs: BufVec, -) -> Result { - let args: CloseArgs = serde_json::from_value(args)?; - let rid = args.rid; - let msg = Message::Close(args.code.map(|c| CloseFrame { - code: CloseCode::from(c), - reason: match args.reason { - Some(reason) => Cow::from(reason), - None => Default::default(), - }, - })); - - let resource = state - .borrow_mut() - .resource_table - .get::(rid) - .ok_or_else(bad_resource_id)?; - let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; - tx.send(msg).await?; - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct NextEventArgs { - rid: u32, -} - -pub async fn op_ws_next_event( - state: Rc>, - args: Value, - _bufs: BufVec, -) -> Result { - let args: NextEventArgs = serde_json::from_value(args)?; - - let resource = state - .borrow_mut() - .resource_table - .get::(args.rid) - .ok_or_else(bad_resource_id)?; - - let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let val = rx.next().or_cancel(cancel).await?; - let res = match val { - Some(Ok(Message::Text(text))) => json!({ - "type": "string", - "data": text - }), - Some(Ok(Message::Binary(data))) => { - // TODO(ry): don't use json to send binary data. - json!({ - "type": "binary", - "data": data - }) - } - Some(Ok(Message::Close(Some(frame)))) => json!({ - "type": "close", - "code": u16::from(frame.code), - "reason": frame.reason.as_ref() - }), - Some(Ok(Message::Close(None))) => json!({ "type": "close" }), - Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), - Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), - Some(Err(_)) => json!({"type": "error"}), - None => { - state.borrow_mut().resource_table.close(args.rid).unwrap(); - json!({"type": "closed"}) - } - }; - Ok(res) -} diff --git a/runtime/permissions.rs b/runtime/permissions.rs index 70d243cb08..c50783f9d0 100644 --- a/runtime/permissions.rs +++ b/runtime/permissions.rs @@ -628,6 +628,12 @@ impl deno_fetch::FetchPermissions for Permissions { } } +impl deno_websocket::WebSocketPermissions for Permissions { + fn check_net_url(&self, url: &url::Url) -> Result<(), AnyError> { + Permissions::check_net_url(self, url) + } +} + /// Shows the permission prompt and returns the answer according to the user input. /// This loops until the user gives the proper input. #[cfg(not(test))] diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 988845840f..313c711773 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -237,8 +237,8 @@ impl WebWorker { ops::io::init(js_runtime); ops::websocket::init( js_runtime, - options.ca_data.clone(), options.user_agent.clone(), + options.ca_data.clone(), ); if options.use_deno_namespace { diff --git a/runtime/worker.rs b/runtime/worker.rs index a05c9f758c..9326d632e2 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -143,8 +143,8 @@ impl MainWorker { ops::tty::init(js_runtime); ops::websocket::init( js_runtime, - options.ca_data.clone(), options.user_agent.clone(), + options.ca_data.clone(), ); } {