diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index 6737edc11f..948e2add23 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -36,3 +36,49 @@ Deno.test(async function websocketPingPong() { await promise; ws.close(); }); + +// https://github.com/denoland/deno/issues/18700 +Deno.test( + { sanitizeOps: false, sanitizeResources: false }, + async function websocketWriteLock() { + const ac = new AbortController(); + const listeningPromise = deferred(); + + const server = Deno.serve({ + handler: (req) => { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onopen = function () { + setTimeout(() => socket.send("Hello"), 500); + }; + socket.onmessage = function (e) { + assertEquals(e.data, "Hello"); + ac.abort(); + }; + return response; + }, + signal: ac.signal, + onListen: () => listeningPromise.resolve(), + hostname: "localhost", + port: 4246, + }); + + await listeningPromise; + const promise = deferred(); + const ws = new WebSocket("ws://localhost:4246/"); + assertEquals(ws.url, "ws://localhost:4246/"); + ws.onerror = () => fail(); + ws.onmessage = (e) => { + assertEquals(e.data, "Hello"); + setTimeout(() => { + ws.send(e.data); + }, 1000); + promise.resolve(); + }; + ws.onclose = () => { + promise.resolve(); + }; + + await Promise.all([promise, server]); + ws.close(); + }, +); diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs index eb8737b192..44bc07e59b 100644 --- a/ext/websocket/server.rs +++ b/ext/websocket/server.rs @@ -28,6 +28,23 @@ pub struct ServerWebSocket { ws: AsyncRefCell>>>, } +impl ServerWebSocket { + #[inline] + pub async fn write_frame( + self: Rc, + frame: Frame, + ) -> Result<(), AnyError> { + // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket + // to populate the write buffer. We encounter an await point when writing + // to the socket after the frame has already been written to the buffer. + let ws = unsafe { &mut *self.ws.as_ptr() }; + ws.write_frame(frame) + .await + .map_err(|err| type_error(err.to_string()))?; + Ok(()) + } +} + impl Resource for ServerWebSocket { fn name(&self) -> Cow { "serverWebSocket".into() @@ -61,12 +78,9 @@ pub async fn op_server_ws_send_binary( .borrow_mut() .resource_table .get::(rid)?; - - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; - ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) + resource + .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) .await - .map_err(|err| type_error(err.to_string()))?; - Ok(()) } #[op] @@ -79,11 +93,9 @@ pub async fn op_server_ws_send_text( .borrow_mut() .resource_table .get::(rid)?; - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; - ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + resource + .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) .await - .map_err(|err| type_error(err.to_string()))?; - Ok(()) } #[op] @@ -107,12 +119,7 @@ pub async fn op_server_ws_send( .borrow_mut() .resource_table .get::(rid)?; - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; - - ws.write_frame(msg) - .await - .map_err(|err| type_error(err.to_string()))?; - Ok(()) + resource.write_frame(msg).await } #[op(deferred)] @@ -126,14 +133,10 @@ pub async fn op_server_ws_close( .borrow_mut() .resource_table .get::(rid)?; - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; let frame = reason .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) .unwrap_or_else(|| Frame::close_raw(vec![])); - ws.write_frame(frame) - .await - .map_err(|err| type_error(err.to_string()))?; - Ok(()) + resource.write_frame(frame).await } #[op(deferred)]