mirror of
https://github.com/denoland/deno.git
synced 2024-11-25 15:29:32 -05:00
feat(ext/websocket): split websocket read/write halves (#20579)
Fixes some UB when sending and receiving at the same time.
This commit is contained in:
parent
09204107d8
commit
b75f3b5ca0
6 changed files with 103 additions and 26 deletions
4
Cargo.lock
generated
4
Cargo.lock
generated
|
@ -2259,9 +2259,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
|
|||
|
||||
[[package]]
|
||||
name = "fastwebsockets"
|
||||
version = "0.4.4"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e6185b6dc9dddc4db0dedd2e213047e93bcbf7a0fb092abc4c4e4f3195efdb4"
|
||||
checksum = "17c35f166afb94b7f8e9449d0ad866daca111ba4053f3b1960bb480ca4382c63"
|
||||
dependencies = [
|
||||
"base64 0.21.4",
|
||||
"hyper 0.14.27",
|
||||
|
|
|
@ -87,7 +87,7 @@ data-encoding = "2.3.3"
|
|||
dlopen = "0.1.8"
|
||||
encoding_rs = "=0.8.33"
|
||||
ecb = "=0.1.2"
|
||||
fastwebsockets = "=0.4.4"
|
||||
fastwebsockets = "=0.5.0"
|
||||
filetime = "0.2.16"
|
||||
flate2 = { version = "1.0.26", features = ["zlib-ng"], default-features = false }
|
||||
fs3 = "0.5.0"
|
||||
|
|
|
@ -112,12 +112,22 @@ util::unit_test_factory!(
|
|||
fn js_unit_test(test: String) {
|
||||
let _g = util::http_server();
|
||||
|
||||
let mut deno = util::deno_cmd()
|
||||
let mut deno = util::deno_cmd();
|
||||
let deno = deno
|
||||
.current_dir(util::root_path())
|
||||
.arg("test")
|
||||
.arg("--unstable")
|
||||
.arg("--location=http://js-unit-tests/foo/bar")
|
||||
.arg("--no-prompt")
|
||||
.arg("--no-prompt");
|
||||
|
||||
// TODO(mmastrac): it would be better to just load a test CA for all tests
|
||||
let deno = if test == "websocket_test" {
|
||||
deno.arg("--unsafely-ignore-certificate-errors")
|
||||
} else {
|
||||
deno
|
||||
};
|
||||
|
||||
let mut deno = deno
|
||||
.arg("-A")
|
||||
.arg(util::tests_path().join("unit").join(format!("{test}.ts")))
|
||||
.stderr(Stdio::piped())
|
||||
|
|
|
@ -21,7 +21,7 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() {
|
|||
const promise = deferred();
|
||||
const ws = new WebSocket(new URL("ws://localhost:4242/"));
|
||||
assertEquals(ws.url, "ws://localhost:4242/");
|
||||
ws.onerror = () => fail();
|
||||
ws.onerror = (e) => promise.reject(e);
|
||||
ws.onopen = () => ws.close();
|
||||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
|
@ -29,13 +29,66 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() {
|
|||
await promise;
|
||||
});
|
||||
|
||||
Deno.test(async function websocketSendLargePacket() {
|
||||
const promise = deferred();
|
||||
const ws = new WebSocket(new URL("wss://localhost:4243/"));
|
||||
assertEquals(ws.url, "wss://localhost:4243/");
|
||||
ws.onerror = (e) => promise.reject(e);
|
||||
ws.onopen = () => {
|
||||
ws.send("a".repeat(65000));
|
||||
};
|
||||
ws.onmessage = () => {
|
||||
ws.close();
|
||||
};
|
||||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
};
|
||||
await promise;
|
||||
});
|
||||
|
||||
Deno.test(async function websocketSendLargeBinaryPacket() {
|
||||
const promise = deferred();
|
||||
const ws = new WebSocket(new URL("wss://localhost:4243/"));
|
||||
assertEquals(ws.url, "wss://localhost:4243/");
|
||||
ws.onerror = (e) => promise.reject(e);
|
||||
ws.onopen = () => {
|
||||
ws.send(new Uint8Array(65000));
|
||||
};
|
||||
ws.onmessage = (msg) => {
|
||||
console.log(msg);
|
||||
ws.close();
|
||||
};
|
||||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
};
|
||||
await promise;
|
||||
});
|
||||
|
||||
Deno.test(async function websocketSendLargeBlobPacket() {
|
||||
const promise = deferred();
|
||||
const ws = new WebSocket(new URL("wss://localhost:4243/"));
|
||||
assertEquals(ws.url, "wss://localhost:4243/");
|
||||
ws.onerror = (e) => promise.reject(e);
|
||||
ws.onopen = () => {
|
||||
ws.send(new Blob(["a".repeat(65000)]));
|
||||
};
|
||||
ws.onmessage = (msg) => {
|
||||
console.log(msg);
|
||||
ws.close();
|
||||
};
|
||||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
};
|
||||
await promise;
|
||||
});
|
||||
|
||||
// https://github.com/denoland/deno/pull/17762
|
||||
// https://github.com/denoland/deno/issues/17761
|
||||
Deno.test(async function websocketPingPong() {
|
||||
const promise = deferred();
|
||||
const ws = new WebSocket("ws://localhost:4245/");
|
||||
assertEquals(ws.url, "ws://localhost:4245/");
|
||||
ws.onerror = () => fail();
|
||||
ws.onerror = (e) => promise.reject(e);
|
||||
ws.onmessage = (e) => {
|
||||
ws.send(e.data);
|
||||
};
|
||||
|
@ -144,7 +197,9 @@ Deno.test({
|
|||
const ws = new WebSocket(serveUrl);
|
||||
assertEquals(ws.url, serveUrl);
|
||||
ws.onerror = () => fail();
|
||||
ws.onmessage = () => ws.send("bye");
|
||||
ws.onmessage = (m: MessageEvent) => {
|
||||
if (m.data == "Hello") ws.send("bye");
|
||||
};
|
||||
ws.onclose = () => {
|
||||
promise.resolve();
|
||||
};
|
||||
|
|
|
@ -18,7 +18,7 @@ bytes.workspace = true
|
|||
deno_core.workspace = true
|
||||
deno_net.workspace = true
|
||||
deno_tls.workspace = true
|
||||
fastwebsockets = { workspace = true, features = ["upgrade"] }
|
||||
fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] }
|
||||
http.workspace = true
|
||||
hyper = { workspace = true, features = ["backports"] }
|
||||
once_cell.workspace = true
|
||||
|
|
|
@ -41,17 +41,21 @@ use std::rc::Rc;
|
|||
use std::sync::Arc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::ReadHalf;
|
||||
use tokio::io::WriteHalf;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_rustls::rustls::RootCertStore;
|
||||
use tokio_rustls::rustls::ServerName;
|
||||
use tokio_rustls::TlsConnector;
|
||||
|
||||
use fastwebsockets::CloseCode;
|
||||
use fastwebsockets::FragmentCollector;
|
||||
use fastwebsockets::FragmentCollectorRead;
|
||||
use fastwebsockets::Frame;
|
||||
use fastwebsockets::OpCode;
|
||||
use fastwebsockets::Role;
|
||||
use fastwebsockets::WebSocket;
|
||||
use fastwebsockets::WebSocketWrite;
|
||||
|
||||
mod stream;
|
||||
|
||||
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
|
||||
|
@ -332,12 +336,13 @@ pub struct ServerWebSocket {
|
|||
closed: Cell<bool>,
|
||||
buffer: Cell<Option<Vec<u8>>>,
|
||||
string: Cell<Option<String>>,
|
||||
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
|
||||
tx_lock: AsyncRefCell<()>,
|
||||
ws_read: AsyncRefCell<FragmentCollectorRead<ReadHalf<WebSocketStream>>>,
|
||||
ws_write: AsyncRefCell<WebSocketWrite<WriteHalf<WebSocketStream>>>,
|
||||
}
|
||||
|
||||
impl ServerWebSocket {
|
||||
fn new(ws: WebSocket<WebSocketStream>) -> Self {
|
||||
let (ws_read, ws_write) = ws.split(tokio::io::split);
|
||||
Self {
|
||||
buffered: Cell::new(0),
|
||||
error: Cell::new(None),
|
||||
|
@ -345,8 +350,8 @@ impl ServerWebSocket {
|
|||
closed: Cell::new(false),
|
||||
buffer: Cell::new(None),
|
||||
string: Cell::new(None),
|
||||
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
||||
tx_lock: AsyncRefCell::new(()),
|
||||
ws_read: AsyncRefCell::new(FragmentCollectorRead::new(ws_read)),
|
||||
ws_write: AsyncRefCell::new(ws_write),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -361,22 +366,22 @@ impl ServerWebSocket {
|
|||
}
|
||||
|
||||
/// Reserve a lock, but don't wait on it. This gets us our place in line.
|
||||
pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> {
|
||||
RcRef::map(self, |r| &r.tx_lock).borrow_mut()
|
||||
fn reserve_lock(
|
||||
self: &Rc<Self>,
|
||||
) -> AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>> {
|
||||
RcRef::map(self, |r| &r.ws_write).borrow_mut()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn write_frame(
|
||||
async fn write_frame(
|
||||
self: &Rc<Self>,
|
||||
lock: AsyncMutFuture<()>,
|
||||
lock: AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>>,
|
||||
frame: Frame<'_>,
|
||||
) -> Result<(), AnyError> {
|
||||
lock.await;
|
||||
|
||||
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
|
||||
// to populate the write buffer. We encounter an await point when writing
|
||||
// to the socket after the frame has already been written to the buffer.
|
||||
let ws = unsafe { &mut *self.ws.as_ptr() };
|
||||
let mut ws = lock.await;
|
||||
if ws.is_closed() {
|
||||
return Ok(());
|
||||
}
|
||||
ws.write_frame(frame)
|
||||
.await
|
||||
.map_err(|err| type_error(err.to_string()))?;
|
||||
|
@ -405,6 +410,7 @@ pub fn ws_create_server_stream(
|
|||
ws.set_writev(*USE_WRITEV);
|
||||
ws.set_auto_close(true);
|
||||
ws.set_auto_pong(true);
|
||||
|
||||
let rid = state.resource_table.add(ServerWebSocket::new(ws));
|
||||
Ok(rid)
|
||||
}
|
||||
|
@ -627,9 +633,15 @@ pub async fn op_ws_next_event(
|
|||
return MessageKind::Error as u16;
|
||||
}
|
||||
|
||||
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||
let mut ws = RcRef::map(&resource, |r| &r.ws_read).borrow_mut().await;
|
||||
let writer = RcRef::map(&resource, |r| &r.ws_write);
|
||||
let mut sender = move |frame| {
|
||||
let writer = writer.clone();
|
||||
async move { writer.borrow_mut().await.write_frame(frame).await }
|
||||
};
|
||||
loop {
|
||||
let val = match ws.read_frame().await {
|
||||
let res = ws.read_frame(&mut sender).await;
|
||||
let val = match res {
|
||||
Ok(val) => val,
|
||||
Err(err) => {
|
||||
// No message was received, socket closed while we waited.
|
||||
|
|
Loading…
Reference in a new issue