diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index 16384da400..ac33f9d692 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -100,7 +100,7 @@ Deno.test( promise.resolve(); }; - await Promise.all([promise, server]); + await Promise.all([promise, server.finished]); ws.close(); }, ); @@ -145,7 +145,57 @@ Deno.test({ ws.onclose = () => { promise.resolve(); }; - await Promise.all([promise, server]); + await Promise.all([promise, server.finished]); +}); + +// https://github.com/denoland/deno/issues/19483 +Deno.test({ + sanitizeOps: false, + sanitizeResources: false, +}, async function websocketCloseFlushes() { + const promise = deferred(); + + const ac = new AbortController(); + const listeningPromise = deferred(); + + const server = Deno.serve({ + handler: (req) => { + const { response, socket } = Deno.upgradeWebSocket(req); + socket.onopen = () => socket.send("Hello"); + socket.onmessage = () => { + socket.send("Bye"); + socket.close(); + }; + socket.onclose = () => ac.abort(); + socket.onerror = () => fail(); + return response; + }, + signal: ac.signal, + onListen: () => listeningPromise.resolve(), + hostname: "localhost", + port: 4247, + }); + + await listeningPromise; + + const ws = new WebSocket("ws://localhost:4247/"); + assertEquals(ws.url, "ws://localhost:4247/"); + let seenBye = false; + ws.onerror = () => fail(); + ws.onmessage = ({ data }) => { + if (data == "Hello") { + ws.send("Hello!"); + } else { + assertEquals(data, "Bye"); + seenBye = true; + } + }; + ws.onclose = () => { + promise.resolve(); + }; + await Promise.all([promise, server.finished]); + + assert(seenBye); }); Deno.test( diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index bcbec2c5ed..cbf9f8ff17 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -6,6 +6,7 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; use deno_core::url; +use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; @@ -354,12 +355,19 @@ impl ServerWebSocket { } } + /// Reserve a lock, but don't wait on it. This gets us our place in line. + pub fn reserve_lock(self: &Rc) -> AsyncMutFuture<()> { + RcRef::map(self, |r| &r.tx_lock).borrow_mut() + } + #[inline] pub async fn write_frame( self: &Rc, + lock: AsyncMutFuture<()>, frame: Frame, ) -> Result<(), AnyError> { - let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await; + lock.await; + // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket // to populate the write buffer. We encounter an await point when writing // to the socket after the frame has already been written to the buffer. @@ -407,9 +415,10 @@ pub fn op_ws_send_binary( let data = data.to_vec(); let len = data.len(); resource.buffered.set(resource.buffered.get() + len); + let lock = resource.reserve_lock(); deno_core::task::spawn(async move { if let Err(err) = resource - .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .write_frame(lock, Frame::new(true, OpCode::Binary, None, data)) .await { resource.set_error(Some(err.to_string())); @@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) { let resource = state.resource_table.get::(rid).unwrap(); let len = data.len(); resource.buffered.set(resource.buffered.get() + len); + let lock = resource.reserve_lock(); deno_core::task::spawn(async move { if let Err(err) = resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .write_frame( + lock, + Frame::new(true, OpCode::Text, None, data.into_bytes()), + ) .await { resource.set_error(Some(err.to_string())); @@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async( .resource_table .get::(rid)?; let data = data.to_vec(); + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .write_frame(lock, Frame::new(true, OpCode::Binary, None, data)) .await } @@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async( .borrow_mut() .resource_table .get::(rid)?; + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .write_frame( + lock, + Frame::new(true, OpCode::Text, None, data.into_bytes()), + ) .await } @@ -488,7 +506,8 @@ pub async fn op_ws_send_pong( .borrow_mut() .resource_table .get::(rid)?; - resource.write_frame(Frame::pong(vec![])).await + let lock = resource.reserve_lock(); + resource.write_frame(lock, Frame::pong(vec![])).await } #[op] @@ -500,8 +519,9 @@ pub async fn op_ws_send_ping( .borrow_mut() .resource_table .get::(rid)?; + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Ping, None, vec![])) + .write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![])) .await } @@ -521,7 +541,8 @@ pub async fn op_ws_close( .unwrap_or_else(|| Frame::close_raw(vec![])); resource.closed.set(true); - resource.write_frame(frame).await?; + let lock = resource.reserve_lock(); + resource.write_frame(lock, frame).await?; Ok(()) }