1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-13 17:39:18 -05:00

fix(ext/websockets): ensure we fully send frames before close (#19484)

Fixes #19483
This commit is contained in:
Matt Mastracci 2023-06-13 11:16:17 -06:00 committed by GitHub
parent ceb03cfb03
commit 72da18dd47
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 10 deletions

View file

@ -100,7 +100,7 @@ Deno.test(
promise.resolve();
};
await Promise.all([promise, server]);
await Promise.all([promise, server.finished]);
ws.close();
},
);
@ -145,7 +145,57 @@ Deno.test({
ws.onclose = () => {
promise.resolve();
};
await Promise.all([promise, server]);
await Promise.all([promise, server.finished]);
});
// https://github.com/denoland/deno/issues/19483
Deno.test({
sanitizeOps: false,
sanitizeResources: false,
}, async function websocketCloseFlushes() {
const promise = deferred();
const ac = new AbortController();
const listeningPromise = deferred();
const server = Deno.serve({
handler: (req) => {
const { response, socket } = Deno.upgradeWebSocket(req);
socket.onopen = () => socket.send("Hello");
socket.onmessage = () => {
socket.send("Bye");
socket.close();
};
socket.onclose = () => ac.abort();
socket.onerror = () => fail();
return response;
},
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
port: 4247,
});
await listeningPromise;
const ws = new WebSocket("ws://localhost:4247/");
assertEquals(ws.url, "ws://localhost:4247/");
let seenBye = false;
ws.onerror = () => fail();
ws.onmessage = ({ data }) => {
if (data == "Hello") {
ws.send("Hello!");
} else {
assertEquals(data, "Bye");
seenBye = true;
}
};
ws.onclose = () => {
promise.resolve();
};
await Promise.all([promise, server.finished]);
assert(seenBye);
});
Deno.test(

View file

@ -6,6 +6,7 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::url;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
@ -354,12 +355,19 @@ impl ServerWebSocket {
}
}
/// Reserve a lock, but don't wait on it. This gets us our place in line.
pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> {
RcRef::map(self, |r| &r.tx_lock).borrow_mut()
}
#[inline]
pub async fn write_frame(
self: &Rc<Self>,
lock: AsyncMutFuture<()>,
frame: Frame,
) -> Result<(), AnyError> {
let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
lock.await;
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
// to populate the write buffer. We encounter an await point when writing
// to the socket after the frame has already been written to the buffer.
@ -407,9 +415,10 @@ pub fn op_ws_send_binary(
let data = data.to_vec();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
let lock = resource.reserve_lock();
deno_core::task::spawn(async move {
if let Err(err) = resource
.write_frame(Frame::new(true, OpCode::Binary, None, data))
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
.await
{
resource.set_error(Some(err.to_string()));
@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
let lock = resource.reserve_lock();
deno_core::task::spawn(async move {
if let Err(err) = resource
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
.write_frame(
lock,
Frame::new(true, OpCode::Text, None, data.into_bytes()),
)
.await
{
resource.set_error(Some(err.to_string()));
@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async(
.resource_table
.get::<ServerWebSocket>(rid)?;
let data = data.to_vec();
let lock = resource.reserve_lock();
resource
.write_frame(Frame::new(true, OpCode::Binary, None, data))
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
.await
}
@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
let lock = resource.reserve_lock();
resource
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
.write_frame(
lock,
Frame::new(true, OpCode::Text, None, data.into_bytes()),
)
.await
}
@ -488,7 +506,8 @@ pub async fn op_ws_send_pong(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
resource.write_frame(Frame::pong(vec![])).await
let lock = resource.reserve_lock();
resource.write_frame(lock, Frame::pong(vec![])).await
}
#[op]
@ -500,8 +519,9 @@ pub async fn op_ws_send_ping(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
let lock = resource.reserve_lock();
resource
.write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
.write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![]))
.await
}
@ -521,7 +541,8 @@ pub async fn op_ws_close(
.unwrap_or_else(|| Frame::close_raw(vec![]));
resource.closed.set(true);
resource.write_frame(frame).await?;
let lock = resource.reserve_lock();
resource.write_frame(lock, frame).await?;
Ok(())
}