1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-12 00:54:02 -05:00

perf(ext/websocket): optimize socket.send (#16320)

Towards #16315
This commit is contained in:
Divy Srivastava 2022-10-19 16:23:13 +05:30 committed by GitHub
parent 57f17bd3e6
commit 36307c45e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 143 additions and 28 deletions

View file

@ -18,20 +18,20 @@
ArrayPrototypeJoin, ArrayPrototypeJoin,
ArrayPrototypeMap, ArrayPrototypeMap,
ArrayPrototypeSome, ArrayPrototypeSome,
DataView,
ErrorPrototypeToString, ErrorPrototypeToString,
ObjectDefineProperties, ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf, ObjectPrototypeIsPrototypeOf,
PromisePrototypeThen, PromisePrototypeThen,
RegExpPrototypeTest, RegExpPrototypeTest,
Set, Set,
String,
StringPrototypeEndsWith, StringPrototypeEndsWith,
StringPrototypeToLowerCase, StringPrototypeToLowerCase,
Symbol, Symbol,
SymbolIterator, SymbolIterator,
PromisePrototypeCatch, PromisePrototypeCatch,
queueMicrotask,
SymbolFor, SymbolFor,
Uint8Array,
} = window.__bootstrap.primordials; } = window.__bootstrap.primordials;
webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => { webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => {
@ -290,40 +290,58 @@
throw new DOMException("readyState not OPEN", "InvalidStateError"); throw new DOMException("readyState not OPEN", "InvalidStateError");
} }
const sendTypedArray = (ta) => { if (typeof data === "string") {
this[_bufferedAmount] += ta.byteLength; // try to send in one go!
const d = core.byteLength(data);
const sent = ops.op_ws_try_send_string(this[_rid], data);
this[_bufferedAmount] += d;
if (!sent) {
PromisePrototypeThen( PromisePrototypeThen(
core.opAsync("op_ws_send", this[_rid], { core.opAsync("op_ws_send_string", this[_rid], data),
kind: "binary", () => {
value: ta, this[_bufferedAmount] -= d;
}), },
);
} else {
// Spec expects data to be start flushing on next tick but oh well...
// we already sent it so we can just decrement the bufferedAmount
// on the next tick.
queueMicrotask(() => {
this[_bufferedAmount] -= d;
});
}
return;
}
const sendTypedArray = (ta) => {
// try to send in one go!
const sent = ops.op_ws_try_send_binary(this[_rid], ta);
this[_bufferedAmount] += ta.byteLength;
if (!sent) {
PromisePrototypeThen(
core.opAsync("op_ws_send_binary", this[_rid], ta),
() => { () => {
this[_bufferedAmount] -= ta.byteLength; this[_bufferedAmount] -= ta.byteLength;
}, },
); );
} else {
// Spec expects data to be start flushing on next tick but oh well...
// we already sent it so we can just decrement the bufferedAmount
// on the next tick.
queueMicrotask(() => {
this[_bufferedAmount] -= ta.byteLength;
});
}
}; };
if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) { if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
PromisePrototypeThen( sendTypedArray(new Uint8Array(data));
data.slice().arrayBuffer(),
(ab) => sendTypedArray(new DataView(ab)),
);
} else if (ArrayBufferIsView(data)) { } else if (ArrayBufferIsView(data)) {
sendTypedArray(data); sendTypedArray(data);
} else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) { } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
sendTypedArray(new DataView(data));
} else {
const string = String(data);
const d = core.encode(string);
this[_bufferedAmount] += d.byteLength;
PromisePrototypeThen( PromisePrototypeThen(
core.opAsync("op_ws_send", this[_rid], { data.slice().arrayBuffer(),
kind: "text", (ab) => sendTypedArray(new Uint8Array(ab)),
value: string,
}),
() => {
this[_bufferedAmount] -= d.byteLength;
},
); );
} }
} }

View file

@ -161,6 +161,51 @@ impl WsStreamResource {
} }
} }
fn try_send(self: &Rc<Self>, message: Message) -> Result<bool, AnyError> {
let waker = deno_core::futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
let res = match self.stream {
WebSocketStreamType::Client { .. } => {
match RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { tx, .. } => tx,
WebSocketStreamType::Server { .. } => unreachable!(),
})
.try_borrow_mut()
{
Some(mut tx) => {
if tx.poll_ready_unpin(&mut cx).is_ready() {
tx.start_send_unpin(message)?;
tx.poll_flush_unpin(&mut cx).is_ready()
} else {
false
}
}
None => false,
}
}
WebSocketStreamType::Server { .. } => {
match RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { .. } => unreachable!(),
WebSocketStreamType::Server { tx, .. } => tx,
})
.try_borrow_mut()
{
Some(mut tx) => {
if tx.poll_ready_unpin(&mut cx).is_ready() {
tx.start_send_unpin(message)?;
tx.poll_flush_unpin(&mut cx).is_ready()
} else {
false
}
}
None => false,
}
}
};
Ok(res)
}
async fn next_message( async fn next_message(
self: &Rc<Self>, self: &Rc<Self>,
cancel: RcRef<CancelHandle>, cancel: RcRef<CancelHandle>,
@ -426,6 +471,54 @@ pub async fn op_ws_send(
Ok(()) Ok(())
} }
#[op]
pub async fn op_ws_send_string(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
text: String,
) -> Result<(), AnyError> {
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)?;
resource.send(Message::Text(text)).await?;
Ok(())
}
#[op]
pub async fn op_ws_send_binary(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
data: ZeroCopyBuf,
) -> Result<(), AnyError> {
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)?;
resource.send(Message::Binary(data.to_vec())).await?;
Ok(())
}
#[op]
pub fn op_ws_try_send_string(
state: &mut OpState,
rid: ResourceId,
text: String,
) -> Result<bool, AnyError> {
let resource = state.resource_table.get::<WsStreamResource>(rid)?;
resource.try_send(Message::Text(text))
}
#[op(fast)]
pub fn op_ws_try_send_binary(
state: &mut OpState,
rid: u32,
value: &[u8],
) -> Result<bool, AnyError> {
let resource = state.resource_table.get::<WsStreamResource>(rid)?;
resource.try_send(Message::Binary(value.to_vec()))
}
#[op(deferred)] #[op(deferred)]
pub async fn op_ws_close( pub async fn op_ws_close(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
@ -515,6 +608,10 @@ pub fn init<P: WebSocketPermissions + 'static>(
op_ws_send::decl(), op_ws_send::decl(),
op_ws_close::decl(), op_ws_close::decl(),
op_ws_next_event::decl(), op_ws_next_event::decl(),
op_ws_send_string::decl(),
op_ws_send_binary::decl(),
op_ws_try_send_string::decl(),
op_ws_try_send_binary::decl(),
]) ])
.state(move |state| { .state(move |state| {
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone())); state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));