mirror of
https://github.com/denoland/deno.git
synced 2024-11-28 16:20:57 -05:00
fix(ext/websocket): Avoid write deadlock that requires read_frame to complete (#18705)
Fixes https://github.com/denoland/deno/issues/18700 Timeline of the events that lead to the bug. 1. WebSocket handshake complete 2. Server on `read_frame` holding an AsyncRefCell borrow of the WebSocket stream. 3. Client sends a TXT frame after a some time 4. Server recieves the frame and goes back to `read_frame`. 5. After some time, Server starts a `write_frame` but `read_frame` is still holding a borrow! ^--- Locked. read_frame needs to complete so we can resume the write. This commit changes all writes to directly borrow the `fastwebsocket::WebSocket` resource under the assumption that it won't affect ongoing reads.
This commit is contained in:
parent
ed3b5db1b8
commit
001a9a5856
2 changed files with 69 additions and 20 deletions
|
@ -36,3 +36,49 @@ Deno.test(async function websocketPingPong() {
|
||||||
await promise;
|
await promise;
|
||||||
ws.close();
|
ws.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// https://github.com/denoland/deno/issues/18700
|
||||||
|
Deno.test(
|
||||||
|
{ sanitizeOps: false, sanitizeResources: false },
|
||||||
|
async function websocketWriteLock() {
|
||||||
|
const ac = new AbortController();
|
||||||
|
const listeningPromise = deferred();
|
||||||
|
|
||||||
|
const server = Deno.serve({
|
||||||
|
handler: (req) => {
|
||||||
|
const { socket, response } = Deno.upgradeWebSocket(req);
|
||||||
|
socket.onopen = function () {
|
||||||
|
setTimeout(() => socket.send("Hello"), 500);
|
||||||
|
};
|
||||||
|
socket.onmessage = function (e) {
|
||||||
|
assertEquals(e.data, "Hello");
|
||||||
|
ac.abort();
|
||||||
|
};
|
||||||
|
return response;
|
||||||
|
},
|
||||||
|
signal: ac.signal,
|
||||||
|
onListen: () => listeningPromise.resolve(),
|
||||||
|
hostname: "localhost",
|
||||||
|
port: 4246,
|
||||||
|
});
|
||||||
|
|
||||||
|
await listeningPromise;
|
||||||
|
const promise = deferred();
|
||||||
|
const ws = new WebSocket("ws://localhost:4246/");
|
||||||
|
assertEquals(ws.url, "ws://localhost:4246/");
|
||||||
|
ws.onerror = () => fail();
|
||||||
|
ws.onmessage = (e) => {
|
||||||
|
assertEquals(e.data, "Hello");
|
||||||
|
setTimeout(() => {
|
||||||
|
ws.send(e.data);
|
||||||
|
}, 1000);
|
||||||
|
promise.resolve();
|
||||||
|
};
|
||||||
|
ws.onclose = () => {
|
||||||
|
promise.resolve();
|
||||||
|
};
|
||||||
|
|
||||||
|
await Promise.all([promise, server]);
|
||||||
|
ws.close();
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
|
@ -28,6 +28,23 @@ pub struct ServerWebSocket {
|
||||||
ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
|
ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ServerWebSocket {
|
||||||
|
#[inline]
|
||||||
|
pub async fn write_frame(
|
||||||
|
self: Rc<Self>,
|
||||||
|
frame: Frame,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
||||||
|
// to populate the write buffer. We encounter an await point when writing
|
||||||
|
// to the socket after the frame has already been written to the buffer.
|
||||||
|
let ws = unsafe { &mut *self.ws.as_ptr() };
|
||||||
|
ws.write_frame(frame)
|
||||||
|
.await
|
||||||
|
.map_err(|err| type_error(err.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Resource for ServerWebSocket {
|
impl Resource for ServerWebSocket {
|
||||||
fn name(&self) -> Cow<str> {
|
fn name(&self) -> Cow<str> {
|
||||||
"serverWebSocket".into()
|
"serverWebSocket".into()
|
||||||
|
@ -61,12 +78,9 @@ pub async fn op_server_ws_send_binary(
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<ServerWebSocket>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
resource
|
||||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
||||||
ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
|
||||||
.await
|
.await
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -79,11 +93,9 @@ pub async fn op_server_ws_send_text(
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<ServerWebSocket>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
resource
|
||||||
ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||||
.await
|
.await
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
|
@ -107,12 +119,7 @@ pub async fn op_server_ws_send(
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<ServerWebSocket>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
resource.write_frame(msg).await
|
||||||
|
|
||||||
ws.write_frame(msg)
|
|
||||||
.await
|
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(deferred)]
|
#[op(deferred)]
|
||||||
|
@ -126,14 +133,10 @@ pub async fn op_server_ws_close(
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.resource_table
|
.resource_table
|
||||||
.get::<ServerWebSocket>(rid)?;
|
.get::<ServerWebSocket>(rid)?;
|
||||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
|
||||||
let frame = reason
|
let frame = reason
|
||||||
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
||||||
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
||||||
ws.write_frame(frame)
|
resource.write_frame(frame).await
|
||||||
.await
|
|
||||||
.map_err(|err| type_error(err.to_string()))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op(deferred)]
|
#[op(deferred)]
|
||||||
|
|
Loading…
Reference in a new issue