diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index f7bd820c0c..e4774f8158 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -18,20 +18,20 @@ ArrayPrototypeJoin, ArrayPrototypeMap, ArrayPrototypeSome, - DataView, ErrorPrototypeToString, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, PromisePrototypeThen, RegExpPrototypeTest, Set, - String, StringPrototypeEndsWith, StringPrototypeToLowerCase, Symbol, SymbolIterator, PromisePrototypeCatch, + queueMicrotask, SymbolFor, + Uint8Array, } = window.__bootstrap.primordials; webidl.converters["sequence or DOMString"] = (V, opts) => { @@ -290,40 +290,58 @@ throw new DOMException("readyState not OPEN", "InvalidStateError"); } + if (typeof data === "string") { + // try to send in one go! + const d = core.byteLength(data); + const sent = ops.op_ws_try_send_string(this[_rid], data); + this[_bufferedAmount] += d; + if (!sent) { + PromisePrototypeThen( + core.opAsync("op_ws_send_string", this[_rid], data), + () => { + this[_bufferedAmount] -= d; + }, + ); + } else { + // Spec expects data to be start flushing on next tick but oh well... + // we already sent it so we can just decrement the bufferedAmount + // on the next tick. + queueMicrotask(() => { + this[_bufferedAmount] -= d; + }); + } + return; + } + const sendTypedArray = (ta) => { + // try to send in one go! + const sent = ops.op_ws_try_send_binary(this[_rid], ta); this[_bufferedAmount] += ta.byteLength; - PromisePrototypeThen( - core.opAsync("op_ws_send", this[_rid], { - kind: "binary", - value: ta, - }), - () => { + if (!sent) { + PromisePrototypeThen( + core.opAsync("op_ws_send_binary", this[_rid], ta), + () => { + this[_bufferedAmount] -= ta.byteLength; + }, + ); + } else { + // Spec expects data to be start flushing on next tick but oh well... + // we already sent it so we can just decrement the bufferedAmount + // on the next tick. + queueMicrotask(() => { this[_bufferedAmount] -= ta.byteLength; - }, - ); + }); + } }; - if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) { - PromisePrototypeThen( - data.slice().arrayBuffer(), - (ab) => sendTypedArray(new DataView(ab)), - ); + if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) { + sendTypedArray(new Uint8Array(data)); } else if (ArrayBufferIsView(data)) { sendTypedArray(data); - } else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) { - sendTypedArray(new DataView(data)); - } else { - const string = String(data); - const d = core.encode(string); - this[_bufferedAmount] += d.byteLength; + } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) { PromisePrototypeThen( - core.opAsync("op_ws_send", this[_rid], { - kind: "text", - value: string, - }), - () => { - this[_bufferedAmount] -= d.byteLength; - }, + data.slice().arrayBuffer(), + (ab) => sendTypedArray(new Uint8Array(ab)), ); } } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index a314313774..704c699a76 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -161,6 +161,51 @@ impl WsStreamResource { } } + fn try_send(self: &Rc, message: Message) -> Result { + let waker = deno_core::futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + + let res = match self.stream { + WebSocketStreamType::Client { .. } => { + match RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { tx, .. } => tx, + WebSocketStreamType::Server { .. } => unreachable!(), + }) + .try_borrow_mut() + { + Some(mut tx) => { + if tx.poll_ready_unpin(&mut cx).is_ready() { + tx.start_send_unpin(message)?; + tx.poll_flush_unpin(&mut cx).is_ready() + } else { + false + } + } + None => false, + } + } + WebSocketStreamType::Server { .. } => { + match RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { .. } => unreachable!(), + WebSocketStreamType::Server { tx, .. } => tx, + }) + .try_borrow_mut() + { + Some(mut tx) => { + if tx.poll_ready_unpin(&mut cx).is_ready() { + tx.start_send_unpin(message)?; + tx.poll_flush_unpin(&mut cx).is_ready() + } else { + false + } + } + None => false, + } + } + }; + Ok(res) + } + async fn next_message( self: &Rc, cancel: RcRef, @@ -426,6 +471,54 @@ pub async fn op_ws_send( Ok(()) } +#[op] +pub async fn op_ws_send_string( + state: Rc>, + rid: ResourceId, + text: String, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + resource.send(Message::Text(text)).await?; + Ok(()) +} + +#[op] +pub async fn op_ws_send_binary( + state: Rc>, + rid: ResourceId, + data: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + resource.send(Message::Binary(data.to_vec())).await?; + Ok(()) +} + +#[op] +pub fn op_ws_try_send_string( + state: &mut OpState, + rid: ResourceId, + text: String, +) -> Result { + let resource = state.resource_table.get::(rid)?; + resource.try_send(Message::Text(text)) +} + +#[op(fast)] +pub fn op_ws_try_send_binary( + state: &mut OpState, + rid: u32, + value: &[u8], +) -> Result { + let resource = state.resource_table.get::(rid)?; + resource.try_send(Message::Binary(value.to_vec())) +} + #[op(deferred)] pub async fn op_ws_close( state: Rc>, @@ -515,6 +608,10 @@ pub fn init( op_ws_send::decl(), op_ws_close::decl(), op_ws_next_event::decl(), + op_ws_send_string::decl(), + op_ws_send_binary::decl(), + op_ws_try_send_string::decl(), + op_ws_try_send_binary::decl(), ]) .state(move |state| { state.put::(WsUserAgent(user_agent.clone()));