mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
fix(ext/websocket): correctly order messages when sending blobs (#24133)
Previously the asynchronous read of the blob would not block sends that are started later. We now do this, but in such a way as to not regress performance in the common case of not using `Blob`.
This commit is contained in:
parent
52c8428674
commit
81a05e837b
2 changed files with 84 additions and 16 deletions
|
@ -25,6 +25,8 @@ const {
|
|||
ArrayBufferIsView,
|
||||
ArrayPrototypeJoin,
|
||||
ArrayPrototypeMap,
|
||||
ArrayPrototypePush,
|
||||
ArrayPrototypeShift,
|
||||
ArrayPrototypeSome,
|
||||
ErrorPrototypeToString,
|
||||
ObjectDefineProperties,
|
||||
|
@ -41,7 +43,6 @@ const {
|
|||
SymbolFor,
|
||||
SymbolIterator,
|
||||
TypedArrayPrototypeGetByteLength,
|
||||
Uint8Array,
|
||||
} = primordials;
|
||||
|
||||
import { URL } from "ext:deno_url/00_url.js";
|
||||
|
@ -111,11 +112,14 @@ const _extensions = Symbol("[[extensions]]");
|
|||
const _protocol = Symbol("[[protocol]]");
|
||||
const _binaryType = Symbol("[[binaryType]]");
|
||||
const _eventLoop = Symbol("[[eventLoop]]");
|
||||
const _sendQueue = Symbol("[[sendQueue]]");
|
||||
const _queueSend = Symbol("[[queueSend]]");
|
||||
|
||||
const _server = Symbol("[[server]]");
|
||||
const _idleTimeoutDuration = Symbol("[[idleTimeout]]");
|
||||
const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
|
||||
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");
|
||||
|
||||
class WebSocket extends EventTarget {
|
||||
constructor(url, protocols = []) {
|
||||
super();
|
||||
|
@ -129,6 +133,8 @@ class WebSocket extends EventTarget {
|
|||
this[_binaryType] = "blob";
|
||||
this[_idleTimeoutDuration] = 0;
|
||||
this[_idleTimeoutTimeout] = undefined;
|
||||
this[_sendQueue] = [];
|
||||
|
||||
const prefix = "Failed to construct 'WebSocket'";
|
||||
webidl.requiredArguments(arguments.length, 1, prefix);
|
||||
url = webidl.converters.USVString(url, prefix, "Argument 1");
|
||||
|
@ -326,22 +332,26 @@ class WebSocket extends EventTarget {
|
|||
throw new DOMException("readyState not OPEN", "InvalidStateError");
|
||||
}
|
||||
|
||||
if (ArrayBufferIsView(data)) {
|
||||
op_ws_send_binary(this[_rid], data);
|
||||
} else if (isArrayBuffer(data)) {
|
||||
op_ws_send_binary(this[_rid], new Uint8Array(data));
|
||||
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
||||
PromisePrototypeThen(
|
||||
// deno-lint-ignore prefer-primordials
|
||||
data.slice().arrayBuffer(),
|
||||
(ab) => op_ws_send_binary_ab(this[_rid], ab),
|
||||
);
|
||||
if (this[_sendQueue].length === 0) {
|
||||
// Fast path if the send queue is empty, for example when only synchronous
|
||||
// data is being sent.
|
||||
if (ArrayBufferIsView(data)) {
|
||||
op_ws_send_binary(this[_rid], data);
|
||||
} else if (isArrayBuffer(data)) {
|
||||
op_ws_send_binary_ab(this[_rid], data);
|
||||
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
||||
this[_queueSend](data);
|
||||
} else {
|
||||
const string = String(data);
|
||||
op_ws_send_text(
|
||||
this[_rid],
|
||||
string,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const string = String(data);
|
||||
op_ws_send_text(
|
||||
this[_rid],
|
||||
string,
|
||||
);
|
||||
// Slower path if the send queue is not empty, for example when sending
|
||||
// asynchronous data like a Blob.
|
||||
this[_queueSend](data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -508,6 +518,38 @@ class WebSocket extends EventTarget {
|
|||
}
|
||||
}
|
||||
|
||||
async [_queueSend](data) {
|
||||
const queue = this[_sendQueue];
|
||||
|
||||
ArrayPrototypePush(queue, data);
|
||||
|
||||
if (queue.length > 1) {
|
||||
// There is already a send in progress, so we just push to the queue
|
||||
// and let that task handle sending of this data.
|
||||
return;
|
||||
}
|
||||
|
||||
while (queue.length > 0) {
|
||||
const data = queue[0];
|
||||
if (ArrayBufferIsView(data)) {
|
||||
op_ws_send_binary(this[_rid], data);
|
||||
} else if (isArrayBuffer(data)) {
|
||||
op_ws_send_binary_ab(this[_rid], data);
|
||||
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
|
||||
// deno-lint-ignore prefer-primordials
|
||||
const ab = await data.slice().arrayBuffer();
|
||||
op_ws_send_binary_ab(this[_rid], ab);
|
||||
} else {
|
||||
const string = String(data);
|
||||
op_ws_send_text(
|
||||
this[_rid],
|
||||
string,
|
||||
);
|
||||
}
|
||||
ArrayPrototypeShift(queue);
|
||||
}
|
||||
}
|
||||
|
||||
[_serverHandleIdleTimeout]() {
|
||||
if (this[_idleTimeoutDuration]) {
|
||||
clearTimeout(this[_idleTimeoutTimeout]);
|
||||
|
@ -608,6 +650,7 @@ function createWebSocketBranded() {
|
|||
socket[_binaryType] = "arraybuffer";
|
||||
socket[_idleTimeoutDuration] = 0;
|
||||
socket[_idleTimeoutTimeout] = undefined;
|
||||
socket[_sendQueue] = [];
|
||||
return socket;
|
||||
}
|
||||
|
||||
|
|
|
@ -706,6 +706,31 @@ Deno.test("echo arraybuffer with binaryType arraybuffer", async () => {
|
|||
await promise;
|
||||
});
|
||||
|
||||
Deno.test("echo blob mixed with string", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
const ws = new WebSocket("ws://localhost:4242");
|
||||
ws.binaryType = "arraybuffer";
|
||||
const blob = new Blob(["foo"]);
|
||||
ws.onerror = () => fail();
|
||||
ws.onopen = () => {
|
||||
ws.send(blob);
|
||||
ws.send("bar");
|
||||
};
|
||||
const messages: (ArrayBuffer | string)[] = [];
|
||||
ws.onmessage = (e) => {
|
||||
messages.push(e.data);
|
||||
if (messages.length === 2) {
|
||||
assertEquals(messages[0], new Uint8Array([102, 111, 111]).buffer);
|
||||
assertEquals(messages[1], "bar");
|
||||
ws.close();
|
||||
}
|
||||
};
|
||||
ws.onclose = () => {
|
||||
resolve();
|
||||
};
|
||||
await promise;
|
||||
});
|
||||
|
||||
Deno.test("Event Handlers order", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
const ws = new WebSocket("ws://localhost:4242");
|
||||
|
|
Loading…
Reference in a new issue