diff --git a/Cargo.lock b/Cargo.lock index ea7e774f34..d5c1f5558b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2259,9 +2259,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fastwebsockets" -version = "0.4.4" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e6185b6dc9dddc4db0dedd2e213047e93bcbf7a0fb092abc4c4e4f3195efdb4" +checksum = "17c35f166afb94b7f8e9449d0ad866daca111ba4053f3b1960bb480ca4382c63" dependencies = [ "base64 0.21.4", "hyper 0.14.27", diff --git a/Cargo.toml b/Cargo.toml index c2765ab660..e859641dd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ data-encoding = "2.3.3" dlopen = "0.1.8" encoding_rs = "=0.8.33" ecb = "=0.1.2" -fastwebsockets = "=0.4.4" +fastwebsockets = "=0.5.0" filetime = "0.2.16" flate2 = { version = "1.0.26", features = ["zlib-ng"], default-features = false } fs3 = "0.5.0" diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index a13db02d7a..f110f8aa6c 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -112,12 +112,22 @@ util::unit_test_factory!( fn js_unit_test(test: String) { let _g = util::http_server(); - let mut deno = util::deno_cmd() + let mut deno = util::deno_cmd(); + let deno = deno .current_dir(util::root_path()) .arg("test") .arg("--unstable") .arg("--location=http://js-unit-tests/foo/bar") - .arg("--no-prompt") + .arg("--no-prompt"); + + // TODO(mmastrac): it would be better to just load a test CA for all tests + let deno = if test == "websocket_test" { + deno.arg("--unsafely-ignore-certificate-errors") + } else { + deno + }; + + let mut deno = deno .arg("-A") .arg(util::tests_path().join("unit").join(format!("{test}.ts"))) .stderr(Stdio::piped()) diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index 11f0fd7dc3..b761cd1183 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -21,7 +21,7 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() { const promise = deferred(); const ws = new WebSocket(new URL("ws://localhost:4242/")); assertEquals(ws.url, "ws://localhost:4242/"); - ws.onerror = () => fail(); + ws.onerror = (e) => promise.reject(e); ws.onopen = () => ws.close(); ws.onclose = () => { promise.resolve(); @@ -29,13 +29,66 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() { await promise; }); +Deno.test(async function websocketSendLargePacket() { + const promise = deferred(); + const ws = new WebSocket(new URL("wss://localhost:4243/")); + assertEquals(ws.url, "wss://localhost:4243/"); + ws.onerror = (e) => promise.reject(e); + ws.onopen = () => { + ws.send("a".repeat(65000)); + }; + ws.onmessage = () => { + ws.close(); + }; + ws.onclose = () => { + promise.resolve(); + }; + await promise; +}); + +Deno.test(async function websocketSendLargeBinaryPacket() { + const promise = deferred(); + const ws = new WebSocket(new URL("wss://localhost:4243/")); + assertEquals(ws.url, "wss://localhost:4243/"); + ws.onerror = (e) => promise.reject(e); + ws.onopen = () => { + ws.send(new Uint8Array(65000)); + }; + ws.onmessage = (msg) => { + console.log(msg); + ws.close(); + }; + ws.onclose = () => { + promise.resolve(); + }; + await promise; +}); + +Deno.test(async function websocketSendLargeBlobPacket() { + const promise = deferred(); + const ws = new WebSocket(new URL("wss://localhost:4243/")); + assertEquals(ws.url, "wss://localhost:4243/"); + ws.onerror = (e) => promise.reject(e); + ws.onopen = () => { + ws.send(new Blob(["a".repeat(65000)])); + }; + ws.onmessage = (msg) => { + console.log(msg); + ws.close(); + }; + ws.onclose = () => { + promise.resolve(); + }; + await promise; +}); + // https://github.com/denoland/deno/pull/17762 // https://github.com/denoland/deno/issues/17761 Deno.test(async function websocketPingPong() { const promise = deferred(); const ws = new WebSocket("ws://localhost:4245/"); assertEquals(ws.url, "ws://localhost:4245/"); - ws.onerror = () => fail(); + ws.onerror = (e) => promise.reject(e); ws.onmessage = (e) => { ws.send(e.data); }; @@ -144,7 +197,9 @@ Deno.test({ const ws = new WebSocket(serveUrl); assertEquals(ws.url, serveUrl); ws.onerror = () => fail(); - ws.onmessage = () => ws.send("bye"); + ws.onmessage = (m: MessageEvent) => { + if (m.data == "Hello") ws.send("bye"); + }; ws.onclose = () => { promise.resolve(); }; diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index a144d01639..7dd7a9afee 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -18,7 +18,7 @@ bytes.workspace = true deno_core.workspace = true deno_net.workspace = true deno_tls.workspace = true -fastwebsockets = { workspace = true, features = ["upgrade"] } +fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] } http.workspace = true hyper = { workspace = true, features = ["backports"] } once_cell.workspace = true diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 83d553eebe..0f3456eef2 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -41,17 +41,21 @@ use std::rc::Rc; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::io::ReadHalf; +use tokio::io::WriteHalf; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::rustls::ServerName; use tokio_rustls::TlsConnector; use fastwebsockets::CloseCode; -use fastwebsockets::FragmentCollector; +use fastwebsockets::FragmentCollectorRead; use fastwebsockets::Frame; use fastwebsockets::OpCode; use fastwebsockets::Role; use fastwebsockets::WebSocket; +use fastwebsockets::WebSocketWrite; + mod stream; static USE_WRITEV: Lazy = Lazy::new(|| { @@ -332,12 +336,13 @@ pub struct ServerWebSocket { closed: Cell, buffer: Cell>>, string: Cell>, - ws: AsyncRefCell>, - tx_lock: AsyncRefCell<()>, + ws_read: AsyncRefCell>>, + ws_write: AsyncRefCell>>, } impl ServerWebSocket { fn new(ws: WebSocket) -> Self { + let (ws_read, ws_write) = ws.split(tokio::io::split); Self { buffered: Cell::new(0), error: Cell::new(None), @@ -345,8 +350,8 @@ impl ServerWebSocket { closed: Cell::new(false), buffer: Cell::new(None), string: Cell::new(None), - ws: AsyncRefCell::new(FragmentCollector::new(ws)), - tx_lock: AsyncRefCell::new(()), + ws_read: AsyncRefCell::new(FragmentCollectorRead::new(ws_read)), + ws_write: AsyncRefCell::new(ws_write), } } @@ -361,22 +366,22 @@ impl ServerWebSocket { } /// Reserve a lock, but don't wait on it. This gets us our place in line. - pub fn reserve_lock(self: &Rc) -> AsyncMutFuture<()> { - RcRef::map(self, |r| &r.tx_lock).borrow_mut() + fn reserve_lock( + self: &Rc, + ) -> AsyncMutFuture>> { + RcRef::map(self, |r| &r.ws_write).borrow_mut() } #[inline] - pub async fn write_frame( + async fn write_frame( self: &Rc, - lock: AsyncMutFuture<()>, + lock: AsyncMutFuture>>, frame: Frame<'_>, ) -> Result<(), AnyError> { - lock.await; - - // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket - // to populate the write buffer. We encounter an await point when writing - // to the socket after the frame has already been written to the buffer. - let ws = unsafe { &mut *self.ws.as_ptr() }; + let mut ws = lock.await; + if ws.is_closed() { + return Ok(()); + } ws.write_frame(frame) .await .map_err(|err| type_error(err.to_string()))?; @@ -405,6 +410,7 @@ pub fn ws_create_server_stream( ws.set_writev(*USE_WRITEV); ws.set_auto_close(true); ws.set_auto_pong(true); + let rid = state.resource_table.add(ServerWebSocket::new(ws)); Ok(rid) } @@ -627,9 +633,15 @@ pub async fn op_ws_next_event( return MessageKind::Error as u16; } - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; + let mut ws = RcRef::map(&resource, |r| &r.ws_read).borrow_mut().await; + let writer = RcRef::map(&resource, |r| &r.ws_write); + let mut sender = move |frame| { + let writer = writer.clone(); + async move { writer.borrow_mut().await.write_frame(frame).await } + }; loop { - let val = match ws.read_frame().await { + let res = ws.read_frame(&mut sender).await; + let val = match res { Ok(val) => val, Err(err) => { // No message was received, socket closed while we waited.