diff --git a/Cargo.lock b/Cargo.lock index 5b1f429996..2a21a541ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1274,6 +1274,7 @@ version = "0.102.0" dependencies = [ "deno_core", "deno_tls", + "fastwebsockets", "http", "hyper", "serde", @@ -1679,6 +1680,18 @@ dependencies = [ "instant", ] +[[package]] +name = "fastwebsockets" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45dfdedde2bd984f677056a9a804fe995990ab4f4594599c848c05a10ee8c05e" +dependencies = [ + "cc", + "simdutf8", + "tokio", + "utf-8", +] + [[package]] name = "fd-lock" version = "3.0.10" @@ -3913,6 +3926,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simdutf8" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" + [[package]] name = "siphasher" version = "0.3.10" diff --git a/ext/http/01_http.js b/ext/http/01_http.js index d0aefc0c32..51347ebede 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -26,8 +26,10 @@ import { _protocol, _readyState, _rid, + _role, _server, _serverHandleIdleTimeout, + SERVER, WebSocket, } from "ext:deno_websocket/01_websocket.js"; import { listen, TcpConn, UnixConn } from "ext:deno_net/01_net.js"; @@ -376,6 +378,7 @@ function createRespondWith( httpConn.close(); ws[_readyState] = WebSocket.OPEN; + ws[_role] = SERVER; const event = new Event("open"); ws.dispatchEvent(event); diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 5105df24d7..2b8ee59a91 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -78,6 +78,11 @@ webidl.converters["WebSocketSend"] = (V, opts) => { return webidl.converters["USVString"](V, opts); }; +/** role */ +const SERVER = 0; +const CLIENT = 1; + +/** state */ const CONNECTING = 0; const OPEN = 1; const CLOSING = 2; @@ -86,6 +91,7 @@ const CLOSED = 3; const _readyState = Symbol("[[readyState]]"); const _url = Symbol("[[url]]"); const _rid = Symbol("[[rid]]"); +const _role = Symbol("[[role]]"); const _extensions = Symbol("[[extensions]]"); const _protocol = Symbol("[[protocol]]"); const _binaryType = Symbol("[[binaryType]]"); @@ -98,6 +104,7 @@ const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]"); const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]"); class WebSocket extends EventTarget { [_rid]; + [_role]; [_readyState] = CONNECTING; get readyState() { @@ -203,6 +210,7 @@ class WebSocket extends EventTarget { } this[_url] = wsURL.href; + this[_role] = CLIENT; ops.op_ws_check_permission_and_cancel_handle( "WebSocket.abort()", @@ -312,7 +320,13 @@ class WebSocket extends EventTarget { const sendTypedArray = (view, byteLength) => { this[_bufferedAmount] += byteLength; PromisePrototypeThen( - core.opAsync2("op_ws_send_binary", this[_rid], view), + core.opAsync2( + this[_role] === SERVER + ? "op_server_ws_send_binary" + : "op_ws_send_binary", + this[_rid], + view, + ), () => { this[_bufferedAmount] -= byteLength; }, @@ -346,7 +360,11 @@ class WebSocket extends EventTarget { const d = core.encode(string); this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d); PromisePrototypeThen( - core.opAsync2("op_ws_send_text", this[_rid], string), + core.opAsync2( + this[_role] === SERVER ? "op_server_ws_send_text" : "op_ws_send_text", + this[_rid], + string, + ), () => { this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d); }, @@ -401,7 +419,12 @@ class WebSocket extends EventTarget { this[_readyState] = CLOSING; PromisePrototypeCatch( - core.opAsync("op_ws_close", this[_rid], code, reason), + core.opAsync( + this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close", + this[_rid], + code, + reason, + ), (err) => { this[_readyState] = CLOSED; @@ -422,7 +445,7 @@ class WebSocket extends EventTarget { async [_eventLoop]() { while (this[_readyState] !== CLOSED) { const { 0: kind, 1: value } = await core.opAsync2( - "op_ws_next_event", + this[_role] === SERVER ? "op_server_ws_next_event" : "op_ws_next_event", this[_rid], ); @@ -489,7 +512,7 @@ class WebSocket extends EventTarget { if (prevState === OPEN) { try { await core.opAsync( - "op_ws_close", + this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close", this[_rid], code, value, @@ -517,14 +540,23 @@ class WebSocket extends EventTarget { clearTimeout(this[_idleTimeoutTimeout]); this[_idleTimeoutTimeout] = setTimeout(async () => { if (this[_readyState] === OPEN) { - await core.opAsync("op_ws_send", this[_rid], { - kind: "ping", - }); + await core.opAsync( + this[_role] === SERVER ? "op_server_ws_send" : "op_ws_send", + this[_rid], + { + kind: "ping", + }, + ); this[_idleTimeoutTimeout] = setTimeout(async () => { if (this[_readyState] === OPEN) { this[_readyState] = CLOSING; const reason = "No response from ping frame."; - await core.opAsync("op_ws_close", this[_rid], 1001, reason); + await core.opAsync( + this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close", + this[_rid], + 1001, + reason, + ); this[_readyState] = CLOSED; const errEvent = new ErrorEvent("error", { @@ -594,7 +626,9 @@ export { _protocol, _readyState, _rid, + _role, _server, _serverHandleIdleTimeout, + SERVER, WebSocket, }; diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index 0432c78f8d..cf77cccf3a 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -16,6 +16,7 @@ path = "lib.rs" [dependencies] deno_core.workspace = true deno_tls.workspace = true +fastwebsockets = "0.1.0" http.workspace = true hyper.workspace = true serde.workspace = true diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 1c586b383f..71f176070a 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -47,13 +47,16 @@ use tokio_tungstenite::tungstenite::handshake::client::Response; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::protocol::Message; -use tokio_tungstenite::tungstenite::protocol::Role; use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::WebSocketStream; pub use tokio_tungstenite; // Re-export tokio_tungstenite +mod server; + +pub use server::ws_create_server_stream; + #[derive(Clone)] pub struct WsRootStore(pub Option); #[derive(Clone)] @@ -89,35 +92,6 @@ pub enum WebSocketStreamType { pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {} -pub async fn ws_create_server_stream( - state: &Rc>, - transport: Pin>, -) -> Result { - let ws_stream = WebSocketStream::from_raw_socket( - transport, - Role::Server, - Some(WebSocketConfig { - max_message_size: Some(128 << 20), - max_frame_size: Some(32 << 20), - ..Default::default() - }), - ) - .await; - let (ws_tx, ws_rx) = ws_stream.split(); - - let ws_resource = WsStreamResource { - stream: WebSocketStreamType::Server { - tx: AsyncRefCell::new(ws_tx), - rx: AsyncRefCell::new(ws_rx), - }, - cancel: Default::default(), - }; - - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add(ws_resource); - Ok(rid) -} - pub struct WsStreamResource { pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are @@ -549,6 +523,11 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, + server::op_server_ws_send, + server::op_server_ws_close, + server::op_server_ws_next_event, + server::op_server_ws_send_binary, + server::op_server_ws_send_text, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = { diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs new file mode 100644 index 0000000000..4b47c88c80 --- /dev/null +++ b/ext/websocket/server.rs @@ -0,0 +1,191 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use crate::MessageKind; +use crate::SendValue; +use crate::Upgraded; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op; +use deno_core::AsyncRefCell; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::StringOrBuffer; +use deno_core::ZeroCopyBuf; +use std::borrow::Cow; +use std::cell::RefCell; +use std::pin::Pin; +use std::rc::Rc; + +use fastwebsockets::CloseCode; +use fastwebsockets::FragmentCollector; +use fastwebsockets::Frame; +use fastwebsockets::OpCode; +use fastwebsockets::WebSocket; + +pub struct ServerWebSocket { + ws: AsyncRefCell>>>, +} + +impl Resource for ServerWebSocket { + fn name(&self) -> Cow { + "serverWebSocket".into() + } +} +pub async fn ws_create_server_stream( + state: &Rc>, + transport: Pin>, +) -> Result { + let mut ws = WebSocket::after_handshake(transport); + ws.set_writev(false); + ws.set_auto_close(true); + ws.set_auto_pong(true); + + let ws_resource = ServerWebSocket { + ws: AsyncRefCell::new(FragmentCollector::new(ws)), + }; + + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add(ws_resource); + Ok(rid) +} + +#[op] +pub async fn op_server_ws_send_binary( + state: Rc>, + rid: ResourceId, + data: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) + .await + .map_err(|err| type_error(err.to_string()))?; + Ok(()) +} + +#[op] +pub async fn op_server_ws_send_text( + state: Rc>, + rid: ResourceId, + data: String, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .await + .map_err(|err| type_error(err.to_string()))?; + Ok(()) +} + +#[op] +pub async fn op_server_ws_send( + state: Rc>, + rid: ResourceId, + value: SendValue, +) -> Result<(), AnyError> { + let msg = match value { + SendValue::Text(text) => { + Frame::new(true, OpCode::Text, None, text.into_bytes()) + } + SendValue::Binary(buf) => { + Frame::new(true, OpCode::Binary, None, buf.to_vec()) + } + SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]), + SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]), + }; + + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + + ws.write_frame(msg) + .await + .map_err(|err| type_error(err.to_string()))?; + Ok(()) +} + +#[op(deferred)] +pub async fn op_server_ws_close( + state: Rc>, + rid: ResourceId, + code: Option, + reason: Option, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + let frame = reason + .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) + .unwrap_or_else(|| Frame::close_raw(vec![])); + ws.write_frame(frame) + .await + .map_err(|err| type_error(err.to_string()))?; + Ok(()) +} + +#[op] +pub async fn op_server_ws_next_event( + state: Rc>, + rid: ResourceId, +) -> Result<(u16, StringOrBuffer), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + let val = match ws.read_frame().await { + Ok(val) => val, + Err(err) => { + return Ok(( + MessageKind::Error as u16, + StringOrBuffer::String(err.to_string()), + )) + } + }; + + let res = match val.opcode { + OpCode::Text => ( + MessageKind::Text as u16, + StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), + ), + OpCode::Binary => ( + MessageKind::Binary as u16, + StringOrBuffer::Buffer(val.payload.into()), + ), + OpCode::Close => { + if val.payload.len() < 2 { + return Ok((1005, StringOrBuffer::String("".to_string()))); + } + + let close_code = + CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]])); + let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap(); + (close_code.into(), StringOrBuffer::String(reason)) + } + OpCode::Ping => ( + MessageKind::Ping as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + OpCode::Pong => ( + MessageKind::Pong as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + OpCode::Continuation => { + return Err(type_error("Unexpected continuation frame")) + } + }; + Ok(res) +}