mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 08:33:43 -05:00
perf(ext/websocket): efficient event kind serialization (#18509)
Use u16 to represent the kind of event (0 - 6) & event code > 6 is treated as the close code. This way we can represent all events + the close code in a single JS number. This is safe because (as per RFC 6455) close code from 0-999 are reserved & not used. | name | avg msg/sec/core | | --- | --- | | deno_main | `127820.750000` | | deno #18506 | `140079.000000` | | deno #18506 + this | `150104.250000` |
This commit is contained in:
parent
7722014497
commit
0f41aff1d9
3 changed files with 112 additions and 73 deletions
|
@ -394,13 +394,14 @@ class WebSocket extends EventTarget {
|
|||
|
||||
async [_eventLoop]() {
|
||||
while (this[_readyState] !== CLOSED) {
|
||||
const { kind, value } = await core.opAsync(
|
||||
const { 0: kind, 1: value } = await core.opAsync(
|
||||
"op_ws_next_event",
|
||||
this[_rid],
|
||||
);
|
||||
|
||||
switch (kind) {
|
||||
case "string": {
|
||||
case 0: {
|
||||
/* string */
|
||||
this[_serverHandleIdleTimeout]();
|
||||
const event = new MessageEvent("message", {
|
||||
data: value,
|
||||
|
@ -409,14 +410,15 @@ class WebSocket extends EventTarget {
|
|||
this.dispatchEvent(event);
|
||||
break;
|
||||
}
|
||||
case "binary": {
|
||||
case 1: {
|
||||
/* binary */
|
||||
this[_serverHandleIdleTimeout]();
|
||||
let data;
|
||||
|
||||
if (this.binaryType === "blob") {
|
||||
data = new Blob([value]);
|
||||
} else {
|
||||
data = value.buffer;
|
||||
data = value;
|
||||
}
|
||||
|
||||
const event = new MessageEvent("message", {
|
||||
|
@ -427,39 +429,13 @@ class WebSocket extends EventTarget {
|
|||
this.dispatchEvent(event);
|
||||
break;
|
||||
}
|
||||
case "pong": {
|
||||
case 2: {
|
||||
/* pong */
|
||||
this[_serverHandleIdleTimeout]();
|
||||
break;
|
||||
}
|
||||
case "closed":
|
||||
case "close": {
|
||||
const prevState = this[_readyState];
|
||||
this[_readyState] = CLOSED;
|
||||
clearTimeout(this[_idleTimeoutTimeout]);
|
||||
|
||||
if (prevState === OPEN) {
|
||||
try {
|
||||
await core.opAsync(
|
||||
"op_ws_close",
|
||||
this[_rid],
|
||||
value.code,
|
||||
value.reason,
|
||||
);
|
||||
} catch {
|
||||
// ignore failures
|
||||
}
|
||||
}
|
||||
|
||||
const event = new CloseEvent("close", {
|
||||
wasClean: true,
|
||||
code: value.code,
|
||||
reason: value.reason,
|
||||
});
|
||||
this.dispatchEvent(event);
|
||||
core.tryClose(this[_rid]);
|
||||
break;
|
||||
}
|
||||
case "error": {
|
||||
case 5: {
|
||||
/* error */
|
||||
this[_readyState] = CLOSED;
|
||||
|
||||
const errorEv = new ErrorEvent("error", {
|
||||
|
@ -472,6 +448,39 @@ class WebSocket extends EventTarget {
|
|||
core.tryClose(this[_rid]);
|
||||
break;
|
||||
}
|
||||
case 3: {
|
||||
/* ping */
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
/* close */
|
||||
const code = kind;
|
||||
const prevState = this[_readyState];
|
||||
this[_readyState] = CLOSED;
|
||||
clearTimeout(this[_idleTimeoutTimeout]);
|
||||
|
||||
if (prevState === OPEN) {
|
||||
try {
|
||||
await core.opAsync(
|
||||
"op_ws_close",
|
||||
this[_rid],
|
||||
code,
|
||||
value,
|
||||
);
|
||||
} catch {
|
||||
// ignore failures
|
||||
}
|
||||
}
|
||||
|
||||
const event = new CloseEvent("close", {
|
||||
wasClean: true,
|
||||
code: code,
|
||||
reason: value,
|
||||
});
|
||||
this.dispatchEvent(event);
|
||||
core.tryClose(this[_rid]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -167,12 +167,13 @@ class WebSocketStream {
|
|||
PromisePrototypeThen(
|
||||
(async () => {
|
||||
while (true) {
|
||||
const { kind } = await core.opAsync(
|
||||
const { 0: kind } = await core.opAsync(
|
||||
"op_ws_next_event",
|
||||
create.rid,
|
||||
);
|
||||
|
||||
if (kind === "close") {
|
||||
if (kind > 6) {
|
||||
/* close */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -237,37 +238,51 @@ class WebSocketStream {
|
|||
},
|
||||
});
|
||||
const pull = async (controller) => {
|
||||
const { kind, value } = await core.opAsync(
|
||||
const { 0: kind, 1: value } = await core.opAsync(
|
||||
"op_ws_next_event",
|
||||
this[_rid],
|
||||
);
|
||||
|
||||
switch (kind) {
|
||||
case "string": {
|
||||
case 0:
|
||||
case 1: {
|
||||
/* string */
|
||||
/* binary */
|
||||
controller.enqueue(value);
|
||||
break;
|
||||
}
|
||||
case "binary": {
|
||||
controller.enqueue(value);
|
||||
case 5: {
|
||||
/* error */
|
||||
const err = new Error(value);
|
||||
this[_closed].reject(err);
|
||||
controller.error(err);
|
||||
core.tryClose(this[_rid]);
|
||||
break;
|
||||
}
|
||||
case "ping": {
|
||||
case 3: {
|
||||
/* ping */
|
||||
await core.opAsync("op_ws_send", this[_rid], {
|
||||
kind: "pong",
|
||||
});
|
||||
await pull(controller);
|
||||
break;
|
||||
}
|
||||
case "closed":
|
||||
case "close": {
|
||||
this[_closed].resolve(value);
|
||||
case 2: {
|
||||
/* pong */
|
||||
break;
|
||||
}
|
||||
case 6: {
|
||||
/* closed */
|
||||
this[_closed].resolve(undefined);
|
||||
core.tryClose(this[_rid]);
|
||||
break;
|
||||
}
|
||||
case "error": {
|
||||
const err = new Error(value);
|
||||
this[_closed].reject(err);
|
||||
controller.error(err);
|
||||
default: {
|
||||
/* close */
|
||||
this[_closed].resolve({
|
||||
code: kind,
|
||||
reason: value,
|
||||
});
|
||||
core.tryClose(this[_rid]);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream;
|
|||
use deno_core::futures::SinkExt;
|
||||
use deno_core::futures::StreamExt;
|
||||
use deno_core::op;
|
||||
use deno_core::StringOrBuffer;
|
||||
|
||||
use deno_core::url;
|
||||
use deno_core::AsyncRefCell;
|
||||
|
@ -475,23 +476,21 @@ pub async fn op_ws_close(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
|
||||
pub enum NextEventResponse {
|
||||
String(String),
|
||||
Binary(ZeroCopyBuf),
|
||||
Close { code: u16, reason: String },
|
||||
Ping,
|
||||
Pong,
|
||||
Error(String),
|
||||
Closed,
|
||||
#[repr(u16)]
|
||||
pub enum MessageKind {
|
||||
Text = 0,
|
||||
Binary = 1,
|
||||
Pong = 2,
|
||||
Ping = 3,
|
||||
Error = 5,
|
||||
Closed = 6,
|
||||
}
|
||||
|
||||
#[op]
|
||||
pub async fn op_ws_next_event(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
) -> Result<NextEventResponse, AnyError> {
|
||||
) -> Result<(u16, StringOrBuffer), AnyError> {
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
|
@ -500,24 +499,40 @@ pub async fn op_ws_next_event(
|
|||
let cancel = RcRef::map(&resource, |r| &r.cancel);
|
||||
let val = resource.next_message(cancel).await?;
|
||||
let res = match val {
|
||||
Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
|
||||
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
|
||||
Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close {
|
||||
code: frame.code.into(),
|
||||
reason: frame.reason.to_string(),
|
||||
},
|
||||
Some(Ok(Message::Close(None))) => NextEventResponse::Close {
|
||||
code: 1005,
|
||||
reason: String::new(),
|
||||
},
|
||||
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
|
||||
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
|
||||
Some(Err(e)) => NextEventResponse::Error(e.to_string()),
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
(MessageKind::Text as u16, StringOrBuffer::String(text))
|
||||
}
|
||||
Some(Ok(Message::Binary(data))) => (
|
||||
MessageKind::Binary as u16,
|
||||
StringOrBuffer::Buffer(data.into()),
|
||||
),
|
||||
Some(Ok(Message::Close(Some(frame)))) => (
|
||||
frame.code.into(),
|
||||
StringOrBuffer::String(frame.reason.to_string()),
|
||||
),
|
||||
Some(Ok(Message::Close(None))) => {
|
||||
(1005, StringOrBuffer::String("".to_string()))
|
||||
}
|
||||
Some(Ok(Message::Ping(_))) => (
|
||||
MessageKind::Ping as u16,
|
||||
StringOrBuffer::Buffer(vec![].into()),
|
||||
),
|
||||
Some(Ok(Message::Pong(_))) => (
|
||||
MessageKind::Pong as u16,
|
||||
StringOrBuffer::Buffer(vec![].into()),
|
||||
),
|
||||
Some(Err(e)) => (
|
||||
MessageKind::Error as u16,
|
||||
StringOrBuffer::String(e.to_string()),
|
||||
),
|
||||
None => {
|
||||
// No message was received, presumably the socket closed while we waited.
|
||||
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
|
||||
let _ = state.borrow_mut().resource_table.close(rid);
|
||||
NextEventResponse::Closed
|
||||
(
|
||||
MessageKind::Closed as u16,
|
||||
StringOrBuffer::Buffer(vec![].into()),
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(res)
|
||||
|
|
Loading…
Reference in a new issue