1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

perf(ext/websocket): efficient event kind serialization (#18509)

Use u16 to represent the kind of event (0 - 6) & event code > 6 is
treated as the close code. This way we can represent all events + the
close code in a single JS number. This is safe because (as per RFC 6455)
close code from 0-999 are reserved & not used.

| name | avg msg/sec/core |
| --- | --- |
| deno_main | `127820.750000` |
| deno #18506 | `140079.000000` |
| deno #18506 + this | `150104.250000` |
This commit is contained in:
Divy Srivastava 2023-03-31 10:34:12 +05:30 committed by Matt Mastracci
parent 5d20c36eaf
commit 0cd0a9d5ed
3 changed files with 112 additions and 73 deletions

View file

@ -394,13 +394,14 @@ class WebSocket extends EventTarget {
async [_eventLoop]() { async [_eventLoop]() {
while (this[_readyState] !== CLOSED) { while (this[_readyState] !== CLOSED) {
const { kind, value } = await core.opAsync( const { 0: kind, 1: value } = await core.opAsync(
"op_ws_next_event", "op_ws_next_event",
this[_rid], this[_rid],
); );
switch (kind) { switch (kind) {
case "string": { case 0: {
/* string */
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", { const event = new MessageEvent("message", {
data: value, data: value,
@ -409,14 +410,15 @@ class WebSocket extends EventTarget {
this.dispatchEvent(event); this.dispatchEvent(event);
break; break;
} }
case "binary": { case 1: {
/* binary */
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
let data; let data;
if (this.binaryType === "blob") { if (this.binaryType === "blob") {
data = new Blob([value]); data = new Blob([value]);
} else { } else {
data = value.buffer; data = value;
} }
const event = new MessageEvent("message", { const event = new MessageEvent("message", {
@ -427,39 +429,13 @@ class WebSocket extends EventTarget {
this.dispatchEvent(event); this.dispatchEvent(event);
break; break;
} }
case "pong": { case 2: {
/* pong */
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
break; break;
} }
case "closed": case 5: {
case "close": { /* error */
const prevState = this[_readyState];
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
if (prevState === OPEN) {
try {
await core.opAsync(
"op_ws_close",
this[_rid],
value.code,
value.reason,
);
} catch {
// ignore failures
}
}
const event = new CloseEvent("close", {
wasClean: true,
code: value.code,
reason: value.reason,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
break;
}
case "error": {
this[_readyState] = CLOSED; this[_readyState] = CLOSED;
const errorEv = new ErrorEvent("error", { const errorEv = new ErrorEvent("error", {
@ -472,6 +448,39 @@ class WebSocket extends EventTarget {
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
break; break;
} }
case 3: {
/* ping */
break;
}
default: {
/* close */
const code = kind;
const prevState = this[_readyState];
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
if (prevState === OPEN) {
try {
await core.opAsync(
"op_ws_close",
this[_rid],
code,
value,
);
} catch {
// ignore failures
}
}
const event = new CloseEvent("close", {
wasClean: true,
code: code,
reason: value,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
break;
}
} }
} }
} }

View file

@ -167,12 +167,13 @@ class WebSocketStream {
PromisePrototypeThen( PromisePrototypeThen(
(async () => { (async () => {
while (true) { while (true) {
const { kind } = await core.opAsync( const { 0: kind } = await core.opAsync(
"op_ws_next_event", "op_ws_next_event",
create.rid, create.rid,
); );
if (kind === "close") { if (kind > 6) {
/* close */
break; break;
} }
} }
@ -237,37 +238,51 @@ class WebSocketStream {
}, },
}); });
const pull = async (controller) => { const pull = async (controller) => {
const { kind, value } = await core.opAsync( const { 0: kind, 1: value } = await core.opAsync(
"op_ws_next_event", "op_ws_next_event",
this[_rid], this[_rid],
); );
switch (kind) { switch (kind) {
case "string": { case 0:
case 1: {
/* string */
/* binary */
controller.enqueue(value); controller.enqueue(value);
break; break;
} }
case "binary": { case 5: {
controller.enqueue(value); /* error */
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
core.tryClose(this[_rid]);
break; break;
} }
case "ping": { case 3: {
/* ping */
await core.opAsync("op_ws_send", this[_rid], { await core.opAsync("op_ws_send", this[_rid], {
kind: "pong", kind: "pong",
}); });
await pull(controller); await pull(controller);
break; break;
} }
case "closed": case 2: {
case "close": { /* pong */
this[_closed].resolve(value); break;
}
case 6: {
/* closed */
this[_closed].resolve(undefined);
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
break; break;
} }
case "error": { default: {
const err = new Error(value); /* close */
this[_closed].reject(err); this[_closed].resolve({
controller.error(err); code: kind,
reason: value,
});
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
break; break;
} }

View file

@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream;
use deno_core::futures::SinkExt; use deno_core::futures::SinkExt;
use deno_core::futures::StreamExt; use deno_core::futures::StreamExt;
use deno_core::op; use deno_core::op;
use deno_core::StringOrBuffer;
use deno_core::url; use deno_core::url;
use deno_core::AsyncRefCell; use deno_core::AsyncRefCell;
@ -475,23 +476,21 @@ pub async fn op_ws_close(
Ok(()) Ok(())
} }
#[derive(Serialize)] #[repr(u16)]
#[serde(tag = "kind", content = "value", rename_all = "camelCase")] pub enum MessageKind {
pub enum NextEventResponse { Text = 0,
String(String), Binary = 1,
Binary(ZeroCopyBuf), Pong = 2,
Close { code: u16, reason: String }, Ping = 3,
Ping, Error = 5,
Pong, Closed = 6,
Error(String),
Closed,
} }
#[op] #[op]
pub async fn op_ws_next_event( pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, rid: ResourceId,
) -> Result<NextEventResponse, AnyError> { ) -> Result<(u16, StringOrBuffer), AnyError> {
let resource = state let resource = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
@ -500,24 +499,40 @@ pub async fn op_ws_next_event(
let cancel = RcRef::map(&resource, |r| &r.cancel); let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?; let val = resource.next_message(cancel).await?;
let res = match val { let res = match val {
Some(Ok(Message::Text(text))) => NextEventResponse::String(text), Some(Ok(Message::Text(text))) => {
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), (MessageKind::Text as u16, StringOrBuffer::String(text))
Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { }
code: frame.code.into(), Some(Ok(Message::Binary(data))) => (
reason: frame.reason.to_string(), MessageKind::Binary as u16,
}, StringOrBuffer::Buffer(data.into()),
Some(Ok(Message::Close(None))) => NextEventResponse::Close { ),
code: 1005, Some(Ok(Message::Close(Some(frame)))) => (
reason: String::new(), frame.code.into(),
}, StringOrBuffer::String(frame.reason.to_string()),
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, ),
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, Some(Ok(Message::Close(None))) => {
Some(Err(e)) => NextEventResponse::Error(e.to_string()), (1005, StringOrBuffer::String("".to_string()))
}
Some(Ok(Message::Ping(_))) => (
MessageKind::Ping as u16,
StringOrBuffer::Buffer(vec![].into()),
),
Some(Ok(Message::Pong(_))) => (
MessageKind::Pong as u16,
StringOrBuffer::Buffer(vec![].into()),
),
Some(Err(e)) => (
MessageKind::Error as u16,
StringOrBuffer::String(e.to_string()),
),
None => { None => {
// No message was received, presumably the socket closed while we waited. // No message was received, presumably the socket closed while we waited.
// Try close the stream, ignoring any errors, and report closed status to JavaScript. // Try close the stream, ignoring any errors, and report closed status to JavaScript.
let _ = state.borrow_mut().resource_table.close(rid); let _ = state.borrow_mut().resource_table.close(rid);
NextEventResponse::Closed (
MessageKind::Closed as u16,
StringOrBuffer::Buffer(vec![].into()),
)
} }
}; };
Ok(res) Ok(res)