mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 16:42:21 -05:00
fix(ext/websockets): ensure we fully send frames before close (#19484)
Fixes #19483
This commit is contained in:
parent
cd61c01545
commit
d158a0bf99
2 changed files with 81 additions and 10 deletions
|
@ -100,7 +100,7 @@ Deno.test(
|
|||
promise.resolve();
|
||||
};
|
||||
|
||||
await Promise.all([promise, server]);
|
||||
await Promise.all([promise, server.finished]);
|
||||
ws.close();
|
||||
},
|
||||
);
|
||||
|
@ -145,7 +145,57 @@ Deno.test({
|
|||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
};
|
||||
await Promise.all([promise, server]);
|
||||
await Promise.all([promise, server.finished]);
|
||||
});
|
||||
|
||||
// https://github.com/denoland/deno/issues/19483
|
||||
Deno.test({
|
||||
sanitizeOps: false,
|
||||
sanitizeResources: false,
|
||||
}, async function websocketCloseFlushes() {
|
||||
const promise = deferred();
|
||||
|
||||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
|
||||
const server = Deno.serve({
|
||||
handler: (req) => {
|
||||
const { response, socket } = Deno.upgradeWebSocket(req);
|
||||
socket.onopen = () => socket.send("Hello");
|
||||
socket.onmessage = () => {
|
||||
socket.send("Bye");
|
||||
socket.close();
|
||||
};
|
||||
socket.onclose = () => ac.abort();
|
||||
socket.onerror = () => fail();
|
||||
return response;
|
||||
},
|
||||
signal: ac.signal,
|
||||
onListen: () => listeningPromise.resolve(),
|
||||
hostname: "localhost",
|
||||
port: 4247,
|
||||
});
|
||||
|
||||
await listeningPromise;
|
||||
|
||||
const ws = new WebSocket("ws://localhost:4247/");
|
||||
assertEquals(ws.url, "ws://localhost:4247/");
|
||||
let seenBye = false;
|
||||
ws.onerror = () => fail();
|
||||
ws.onmessage = ({ data }) => {
|
||||
if (data == "Hello") {
|
||||
ws.send("Hello!");
|
||||
} else {
|
||||
assertEquals(data, "Bye");
|
||||
seenBye = true;
|
||||
}
|
||||
};
|
||||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
};
|
||||
await Promise.all([promise, server.finished]);
|
||||
|
||||
assert(seenBye);
|
||||
});
|
||||
|
||||
Deno.test(
|
||||
|
|
|
@ -6,6 +6,7 @@ use deno_core::error::type_error;
|
|||
use deno_core::error::AnyError;
|
||||
use deno_core::op;
|
||||
use deno_core::url;
|
||||
use deno_core::AsyncMutFuture;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::ByteString;
|
||||
use deno_core::CancelFuture;
|
||||
|
@ -354,12 +355,19 @@ impl ServerWebSocket {
|
|||
}
|
||||
}
|
||||
|
||||
/// Reserve a lock, but don't wait on it. This gets us our place in line.
|
||||
pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> {
|
||||
RcRef::map(self, |r| &r.tx_lock).borrow_mut()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn write_frame(
|
||||
self: &Rc<Self>,
|
||||
lock: AsyncMutFuture<()>,
|
||||
frame: Frame,
|
||||
) -> Result<(), AnyError> {
|
||||
let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
|
||||
lock.await;
|
||||
|
||||
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
||||
// to populate the write buffer. We encounter an await point when writing
|
||||
// to the socket after the frame has already been written to the buffer.
|
||||
|
@ -407,9 +415,10 @@ pub fn op_ws_send_binary(
|
|||
let data = data.to_vec();
|
||||
let len = data.len();
|
||||
resource.buffered.set(resource.buffered.get() + len);
|
||||
let lock = resource.reserve_lock();
|
||||
deno_core::task::spawn(async move {
|
||||
if let Err(err) = resource
|
||||
.write_frame(Frame::new(true, OpCode::Binary, None, data))
|
||||
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
|
||||
.await
|
||||
{
|
||||
resource.set_error(Some(err.to_string()));
|
||||
|
@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
|
|||
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
||||
let len = data.len();
|
||||
resource.buffered.set(resource.buffered.get() + len);
|
||||
let lock = resource.reserve_lock();
|
||||
deno_core::task::spawn(async move {
|
||||
if let Err(err) = resource
|
||||
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||
.write_frame(
|
||||
lock,
|
||||
Frame::new(true, OpCode::Text, None, data.into_bytes()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
resource.set_error(Some(err.to_string()));
|
||||
|
@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async(
|
|||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)?;
|
||||
let data = data.to_vec();
|
||||
let lock = resource.reserve_lock();
|
||||
resource
|
||||
.write_frame(Frame::new(true, OpCode::Binary, None, data))
|
||||
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
|
||||
.await
|
||||
}
|
||||
|
||||
|
@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async(
|
|||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)?;
|
||||
let lock = resource.reserve_lock();
|
||||
resource
|
||||
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||
.write_frame(
|
||||
lock,
|
||||
Frame::new(true, OpCode::Text, None, data.into_bytes()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
|
@ -488,7 +506,8 @@ pub async fn op_ws_send_pong(
|
|||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)?;
|
||||
resource.write_frame(Frame::pong(vec![])).await
|
||||
let lock = resource.reserve_lock();
|
||||
resource.write_frame(lock, Frame::pong(vec![])).await
|
||||
}
|
||||
|
||||
#[op]
|
||||
|
@ -500,8 +519,9 @@ pub async fn op_ws_send_ping(
|
|||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<ServerWebSocket>(rid)?;
|
||||
let lock = resource.reserve_lock();
|
||||
resource
|
||||
.write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
|
||||
.write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![]))
|
||||
.await
|
||||
}
|
||||
|
||||
|
@ -521,7 +541,8 @@ pub async fn op_ws_close(
|
|||
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
||||
|
||||
resource.closed.set(true);
|
||||
resource.write_frame(frame).await?;
|
||||
let lock = resource.reserve_lock();
|
||||
resource.write_frame(lock, frame).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue