From 638b6ef554676422c43cc5c0ae2285ba369740bf Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Fri, 20 Jan 2023 17:20:14 +0100 Subject: [PATCH] Revert "perf(ext/websocket): optimize socket.send (#16320)" (#17480) This reverts commit 36307c45 --- ext/websocket/01_websocket.js | 76 ++++++++++--------------- ext/websocket/lib.rs | 103 ---------------------------------- 2 files changed, 29 insertions(+), 150 deletions(-) diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 0d0a4211a4..9b7c45e708 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -26,6 +26,7 @@ ArrayPrototypeJoin, ArrayPrototypeMap, ArrayPrototypeSome, + DataView, ErrorPrototypeToString, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, @@ -34,14 +35,13 @@ Set, // TODO(lucacasonato): add SharedArrayBuffer to primordials // SharedArrayBufferPrototype + String, StringPrototypeEndsWith, StringPrototypeToLowerCase, Symbol, SymbolIterator, PromisePrototypeCatch, - queueMicrotask, SymbolFor, - Uint8Array, } = window.__bootstrap.primordials; webidl.converters["sequence or DOMString"] = (V, opts) => { @@ -300,58 +300,40 @@ throw new DOMException("readyState not OPEN", "InvalidStateError"); } - if (typeof data === "string") { - // try to send in one go! - const d = core.byteLength(data); - const sent = ops.op_ws_try_send_string(this[_rid], data); - this[_bufferedAmount] += d; - if (!sent) { - PromisePrototypeThen( - core.opAsync("op_ws_send_string", this[_rid], data), - () => { - this[_bufferedAmount] -= d; - }, - ); - } else { - // Spec expects data to be start flushing on next tick but oh well... - // we already sent it so we can just decrement the bufferedAmount - // on the next tick. - queueMicrotask(() => { - this[_bufferedAmount] -= d; - }); - } - return; - } - const sendTypedArray = (ta) => { - // try to send in one go! - const sent = ops.op_ws_try_send_binary(this[_rid], ta); this[_bufferedAmount] += ta.byteLength; - if (!sent) { - PromisePrototypeThen( - core.opAsync("op_ws_send_binary", this[_rid], ta), - () => { - this[_bufferedAmount] -= ta.byteLength; - }, - ); - } else { - // Spec expects data to be start flushing on next tick but oh well... - // we already sent it so we can just decrement the bufferedAmount - // on the next tick. - queueMicrotask(() => { + PromisePrototypeThen( + core.opAsync("op_ws_send", this[_rid], { + kind: "binary", + value: ta, + }), + () => { this[_bufferedAmount] -= ta.byteLength; - }); - } + }, + ); }; - if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) { - sendTypedArray(new Uint8Array(data)); - } else if (ArrayBufferIsView(data)) { - sendTypedArray(data); - } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) { + if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) { PromisePrototypeThen( data.slice().arrayBuffer(), - (ab) => sendTypedArray(new Uint8Array(ab)), + (ab) => sendTypedArray(new DataView(ab)), + ); + } else if (ArrayBufferIsView(data)) { + sendTypedArray(data); + } else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) { + sendTypedArray(new DataView(data)); + } else { + const string = String(data); + const d = core.encode(string); + this[_bufferedAmount] += d.byteLength; + PromisePrototypeThen( + core.opAsync("op_ws_send", this[_rid], { + kind: "text", + value: string, + }), + () => { + this[_bufferedAmount] -= d.byteLength; + }, ); } } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index bc4b3876d7..8123f84fc5 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -161,51 +161,6 @@ impl WsStreamResource { } } - fn try_send(self: &Rc, message: Message) -> Result { - let waker = deno_core::futures::task::noop_waker(); - let mut cx = std::task::Context::from_waker(&waker); - - let res = match self.stream { - WebSocketStreamType::Client { .. } => { - match RcRef::map(self, |r| match &r.stream { - WebSocketStreamType::Client { tx, .. } => tx, - WebSocketStreamType::Server { .. } => unreachable!(), - }) - .try_borrow_mut() - { - Some(mut tx) => { - if tx.poll_ready_unpin(&mut cx).is_ready() { - tx.start_send_unpin(message)?; - tx.poll_flush_unpin(&mut cx).is_ready() - } else { - false - } - } - None => false, - } - } - WebSocketStreamType::Server { .. } => { - match RcRef::map(self, |r| match &r.stream { - WebSocketStreamType::Client { .. } => unreachable!(), - WebSocketStreamType::Server { tx, .. } => tx, - }) - .try_borrow_mut() - { - Some(mut tx) => { - if tx.poll_ready_unpin(&mut cx).is_ready() { - tx.start_send_unpin(message)?; - tx.poll_flush_unpin(&mut cx).is_ready() - } else { - false - } - } - None => false, - } - } - }; - Ok(res) - } - async fn next_message( self: &Rc, cancel: RcRef, @@ -471,60 +426,6 @@ pub async fn op_ws_send( Ok(()) } -#[op] -pub async fn op_ws_send_string( - state: Rc>, - rid: ResourceId, - text: String, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - resource.send(Message::Text(text)).await?; - Ok(()) -} - -#[op] -pub async fn op_ws_send_binary( - state: Rc>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - resource.send(Message::Binary(data.to_vec())).await?; - Ok(()) -} - -#[op] -pub fn op_ws_try_send_string( - state: &mut OpState, - rid: ResourceId, - text: String, -) -> bool { - let resource = match state.resource_table.get::(rid) { - Ok(resource) => resource, - Err(_) => return false, - }; - resource.try_send(Message::Text(text)).is_ok() -} - -#[op(fast)] -pub fn op_ws_try_send_binary( - state: &mut OpState, - rid: u32, - value: &[u8], -) -> bool { - let resource = match state.resource_table.get::(rid) { - Ok(resource) => resource, - Err(_) => return false, - }; - resource.try_send(Message::Binary(value.to_vec())).is_ok() -} - #[op(deferred)] pub async fn op_ws_close( state: Rc>, @@ -615,10 +516,6 @@ pub fn init( op_ws_send::decl(), op_ws_close::decl(), op_ws_next_event::decl(), - op_ws_send_string::decl(), - op_ws_send_binary::decl(), - op_ws_try_send_string::decl(), - op_ws_try_send_binary::decl(), ]) .state(move |state| { state.put::(WsUserAgent(user_agent.clone()));