mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
perf(ext/websocket): replace tokio_tungstenite server with fastwebsockets (#18587)
https://github.com/littledivy/fastwebsockets ``` # This PR ./load_test 100 0.0.0.0 8080 0 0 Running benchmark now... Msg/sec: 176355.000000 # main ./load_test 100 0.0.0.0 8080 0 0 Running benchmark now... Msg/sec: 157198.750000 ```
This commit is contained in:
parent
3b62a58818
commit
4cc8784f5b
6 changed files with 266 additions and 39 deletions
19
Cargo.lock
generated
19
Cargo.lock
generated
|
@ -1274,6 +1274,7 @@ version = "0.102.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"deno_core",
|
"deno_core",
|
||||||
"deno_tls",
|
"deno_tls",
|
||||||
|
"fastwebsockets",
|
||||||
"http",
|
"http",
|
||||||
"hyper",
|
"hyper",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -1679,6 +1680,18 @@ dependencies = [
|
||||||
"instant",
|
"instant",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fastwebsockets"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "45dfdedde2bd984f677056a9a804fe995990ab4f4594599c848c05a10ee8c05e"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"simdutf8",
|
||||||
|
"tokio",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fd-lock"
|
name = "fd-lock"
|
||||||
version = "3.0.10"
|
version = "3.0.10"
|
||||||
|
@ -3913,6 +3926,12 @@ dependencies = [
|
||||||
"rand_core 0.6.4",
|
"rand_core 0.6.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "simdutf8"
|
||||||
|
version = "0.1.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "siphasher"
|
name = "siphasher"
|
||||||
version = "0.3.10"
|
version = "0.3.10"
|
||||||
|
|
|
@ -26,8 +26,10 @@ import {
|
||||||
_protocol,
|
_protocol,
|
||||||
_readyState,
|
_readyState,
|
||||||
_rid,
|
_rid,
|
||||||
|
_role,
|
||||||
_server,
|
_server,
|
||||||
_serverHandleIdleTimeout,
|
_serverHandleIdleTimeout,
|
||||||
|
SERVER,
|
||||||
WebSocket,
|
WebSocket,
|
||||||
} from "ext:deno_websocket/01_websocket.js";
|
} from "ext:deno_websocket/01_websocket.js";
|
||||||
import { listen, TcpConn, UnixConn } from "ext:deno_net/01_net.js";
|
import { listen, TcpConn, UnixConn } from "ext:deno_net/01_net.js";
|
||||||
|
@ -376,6 +378,7 @@ function createRespondWith(
|
||||||
httpConn.close();
|
httpConn.close();
|
||||||
|
|
||||||
ws[_readyState] = WebSocket.OPEN;
|
ws[_readyState] = WebSocket.OPEN;
|
||||||
|
ws[_role] = SERVER;
|
||||||
const event = new Event("open");
|
const event = new Event("open");
|
||||||
ws.dispatchEvent(event);
|
ws.dispatchEvent(event);
|
||||||
|
|
||||||
|
|
|
@ -78,6 +78,11 @@ webidl.converters["WebSocketSend"] = (V, opts) => {
|
||||||
return webidl.converters["USVString"](V, opts);
|
return webidl.converters["USVString"](V, opts);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** role */
|
||||||
|
const SERVER = 0;
|
||||||
|
const CLIENT = 1;
|
||||||
|
|
||||||
|
/** state */
|
||||||
const CONNECTING = 0;
|
const CONNECTING = 0;
|
||||||
const OPEN = 1;
|
const OPEN = 1;
|
||||||
const CLOSING = 2;
|
const CLOSING = 2;
|
||||||
|
@ -86,6 +91,7 @@ const CLOSED = 3;
|
||||||
const _readyState = Symbol("[[readyState]]");
|
const _readyState = Symbol("[[readyState]]");
|
||||||
const _url = Symbol("[[url]]");
|
const _url = Symbol("[[url]]");
|
||||||
const _rid = Symbol("[[rid]]");
|
const _rid = Symbol("[[rid]]");
|
||||||
|
const _role = Symbol("[[role]]");
|
||||||
const _extensions = Symbol("[[extensions]]");
|
const _extensions = Symbol("[[extensions]]");
|
||||||
const _protocol = Symbol("[[protocol]]");
|
const _protocol = Symbol("[[protocol]]");
|
||||||
const _binaryType = Symbol("[[binaryType]]");
|
const _binaryType = Symbol("[[binaryType]]");
|
||||||
|
@ -98,6 +104,7 @@ const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
|
||||||
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");
|
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");
|
||||||
class WebSocket extends EventTarget {
|
class WebSocket extends EventTarget {
|
||||||
[_rid];
|
[_rid];
|
||||||
|
[_role];
|
||||||
|
|
||||||
[_readyState] = CONNECTING;
|
[_readyState] = CONNECTING;
|
||||||
get readyState() {
|
get readyState() {
|
||||||
|
@ -203,6 +210,7 @@ class WebSocket extends EventTarget {
|
||||||
}
|
}
|
||||||
|
|
||||||
this[_url] = wsURL.href;
|
this[_url] = wsURL.href;
|
||||||
|
this[_role] = CLIENT;
|
||||||
|
|
||||||
ops.op_ws_check_permission_and_cancel_handle(
|
ops.op_ws_check_permission_and_cancel_handle(
|
||||||
"WebSocket.abort()",
|
"WebSocket.abort()",
|
||||||
|
@ -312,7 +320,13 @@ class WebSocket extends EventTarget {
|
||||||
const sendTypedArray = (view, byteLength) => {
|
const sendTypedArray = (view, byteLength) => {
|
||||||
this[_bufferedAmount] += byteLength;
|
this[_bufferedAmount] += byteLength;
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
core.opAsync2("op_ws_send_binary", this[_rid], view),
|
core.opAsync2(
|
||||||
|
this[_role] === SERVER
|
||||||
|
? "op_server_ws_send_binary"
|
||||||
|
: "op_ws_send_binary",
|
||||||
|
this[_rid],
|
||||||
|
view,
|
||||||
|
),
|
||||||
() => {
|
() => {
|
||||||
this[_bufferedAmount] -= byteLength;
|
this[_bufferedAmount] -= byteLength;
|
||||||
},
|
},
|
||||||
|
@ -346,7 +360,11 @@ class WebSocket extends EventTarget {
|
||||||
const d = core.encode(string);
|
const d = core.encode(string);
|
||||||
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
|
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
|
||||||
PromisePrototypeThen(
|
PromisePrototypeThen(
|
||||||
core.opAsync2("op_ws_send_text", this[_rid], string),
|
core.opAsync2(
|
||||||
|
this[_role] === SERVER ? "op_server_ws_send_text" : "op_ws_send_text",
|
||||||
|
this[_rid],
|
||||||
|
string,
|
||||||
|
),
|
||||||
() => {
|
() => {
|
||||||
this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d);
|
this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d);
|
||||||
},
|
},
|
||||||
|
@ -401,7 +419,12 @@ class WebSocket extends EventTarget {
|
||||||
this[_readyState] = CLOSING;
|
this[_readyState] = CLOSING;
|
||||||
|
|
||||||
PromisePrototypeCatch(
|
PromisePrototypeCatch(
|
||||||
core.opAsync("op_ws_close", this[_rid], code, reason),
|
core.opAsync(
|
||||||
|
this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
|
||||||
|
this[_rid],
|
||||||
|
code,
|
||||||
|
reason,
|
||||||
|
),
|
||||||
(err) => {
|
(err) => {
|
||||||
this[_readyState] = CLOSED;
|
this[_readyState] = CLOSED;
|
||||||
|
|
||||||
|
@ -422,7 +445,7 @@ class WebSocket extends EventTarget {
|
||||||
async [_eventLoop]() {
|
async [_eventLoop]() {
|
||||||
while (this[_readyState] !== CLOSED) {
|
while (this[_readyState] !== CLOSED) {
|
||||||
const { 0: kind, 1: value } = await core.opAsync2(
|
const { 0: kind, 1: value } = await core.opAsync2(
|
||||||
"op_ws_next_event",
|
this[_role] === SERVER ? "op_server_ws_next_event" : "op_ws_next_event",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -489,7 +512,7 @@ class WebSocket extends EventTarget {
|
||||||
if (prevState === OPEN) {
|
if (prevState === OPEN) {
|
||||||
try {
|
try {
|
||||||
await core.opAsync(
|
await core.opAsync(
|
||||||
"op_ws_close",
|
this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
|
||||||
this[_rid],
|
this[_rid],
|
||||||
code,
|
code,
|
||||||
value,
|
value,
|
||||||
|
@ -517,14 +540,23 @@ class WebSocket extends EventTarget {
|
||||||
clearTimeout(this[_idleTimeoutTimeout]);
|
clearTimeout(this[_idleTimeoutTimeout]);
|
||||||
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
||||||
if (this[_readyState] === OPEN) {
|
if (this[_readyState] === OPEN) {
|
||||||
await core.opAsync("op_ws_send", this[_rid], {
|
await core.opAsync(
|
||||||
|
this[_role] === SERVER ? "op_server_ws_send" : "op_ws_send",
|
||||||
|
this[_rid],
|
||||||
|
{
|
||||||
kind: "ping",
|
kind: "ping",
|
||||||
});
|
},
|
||||||
|
);
|
||||||
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
this[_idleTimeoutTimeout] = setTimeout(async () => {
|
||||||
if (this[_readyState] === OPEN) {
|
if (this[_readyState] === OPEN) {
|
||||||
this[_readyState] = CLOSING;
|
this[_readyState] = CLOSING;
|
||||||
const reason = "No response from ping frame.";
|
const reason = "No response from ping frame.";
|
||||||
await core.opAsync("op_ws_close", this[_rid], 1001, reason);
|
await core.opAsync(
|
||||||
|
this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
|
||||||
|
this[_rid],
|
||||||
|
1001,
|
||||||
|
reason,
|
||||||
|
);
|
||||||
this[_readyState] = CLOSED;
|
this[_readyState] = CLOSED;
|
||||||
|
|
||||||
const errEvent = new ErrorEvent("error", {
|
const errEvent = new ErrorEvent("error", {
|
||||||
|
@ -594,7 +626,9 @@ export {
|
||||||
_protocol,
|
_protocol,
|
||||||
_readyState,
|
_readyState,
|
||||||
_rid,
|
_rid,
|
||||||
|
_role,
|
||||||
_server,
|
_server,
|
||||||
_serverHandleIdleTimeout,
|
_serverHandleIdleTimeout,
|
||||||
|
SERVER,
|
||||||
WebSocket,
|
WebSocket,
|
||||||
};
|
};
|
||||||
|
|
|
@ -16,6 +16,7 @@ path = "lib.rs"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
deno_core.workspace = true
|
deno_core.workspace = true
|
||||||
deno_tls.workspace = true
|
deno_tls.workspace = true
|
||||||
|
fastwebsockets = "0.1.0"
|
||||||
http.workspace = true
|
http.workspace = true
|
||||||
hyper.workspace = true
|
hyper.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
|
|
@ -47,13 +47,16 @@ use tokio_tungstenite::tungstenite::handshake::client::Response;
|
||||||
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
||||||
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
|
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
|
||||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
use tokio_tungstenite::tungstenite::protocol::Message;
|
||||||
use tokio_tungstenite::tungstenite::protocol::Role;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
|
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
|
||||||
use tokio_tungstenite::MaybeTlsStream;
|
use tokio_tungstenite::MaybeTlsStream;
|
||||||
use tokio_tungstenite::WebSocketStream;
|
use tokio_tungstenite::WebSocketStream;
|
||||||
|
|
||||||
pub use tokio_tungstenite; // Re-export tokio_tungstenite
|
pub use tokio_tungstenite; // Re-export tokio_tungstenite
|
||||||
|
|
||||||
|
mod server;
|
||||||
|
|
||||||
|
pub use server::ws_create_server_stream;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct WsRootStore(pub Option<RootCertStore>);
|
pub struct WsRootStore(pub Option<RootCertStore>);
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -89,35 +92,6 @@ pub enum WebSocketStreamType {
|
||||||
|
|
||||||
pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
|
pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
|
||||||
|
|
||||||
pub async fn ws_create_server_stream(
|
|
||||||
state: &Rc<RefCell<OpState>>,
|
|
||||||
transport: Pin<Box<dyn Upgraded>>,
|
|
||||||
) -> Result<ResourceId, AnyError> {
|
|
||||||
let ws_stream = WebSocketStream::from_raw_socket(
|
|
||||||
transport,
|
|
||||||
Role::Server,
|
|
||||||
Some(WebSocketConfig {
|
|
||||||
max_message_size: Some(128 << 20),
|
|
||||||
max_frame_size: Some(32 << 20),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let (ws_tx, ws_rx) = ws_stream.split();
|
|
||||||
|
|
||||||
let ws_resource = WsStreamResource {
|
|
||||||
stream: WebSocketStreamType::Server {
|
|
||||||
tx: AsyncRefCell::new(ws_tx),
|
|
||||||
rx: AsyncRefCell::new(ws_rx),
|
|
||||||
},
|
|
||||||
cancel: Default::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let resource_table = &mut state.borrow_mut().resource_table;
|
|
||||||
let rid = resource_table.add(ws_resource);
|
|
||||||
Ok(rid)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct WsStreamResource {
|
pub struct WsStreamResource {
|
||||||
pub stream: WebSocketStreamType,
|
pub stream: WebSocketStreamType,
|
||||||
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
|
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
|
||||||
|
@ -549,6 +523,11 @@ deno_core::extension!(deno_websocket,
|
||||||
op_ws_next_event,
|
op_ws_next_event,
|
||||||
op_ws_send_binary,
|
op_ws_send_binary,
|
||||||
op_ws_send_text,
|
op_ws_send_text,
|
||||||
|
server::op_server_ws_send,
|
||||||
|
server::op_server_ws_close,
|
||||||
|
server::op_server_ws_next_event,
|
||||||
|
server::op_server_ws_send_binary,
|
||||||
|
server::op_server_ws_send_text,
|
||||||
],
|
],
|
||||||
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
esm = [ "01_websocket.js", "02_websocketstream.js" ],
|
||||||
options = {
|
options = {
|
||||||
|
|
191
ext/websocket/server.rs
Normal file
191
ext/websocket/server.rs
Normal file
|
@ -0,0 +1,191 @@
|
||||||
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use crate::MessageKind;
|
||||||
|
use crate::SendValue;
|
||||||
|
use crate::Upgraded;
|
||||||
|
use deno_core::error::type_error;
|
||||||
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::op;
|
||||||
|
use deno_core::AsyncRefCell;
|
||||||
|
use deno_core::OpState;
|
||||||
|
use deno_core::RcRef;
|
||||||
|
use deno_core::Resource;
|
||||||
|
use deno_core::ResourceId;
|
||||||
|
use deno_core::StringOrBuffer;
|
||||||
|
use deno_core::ZeroCopyBuf;
|
||||||
|
use std::borrow::Cow;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use fastwebsockets::CloseCode;
|
||||||
|
use fastwebsockets::FragmentCollector;
|
||||||
|
use fastwebsockets::Frame;
|
||||||
|
use fastwebsockets::OpCode;
|
||||||
|
use fastwebsockets::WebSocket;
|
||||||
|
|
||||||
|
pub struct ServerWebSocket {
|
||||||
|
ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Resource for ServerWebSocket {
|
||||||
|
fn name(&self) -> Cow<str> {
|
||||||
|
"serverWebSocket".into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn ws_create_server_stream(
|
||||||
|
state: &Rc<RefCell<OpState>>,
|
||||||
|
transport: Pin<Box<dyn Upgraded>>,
|
||||||
|
) -> Result<ResourceId, AnyError> {
|
||||||
|
let mut ws = WebSocket::after_handshake(transport);
|
||||||
|
ws.set_writev(false);
|
||||||
|
ws.set_auto_close(true);
|
||||||
|
ws.set_auto_pong(true);
|
||||||
|
|
||||||
|
let ws_resource = ServerWebSocket {
|
||||||
|
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let resource_table = &mut state.borrow_mut().resource_table;
|
||||||
|
let rid = resource_table.add(ws_resource);
|
||||||
|
Ok(rid)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op]
|
||||||
|
pub async fn op_server_ws_send_binary(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
data: ZeroCopyBuf,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let resource = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
|
||||||
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
|
ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
|
||||||
|
.await
|
||||||
|
.map_err(|err| type_error(err.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op]
|
||||||
|
pub async fn op_server_ws_send_text(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
data: String,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let resource = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
|
ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
|
||||||
|
.await
|
||||||
|
.map_err(|err| type_error(err.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op]
|
||||||
|
pub async fn op_server_ws_send(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
value: SendValue,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let msg = match value {
|
||||||
|
SendValue::Text(text) => {
|
||||||
|
Frame::new(true, OpCode::Text, None, text.into_bytes())
|
||||||
|
}
|
||||||
|
SendValue::Binary(buf) => {
|
||||||
|
Frame::new(true, OpCode::Binary, None, buf.to_vec())
|
||||||
|
}
|
||||||
|
SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]),
|
||||||
|
SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]),
|
||||||
|
};
|
||||||
|
|
||||||
|
let resource = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
|
|
||||||
|
ws.write_frame(msg)
|
||||||
|
.await
|
||||||
|
.map_err(|err| type_error(err.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op(deferred)]
|
||||||
|
pub async fn op_server_ws_close(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
code: Option<u16>,
|
||||||
|
reason: Option<String>,
|
||||||
|
) -> Result<(), AnyError> {
|
||||||
|
let resource = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
|
let frame = reason
|
||||||
|
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
||||||
|
.unwrap_or_else(|| Frame::close_raw(vec![]));
|
||||||
|
ws.write_frame(frame)
|
||||||
|
.await
|
||||||
|
.map_err(|err| type_error(err.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[op]
|
||||||
|
pub async fn op_server_ws_next_event(
|
||||||
|
state: Rc<RefCell<OpState>>,
|
||||||
|
rid: ResourceId,
|
||||||
|
) -> Result<(u16, StringOrBuffer), AnyError> {
|
||||||
|
let resource = state
|
||||||
|
.borrow_mut()
|
||||||
|
.resource_table
|
||||||
|
.get::<ServerWebSocket>(rid)?;
|
||||||
|
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
|
||||||
|
let val = match ws.read_frame().await {
|
||||||
|
Ok(val) => val,
|
||||||
|
Err(err) => {
|
||||||
|
return Ok((
|
||||||
|
MessageKind::Error as u16,
|
||||||
|
StringOrBuffer::String(err.to_string()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = match val.opcode {
|
||||||
|
OpCode::Text => (
|
||||||
|
MessageKind::Text as u16,
|
||||||
|
StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
|
||||||
|
),
|
||||||
|
OpCode::Binary => (
|
||||||
|
MessageKind::Binary as u16,
|
||||||
|
StringOrBuffer::Buffer(val.payload.into()),
|
||||||
|
),
|
||||||
|
OpCode::Close => {
|
||||||
|
if val.payload.len() < 2 {
|
||||||
|
return Ok((1005, StringOrBuffer::String("".to_string())));
|
||||||
|
}
|
||||||
|
|
||||||
|
let close_code =
|
||||||
|
CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
|
||||||
|
let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
|
||||||
|
(close_code.into(), StringOrBuffer::String(reason))
|
||||||
|
}
|
||||||
|
OpCode::Ping => (
|
||||||
|
MessageKind::Ping as u16,
|
||||||
|
StringOrBuffer::Buffer(vec![].into()),
|
||||||
|
),
|
||||||
|
OpCode::Pong => (
|
||||||
|
MessageKind::Pong as u16,
|
||||||
|
StringOrBuffer::Buffer(vec![].into()),
|
||||||
|
),
|
||||||
|
OpCode::Continuation => {
|
||||||
|
return Err(type_error("Unexpected continuation frame"))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(res)
|
||||||
|
}
|
Loading…
Reference in a new issue