1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00
Closes https://github.com/denoland/deno/issues/16450
This commit is contained in:
Bartek Iwańczuk 2022-11-01 16:06:06 +01:00 committed by GitHub
parent 89c5aa8598
commit f5cb26a82f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 104 deletions

View file

@ -20,7 +20,6 @@
ArrayPrototypeJoin, ArrayPrototypeJoin,
ArrayPrototypeMap, ArrayPrototypeMap,
ArrayPrototypeSome, ArrayPrototypeSome,
Uint32Array,
ErrorPrototypeToString, ErrorPrototypeToString,
ObjectDefineProperties, ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf, ObjectPrototypeIsPrototypeOf,
@ -85,10 +84,6 @@
const _idleTimeoutDuration = Symbol("[[idleTimeout]]"); const _idleTimeoutDuration = Symbol("[[idleTimeout]]");
const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]"); const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]"); const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");
/* [event type, close code] */
const eventBuf = new Uint32Array(2);
class WebSocket extends EventTarget { class WebSocket extends EventTarget {
[_rid]; [_rid];
@ -417,15 +412,13 @@
async [_eventLoop]() { async [_eventLoop]() {
while (this[_readyState] !== CLOSED) { while (this[_readyState] !== CLOSED) {
const value = await core.opAsync( const { kind, value } = await core.opAsync(
"op_ws_next_event", "op_ws_next_event",
this[_rid], this[_rid],
eventBuf,
); );
const kind = eventBuf[0];
switch (kind) { switch (kind) {
/* string */ case "string": {
case 0: {
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", { const event = new MessageEvent("message", {
data: value, data: value,
@ -434,15 +427,14 @@
this.dispatchEvent(event); this.dispatchEvent(event);
break; break;
} }
/* binary */ case "binary": {
case 1: {
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
let data; let data;
if (this.binaryType === "blob") { if (this.binaryType === "blob") {
data = new Blob([value]); data = new Blob([value]);
} else { } else {
data = value; data = value.buffer;
} }
const event = new MessageEvent("message", { const event = new MessageEvent("message", {
@ -452,23 +444,18 @@
this.dispatchEvent(event); this.dispatchEvent(event);
break; break;
} }
/* ping */ case "ping": {
case 3: {
core.opAsync("op_ws_send", this[_rid], { core.opAsync("op_ws_send", this[_rid], {
kind: "pong", kind: "pong",
}); });
break; break;
} }
/* pong */ case "pong": {
case 4: {
this[_serverHandleIdleTimeout](); this[_serverHandleIdleTimeout]();
break; break;
} }
/* closed */ case "closed":
case 6: // falls through case "close": {
/* close */
case 2: {
const code = eventBuf[1];
const prevState = this[_readyState]; const prevState = this[_readyState];
this[_readyState] = CLOSED; this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]); clearTimeout(this[_idleTimeoutTimeout]);
@ -478,8 +465,8 @@
await core.opAsync( await core.opAsync(
"op_ws_close", "op_ws_close",
this[_rid], this[_rid],
code, value.code,
value, value.reason,
); );
} catch { } catch {
// ignore failures // ignore failures
@ -488,15 +475,14 @@
const event = new CloseEvent("close", { const event = new CloseEvent("close", {
wasClean: true, wasClean: true,
code, code: value.code,
reason: value, reason: value.reason,
}); });
this.dispatchEvent(event); this.dispatchEvent(event);
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
break; break;
} }
/* error */ case "error": {
case 5: {
this[_readyState] = CLOSED; this[_readyState] = CLOSED;
const errorEv = new ErrorEvent("error", { const errorEv = new ErrorEvent("error", {

View file

@ -27,7 +27,6 @@
SymbolFor, SymbolFor,
TypeError, TypeError,
Uint8ArrayPrototype, Uint8ArrayPrototype,
Uint32Array,
} = window.__bootstrap.primordials; } = window.__bootstrap.primordials;
webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter( webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
@ -169,15 +168,12 @@
PromisePrototypeThen( PromisePrototypeThen(
(async () => { (async () => {
while (true) { while (true) {
const kind = new Uint32Array(2); const { kind } = await core.opAsync(
await core.opAsync(
"op_ws_next_event", "op_ws_next_event",
create.rid, create.rid,
kind,
); );
/* close */ if (kind === "close") {
if (kind[0] === 2) {
break; break;
} }
} }
@ -241,46 +237,35 @@
await this.closed; await this.closed;
}, },
}); });
const pull = async (controller) => { const pull = async (controller) => {
/* [event type, close code] */ const { kind, value } = await core.opAsync(
const eventBuf = new Uint32Array(2);
const value = await core.opAsync(
"op_ws_next_event", "op_ws_next_event",
this[_rid], this[_rid],
eventBuf,
); );
const kind = eventBuf[0];
switch (kind) { switch (kind) {
/* string */ case "string": {
case 0: {
controller.enqueue(value); controller.enqueue(value);
break; break;
} }
/* binary */ case "binary": {
case 1: {
controller.enqueue(value); controller.enqueue(value);
break; break;
} }
/* ping */ case "ping": {
case 3: {
await core.opAsync("op_ws_send", this[_rid], { await core.opAsync("op_ws_send", this[_rid], {
kind: "pong", kind: "pong",
}); });
await pull(controller); await pull(controller);
break; break;
} }
/* closed */ case "closed":
case 6: // falls through case "close": {
/* close */ this[_closed].resolve(value);
case 2: {
const code = eventBuf[1];
this[_closed].resolve({ code, reason: value });
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
break; break;
} }
/* error */ case "error": {
case 5: {
const err = new Error(value); const err = new Error(value);
this[_closed].reject(err); this[_closed].reject(err);
controller.error(err); controller.error(err);
@ -300,8 +285,7 @@
return pull(controller); return pull(controller);
} }
const code = eventBuf[1]; this[_closed].resolve(value);
this[_closed].resolve({ code, reason: value });
core.tryClose(this[_rid]); core.tryClose(this[_rid]);
} }
}; };

View file

@ -20,7 +20,6 @@ use deno_core::OpState;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::Resource; use deno_core::Resource;
use deno_core::ResourceId; use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf; use deno_core::ZeroCopyBuf;
use deno_tls::create_client_config; use deno_tls::create_client_config;
use http::header::HeaderName; use http::header::HeaderName;
@ -562,23 +561,11 @@ pub enum NextEventResponse {
Closed, Closed,
} }
#[repr(u32)] #[op]
enum NextEventKind {
String = 0,
Binary = 1,
Close = 2,
Ping = 3,
Pong = 4,
Error = 5,
Closed = 6,
}
#[op(deferred)]
pub async fn op_ws_next_event( pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, rid: ResourceId,
kind_out: &mut [u32], ) -> Result<NextEventResponse, AnyError> {
) -> Result<Option<StringOrBuffer>, AnyError> {
let resource = state let resource = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
@ -586,45 +573,28 @@ pub async fn op_ws_next_event(
let cancel = RcRef::map(&resource, |r| &r.cancel); let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?; let val = resource.next_message(cancel).await?;
let (kind, value) = match val { let res = match val {
Some(Ok(Message::Text(text))) => ( Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
NextEventKind::String as u32, Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
Some(StringOrBuffer::String(text)), Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close {
), code: frame.code.into(),
Some(Ok(Message::Binary(data))) => ( reason: frame.reason.to_string(),
NextEventKind::Binary as u32, },
Some(StringOrBuffer::Buffer(data.into())), Some(Ok(Message::Close(None))) => NextEventResponse::Close {
), code: 1005,
Some(Ok(Message::Close(Some(frame)))) => { reason: String::new(),
let code: u16 = frame.code.into(); },
kind_out[1] = code as u32; Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
( Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
NextEventKind::Close as u32, Some(Err(e)) => NextEventResponse::Error(e.to_string()),
Some(StringOrBuffer::String(frame.reason.to_string())),
)
}
Some(Ok(Message::Close(None))) => {
kind_out[1] = 1005;
(
NextEventKind::Close as u32,
Some(StringOrBuffer::String(String::new())),
)
}
Some(Ok(Message::Ping(_))) => (NextEventKind::Ping as u32, None),
Some(Ok(Message::Pong(_))) => (NextEventKind::Pong as u32, None),
Some(Err(e)) => (
NextEventKind::Error as u32,
Some(StringOrBuffer::String(e.to_string())),
),
None => { None => {
// No message was received, presumably the socket closed while we waited. // No message was received, presumably the socket closed while we waited.
// Try close the stream, ignoring any errors, and report closed status to JavaScript. // Try close the stream, ignoring any errors, and report closed status to JavaScript.
let _ = state.borrow_mut().resource_table.close(rid); let _ = state.borrow_mut().resource_table.close(rid);
(NextEventKind::Closed as u32, None) NextEventResponse::Closed
} }
}; };
kind_out[0] = kind as u32; Ok(res)
Ok(value)
} }
pub fn init<P: WebSocketPermissions + 'static>( pub fn init<P: WebSocketPermissions + 'static>(