1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-24 08:09:08 -05:00

perf(ext/websocket): Reduce GC pressure & monomorpize op_ws_next_event (#19405)

Reduce the GC pressure from the websocket event method by splitting it
into an event getter and a buffer getter.

Before:
165.9k msg/sec

After:
169.9k msg/sec
This commit is contained in:
Matt Mastracci 2023-06-08 09:32:08 -06:00 committed by GitHub
parent f35161d3c5
commit 976c381045
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 78 deletions

View file

@ -150,7 +150,7 @@ Deno.test({
Deno.test( Deno.test(
{ sanitizeOps: false }, { sanitizeOps: false },
function websocketConstructorWithPrototypePollusion() { function websocketConstructorWithPrototypePollution() {
const originalSymbolIterator = Array.prototype[Symbol.iterator]; const originalSymbolIterator = Array.prototype[Symbol.iterator];
try { try {
Array.prototype[Symbol.iterator] = () => { Array.prototype[Symbol.iterator] = () => {

View file

@ -53,6 +53,9 @@ const {
op_ws_send_binary, op_ws_send_binary,
op_ws_send_text, op_ws_send_text,
op_ws_next_event, op_ws_next_event,
op_ws_get_buffer,
op_ws_get_buffer_as_string,
op_ws_get_error,
op_ws_send_ping, op_ws_send_ping,
op_ws_get_buffered_amount, op_ws_get_buffered_amount,
} = core.ensureFastOps(); } = core.ensureFastOps();
@ -407,15 +410,16 @@ class WebSocket extends EventTarget {
} }
async [_eventLoop]() { async [_eventLoop]() {
const rid = this[_rid];
while (this[_readyState] !== CLOSED) { while (this[_readyState] !== CLOSED) {
const { 0: kind, 1: value } = await op_ws_next_event(this[_rid]); const kind = await op_ws_next_event(rid);
switch (kind) { switch (kind) {
case 0: { case 0: {
/* string */ /* string */
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", { const event = new MessageEvent("message", {
data: value, data: op_ws_get_buffer_as_string(rid),
origin: this[_url], origin: this[_url],
}); });
dispatch(this, event); dispatch(this, event);
@ -424,12 +428,13 @@ class WebSocket extends EventTarget {
case 1: { case 1: {
/* binary */ /* binary */
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
// deno-lint-ignore prefer-primordials
const buffer = op_ws_get_buffer(rid).buffer;
let data; let data;
if (this.binaryType === "blob") { if (this.binaryType === "blob") {
data = new Blob([value]); data = new Blob([buffer]);
} else { } else {
data = value; data = buffer;
} }
const event = new MessageEvent("message", { const event = new MessageEvent("message", {
@ -450,13 +455,13 @@ class WebSocket extends EventTarget {
this[_readyState] = CLOSED; this[_readyState] = CLOSED;
const errorEv = new ErrorEvent("error", { const errorEv = new ErrorEvent("error", {
message: value, message: op_ws_get_error(rid),
}); });
this.dispatchEvent(errorEv); this.dispatchEvent(errorEv);
const closeEv = new CloseEvent("close"); const closeEv = new CloseEvent("close");
this.dispatchEvent(closeEv); this.dispatchEvent(closeEv);
core.tryClose(this[_rid]); core.tryClose(rid);
break; break;
} }
default: { default: {
@ -469,9 +474,9 @@ class WebSocket extends EventTarget {
if (prevState === OPEN) { if (prevState === OPEN) {
try { try {
await op_ws_close( await op_ws_close(
this[_rid], rid,
code, code,
value, op_ws_get_error(rid),
); );
} catch { } catch {
// ignore failures // ignore failures
@ -481,10 +486,10 @@ class WebSocket extends EventTarget {
const event = new CloseEvent("close", { const event = new CloseEvent("close", {
wasClean: true, wasClean: true,
code: code, code: code,
reason: value, reason: op_ws_get_error(rid),
}); });
this.dispatchEvent(event); this.dispatchEvent(event);
core.tryClose(this[_rid]); core.tryClose(rid);
break; break;
} }
} }

View file

@ -37,6 +37,9 @@ const {
op_ws_send_text_async, op_ws_send_text_async,
op_ws_send_binary_async, op_ws_send_binary_async,
op_ws_next_event, op_ws_next_event,
op_ws_get_buffer,
op_ws_get_buffer_as_string,
op_ws_get_error,
op_ws_create, op_ws_create,
op_ws_close, op_ws_close,
} = core.ensureFastOps(); } = core.ensureFastOps();
@ -177,7 +180,7 @@ class WebSocketStream {
PromisePrototypeThen( PromisePrototypeThen(
(async () => { (async () => {
while (true) { while (true) {
const { 0: kind } = await op_ws_next_event(create.rid); const kind = await op_ws_next_event(create.rid);
if (kind > 5) { if (kind > 5) {
/* close */ /* close */
@ -239,14 +242,16 @@ class WebSocketStream {
}, },
}); });
const pull = async (controller) => { const pull = async (controller) => {
const { 0: kind, 1: value } = await op_ws_next_event(this[_rid]); const kind = await op_ws_next_event(this[_rid]);
switch (kind) { switch (kind) {
case 0: case 0:
case 1: {
/* string */ /* string */
controller.enqueue(op_ws_get_buffer_as_string(this[_rid]));
break;
case 1: {
/* binary */ /* binary */
controller.enqueue(value); controller.enqueue(op_ws_get_buffer(this[_rid]));
break; break;
} }
case 2: { case 2: {
@ -255,7 +260,7 @@ class WebSocketStream {
} }
case 3: { case 3: {
/* error */ /* error */
const err = new Error(value); const err = new Error(op_ws_get_error(this[_rid]));
this[_closed].reject(err); this[_closed].reject(err);
controller.error(err); controller.error(err);
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
@ -271,7 +276,7 @@ class WebSocketStream {
/* close */ /* close */
this[_closed].resolve({ this[_closed].resolve({
code: kind, code: kind,
reason: value, reason: op_ws_get_error(this[_rid]),
}); });
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
break; break;
@ -289,7 +294,7 @@ class WebSocketStream {
return pull(controller); return pull(controller);
} }
this[_closed].resolve(value); this[_closed].resolve(op_ws_get_error(this[_rid]));
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
} }
}; };

View file

@ -14,7 +14,6 @@ use deno_core::OpState;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::Resource; use deno_core::Resource;
use deno_core::ResourceId; use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf; use deno_core::ZeroCopyBuf;
use deno_net::raw::NetworkStream; use deno_net::raw::NetworkStream;
use deno_tls::create_client_config; use deno_tls::create_client_config;
@ -290,15 +289,8 @@ where
state.borrow_mut().resource_table.close(cancel_rid).ok(); state.borrow_mut().resource_table.close(cancel_rid).ok();
} }
let resource = ServerWebSocket {
buffered: Cell::new(0),
errored: Cell::new(None),
ws: AsyncRefCell::new(FragmentCollector::new(stream)),
closed: Cell::new(false),
tx_lock: AsyncRefCell::new(()),
};
let mut state = state.borrow_mut(); let mut state = state.borrow_mut();
let rid = state.resource_table.add(resource); let rid = state.resource_table.add(ServerWebSocket::new(stream));
let protocol = match response.headers().get("Sec-WebSocket-Protocol") { let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
Some(header) => header.to_str().unwrap(), Some(header) => header.to_str().unwrap(),
@ -323,18 +315,43 @@ pub enum MessageKind {
Binary = 1, Binary = 1,
Pong = 2, Pong = 2,
Error = 3, Error = 3,
Closed = 4, ClosedDefault = 1005,
} }
/// To avoid locks, we keep as much as we can inside of [`Cell`]s.
pub struct ServerWebSocket { pub struct ServerWebSocket {
buffered: Cell<usize>, buffered: Cell<usize>,
errored: Cell<Option<AnyError>>, error: Cell<Option<String>>,
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>, errored: Cell<bool>,
closed: Cell<bool>, closed: Cell<bool>,
buffer: Cell<Option<Vec<u8>>>,
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
tx_lock: AsyncRefCell<()>, tx_lock: AsyncRefCell<()>,
} }
impl ServerWebSocket { impl ServerWebSocket {
fn new(ws: WebSocket<WebSocketStream>) -> Self {
Self {
buffered: Cell::new(0),
error: Cell::new(None),
errored: Cell::new(false),
closed: Cell::new(false),
buffer: Cell::new(None),
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
tx_lock: AsyncRefCell::new(()),
}
}
fn set_error(&self, error: Option<String>) {
if let Some(error) = error {
self.error.set(Some(error));
self.errored.set(true);
} else {
self.error.set(None);
self.errored.set(false);
}
}
#[inline] #[inline]
pub async fn write_frame( pub async fn write_frame(
self: &Rc<Self>, self: &Rc<Self>,
@ -374,15 +391,7 @@ pub fn ws_create_server_stream(
ws.set_auto_close(true); ws.set_auto_close(true);
ws.set_auto_pong(true); ws.set_auto_pong(true);
let ws_resource = ServerWebSocket { let rid = state.resource_table.add(ServerWebSocket::new(ws));
buffered: Cell::new(0),
errored: Cell::new(None),
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
closed: Cell::new(false),
tx_lock: AsyncRefCell::new(()),
};
let rid = state.resource_table.add(ws_resource);
Ok(rid) Ok(rid)
} }
@ -401,7 +410,7 @@ pub fn op_ws_send_binary(
.write_frame(Frame::new(true, OpCode::Binary, None, data)) .write_frame(Frame::new(true, OpCode::Binary, None, data))
.await .await
{ {
resource.errored.set(Some(err)); resource.set_error(Some(err.to_string()));
} else { } else {
resource.buffered.set(resource.buffered.get() - len); resource.buffered.set(resource.buffered.get() - len);
} }
@ -418,7 +427,7 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
.await .await
{ {
resource.errored.set(Some(err)); resource.set_error(Some(err.to_string()));
} else { } else {
resource.buffered.set(resource.buffered.get() - len); resource.buffered.set(resource.buffered.get() - len);
} }
@ -514,18 +523,47 @@ pub async fn op_ws_close(
Ok(()) Ok(())
} }
#[op]
pub fn op_ws_get_buffer(state: &mut OpState, rid: ResourceId) -> ZeroCopyBuf {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
resource.buffer.take().unwrap().into()
}
#[op]
pub fn op_ws_get_buffer_as_string(
state: &mut OpState,
rid: ResourceId,
) -> String {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
// TODO(mmastrac): We won't panic on a bad string, but we return an empty one.
String::from_utf8(resource.buffer.take().unwrap()).unwrap_or_default()
}
#[op]
pub fn op_ws_get_error(state: &mut OpState, rid: ResourceId) -> String {
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
return "Bad resource".into();
};
resource.errored.set(false);
resource.error.take().unwrap_or_default()
}
#[op(fast)] #[op(fast)]
pub async fn op_ws_next_event( pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, rid: ResourceId,
) -> Result<(u16, StringOrBuffer), AnyError> { ) -> u16 {
let resource = state let Ok(resource) = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
.get::<ServerWebSocket>(rid)?; .get::<ServerWebSocket>(rid) else {
// op_ws_get_error will correctly handle a bad resource
return MessageKind::Error as u16;
};
if let Some(err) = resource.errored.take() { // If there's a pending error, this always returns error
return Err(err); if resource.errored.get() {
return MessageKind::Error as u16;
} }
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
@ -537,46 +575,44 @@ pub async fn op_ws_next_event(
// Try close the stream, ignoring any errors, and report closed status to JavaScript. // Try close the stream, ignoring any errors, and report closed status to JavaScript.
if resource.closed.get() { if resource.closed.get() {
let _ = state.borrow_mut().resource_table.close(rid); let _ = state.borrow_mut().resource_table.close(rid);
return Ok(( resource.set_error(None);
MessageKind::Closed as u16, return MessageKind::ClosedDefault as u16;
StringOrBuffer::Buffer(vec![].into()),
));
} }
return Ok(( resource.set_error(Some(err.to_string()));
MessageKind::Error as u16, return MessageKind::Error as u16;
StringOrBuffer::String(err.to_string()),
));
} }
}; };
break Ok(match val.opcode { break match val.opcode {
OpCode::Text => ( OpCode::Text => {
MessageKind::Text as u16, resource.buffer.set(Some(val.payload));
StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), MessageKind::Text as u16
),
OpCode::Binary => (
MessageKind::Binary as u16,
StringOrBuffer::Buffer(val.payload.into()),
),
OpCode::Close => {
if val.payload.len() < 2 {
return Ok((1005, StringOrBuffer::String("".to_string())));
}
let close_code =
CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
(close_code.into(), StringOrBuffer::String(reason))
} }
OpCode::Pong => ( OpCode::Binary => {
MessageKind::Pong as u16, resource.buffer.set(Some(val.payload));
StringOrBuffer::Buffer(vec![].into()), MessageKind::Binary as u16
), }
OpCode::Close => {
// Close reason is returned through error
if val.payload.len() < 2 {
resource.set_error(None);
MessageKind::ClosedDefault as u16
} else {
let close_code = CloseCode::from(u16::from_be_bytes([
val.payload[0],
val.payload[1],
]));
let reason = String::from_utf8(val.payload[2..].to_vec()).ok();
resource.set_error(reason);
close_code.into()
}
}
OpCode::Pong => MessageKind::Pong as u16,
OpCode::Continuation | OpCode::Ping => { OpCode::Continuation | OpCode::Ping => {
continue; continue;
} }
}); };
} }
} }
@ -588,6 +624,9 @@ deno_core::extension!(deno_websocket,
op_ws_create<P>, op_ws_create<P>,
op_ws_close, op_ws_close,
op_ws_next_event, op_ws_next_event,
op_ws_get_buffer,
op_ws_get_buffer_as_string,
op_ws_get_error,
op_ws_send_binary, op_ws_send_binary,
op_ws_send_text, op_ws_send_text,
op_ws_send_binary_async, op_ws_send_binary_async,