mirror of
https://github.com/denoland/deno.git
synced 2025-01-18 11:53:59 -05:00
refactor(op_crates/websocket): refactor event loop (#9079)
This commit is contained in:
parent
9801858cb0
commit
2c1f74402c
2 changed files with 70 additions and 49 deletions
|
@ -294,59 +294,78 @@
|
||||||
}
|
}
|
||||||
|
|
||||||
async #eventLoop() {
|
async #eventLoop() {
|
||||||
if (this.#readyState === OPEN) {
|
while (this.#readyState === OPEN) {
|
||||||
const message = await core.jsonOpAsync(
|
const message = await core.jsonOpAsync(
|
||||||
"op_ws_next_event",
|
"op_ws_next_event",
|
||||||
{ rid: this.#rid },
|
{ rid: this.#rid },
|
||||||
);
|
);
|
||||||
if (message.type === "string" || message.type === "binary") {
|
|
||||||
let data;
|
|
||||||
|
|
||||||
if (message.type === "string") {
|
switch (message.kind) {
|
||||||
data = message.data;
|
case "string": {
|
||||||
} else {
|
const event = new MessageEvent("message", {
|
||||||
|
data: message.data,
|
||||||
|
origin: this.#url,
|
||||||
|
});
|
||||||
|
event.target = this;
|
||||||
|
this.dispatchEvent(event);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case "binary": {
|
||||||
|
let data;
|
||||||
|
|
||||||
if (this.binaryType === "blob") {
|
if (this.binaryType === "blob") {
|
||||||
data = new Blob([new Uint8Array(message.data)]);
|
data = new Blob([new Uint8Array(message.data)]);
|
||||||
} else {
|
} else {
|
||||||
data = new Uint8Array(message.data).buffer;
|
data = new Uint8Array(message.data).buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const event = new MessageEvent("message", {
|
||||||
|
data,
|
||||||
|
origin: this.#url,
|
||||||
|
});
|
||||||
|
event.target = this;
|
||||||
|
this.dispatchEvent(event);
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
const event = new MessageEvent("message", {
|
case "ping":
|
||||||
data,
|
core.jsonOpAsync("op_ws_send", {
|
||||||
origin: this.#url,
|
rid: this.#rid,
|
||||||
});
|
kind: "pong",
|
||||||
event.target = this;
|
});
|
||||||
this.dispatchEvent(event);
|
|
||||||
|
|
||||||
this.#eventLoop();
|
break;
|
||||||
} else if (message.type === "ping") {
|
|
||||||
core.jsonOpAsync("op_ws_send", {
|
|
||||||
rid: this.#rid,
|
|
||||||
kind: "pong",
|
|
||||||
});
|
|
||||||
|
|
||||||
this.#eventLoop();
|
case "close": {
|
||||||
} else if (message.type === "close") {
|
this.#readyState = CLOSED;
|
||||||
this.#readyState = CLOSED;
|
|
||||||
const event = new CloseEvent("close", {
|
|
||||||
wasClean: true,
|
|
||||||
code: message.code,
|
|
||||||
reason: message.reason,
|
|
||||||
});
|
|
||||||
event.target = this;
|
|
||||||
this.dispatchEvent(event);
|
|
||||||
} else if (message.type === "error") {
|
|
||||||
this.#readyState = CLOSED;
|
|
||||||
|
|
||||||
const errorEv = new ErrorEvent("error");
|
const event = new CloseEvent("close", {
|
||||||
errorEv.target = this;
|
wasClean: true,
|
||||||
this.dispatchEvent(errorEv);
|
code: message.data.code,
|
||||||
|
reason: message.data.reason,
|
||||||
|
});
|
||||||
|
event.target = this;
|
||||||
|
this.dispatchEvent(event);
|
||||||
|
|
||||||
this.#readyState = CLOSED;
|
break;
|
||||||
const closeEv = new CloseEvent("close");
|
}
|
||||||
closeEv.target = this;
|
|
||||||
this.dispatchEvent(closeEv);
|
case "error": {
|
||||||
|
this.#readyState = CLOSED;
|
||||||
|
|
||||||
|
const errorEv = new ErrorEvent("error");
|
||||||
|
errorEv.target = this;
|
||||||
|
this.dispatchEvent(errorEv);
|
||||||
|
|
||||||
|
const closeEv = new CloseEvent("close");
|
||||||
|
closeEv.target = this;
|
||||||
|
this.dispatchEvent(closeEv);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ struct CheckPermissionArgs {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This op is needed because creating a WS instance in JavaScript is a sync
|
// This op is needed because creating a WS instance in JavaScript is a sync
|
||||||
// operation and should throw error when permissions are not fullfiled,
|
// operation and should throw error when permissions are not fulfilled,
|
||||||
// but actual op that connects WS is async.
|
// but actual op that connects WS is async.
|
||||||
pub fn op_ws_check_permission<WP>(
|
pub fn op_ws_check_permission<WP>(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
|
@ -155,7 +155,7 @@ where
|
||||||
let try_socket = TcpStream::connect(addr).await;
|
let try_socket = TcpStream::connect(addr).await;
|
||||||
let tcp_socket = match try_socket.map_err(TungsteniteError::Io) {
|
let tcp_socket = match try_socket.map_err(TungsteniteError::Io) {
|
||||||
Ok(socket) => socket,
|
Ok(socket) => socket,
|
||||||
Err(_) => return Ok(json!({"success": false})),
|
Err(_) => return Ok(json!({ "success": false })),
|
||||||
};
|
};
|
||||||
|
|
||||||
let socket: MaybeTlsStream = match uri.scheme_str() {
|
let socket: MaybeTlsStream = match uri.scheme_str() {
|
||||||
|
@ -305,28 +305,30 @@ pub async fn op_ws_next_event(
|
||||||
let val = rx.next().or_cancel(cancel).await?;
|
let val = rx.next().or_cancel(cancel).await?;
|
||||||
let res = match val {
|
let res = match val {
|
||||||
Some(Ok(Message::Text(text))) => json!({
|
Some(Ok(Message::Text(text))) => json!({
|
||||||
"type": "string",
|
"kind": "string",
|
||||||
"data": text
|
"data": text
|
||||||
}),
|
}),
|
||||||
Some(Ok(Message::Binary(data))) => {
|
Some(Ok(Message::Binary(data))) => {
|
||||||
// TODO(ry): don't use json to send binary data.
|
// TODO(ry): don't use json to send binary data.
|
||||||
json!({
|
json!({
|
||||||
"type": "binary",
|
"kind": "binary",
|
||||||
"data": data
|
"data": data
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Some(Ok(Message::Close(Some(frame)))) => json!({
|
Some(Ok(Message::Close(Some(frame)))) => json!({
|
||||||
"type": "close",
|
"kind": "close",
|
||||||
"code": u16::from(frame.code),
|
"data": {
|
||||||
"reason": frame.reason.as_ref()
|
"code": u16::from(frame.code),
|
||||||
|
"reason": frame.reason.as_ref()
|
||||||
|
}
|
||||||
}),
|
}),
|
||||||
Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
|
Some(Ok(Message::Close(None))) => json!({ "kind": "close" }),
|
||||||
Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
|
Some(Ok(Message::Ping(_))) => json!({ "kind": "ping" }),
|
||||||
Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
|
Some(Ok(Message::Pong(_))) => json!({ "kind": "pong" }),
|
||||||
Some(Err(_)) => json!({"type": "error"}),
|
Some(Err(_)) => json!({ "kind": "error" }),
|
||||||
None => {
|
None => {
|
||||||
state.borrow_mut().resource_table.close(args.rid).unwrap();
|
state.borrow_mut().resource_table.close(args.rid).unwrap();
|
||||||
json!({"type": "closed"})
|
json!({ "kind": "closed" })
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok(res)
|
Ok(res)
|
||||||
|
|
Loading…
Add table
Reference in a new issue