mirror of
https://github.com/denoland/deno.git
synced 2025-01-18 11:53:59 -05:00
dd8cbf5e29
Fixes #22995. Fixes #23000. There were a handful of bugs here causing the hang (each with a corresponding minimized test): - We were canceling recv futures when `receiveMessageOnPort` was called, but this caused the "receive loop" in the message port to exit. This was due to the fact that `CancelHandle`s are never reset (i.e., once you `cancel` a `CancelHandle`, it remains cancelled). That meant that after `receiveMessageOnPort` was called, the subsequent calls to `op_message_port_recv_message` would throw `Interrupted` exceptions, and we would exit the loop. The cancellation, however, isn't actually necessary. `op_message_port_recv_message` only borrows the underlying port for long enough to poll the receiver, so the borrow there could never overlap with `op_message_port_recv_message_sync`. - Calling `MessagePort.unref()` caused the "receive loop" in the message port to exit. This was because we were setting `messageEventListenerCount` to 0 on unref. Not only does that break the counter when multiple `MessagePort`s are present in the same thread, but we also exited the "receive loop" whenever the listener count was 0. I assume this was to prevent the recv promise from keeping the event loop open. Instead of this, I chose to just unref the recv promise as needed to control the event loop. - The last bug causing the hang (which was a doozy to debug) ended up being an unfortunate interaction between how we implement our messageport "receive loop" and a pattern found in `npm:piscina` (which angular uses). The gist of it is that piscina uses an atomic wait loop along with `receiveMessageOnPort` in its worker threads, and as the worker is getting started, the following incredibly convoluted series of events occurs: 1. Parent sends a MessagePort `p` to worker 2. Parent sends a message `m` to the port `p` 3. Parent notifies the worker with `Atomics.notify` that a new message is available 4. Worker receives message, adds "message" listener to port `p` 5. 
Adding the listener triggers `MessagePort.start()` on `p` 6. Receive loop in MessagePort.start receives the message `m`, but then hits an await point and yields (before dispatching the "message" event) 7. Worker continues execution, starts the atomic wait loop, and immediately receives the existing notification from the parent that a message is available 8. Worker attempts to receive the new message `m` with `receiveMessageOnPort`, but this returns `undefined` because the receive loop already took the message in 6 9. Atomic wait loop continues to next iteration, waiting for the next message with `Atomics.wait` 10. `Atomics.wait` blocks the worker thread, which prevents the receive loop from continuing and dispatching the "message" event for the received message 11. The parent waits for the worker to respond to the first message, and waits 12. The thread can't make any more progress, and the whole process hangs The fix I've chosen here (which I don't particularly love, but it works) is to just delay the `MessagePort.start` call until the end of the event loop turn, so that the atomic wait loop receives the message first. This prevents the hang. --- Those were the main issues causing the hang. There ended up being a few other small bugs as well, namely `exit` being emitted multiple times, and not patching up the message port when it's received by `receiveMessageOnPort`.
514 lines
14 KiB
JavaScript
514 lines
14 KiB
JavaScript
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
|
|
|
// @ts-check
|
|
/// <reference path="../../core/lib.deno_core.d.ts" />
|
|
/// <reference path="../webidl/internal.d.ts" />
|
|
/// <reference path="./internal.d.ts" />
|
|
/// <reference path="./lib.deno_web.d.ts" />
|
|
|
|
import { core, primordials } from "ext:core/mod.js";
|
|
import {
|
|
op_message_port_create_entangled,
|
|
op_message_port_post_message,
|
|
op_message_port_recv_message,
|
|
} from "ext:core/ops";
|
|
const {
|
|
ArrayBufferPrototypeGetByteLength,
|
|
ArrayPrototypeFilter,
|
|
ArrayPrototypeIncludes,
|
|
ArrayPrototypePush,
|
|
ObjectPrototypeIsPrototypeOf,
|
|
ObjectDefineProperty,
|
|
Symbol,
|
|
SymbolFor,
|
|
SymbolIterator,
|
|
PromiseResolve,
|
|
SafeArrayIterator,
|
|
TypeError,
|
|
} = primordials;
|
|
const {
|
|
InterruptedPrototype,
|
|
isArrayBuffer,
|
|
} = core;
|
|
import * as webidl from "ext:deno_webidl/00_webidl.js";
|
|
import { createFilteredInspectProxy } from "ext:deno_console/01_console.js";
|
|
import {
|
|
defineEventHandler,
|
|
EventTarget,
|
|
MessageEvent,
|
|
setEventTargetData,
|
|
setIsTrusted,
|
|
} from "./02_event.js";
|
|
import { isDetachedBuffer } from "./06_streams.js";
|
|
import { DOMException } from "./01_dom_exception.js";
|
|
|
|
// Number of message ports in this module that are currently refed.
|
|
// A port counts as refed either because it has at least one "message" event
|
|
// listener or because of an explicit ref/unref call (node message ports).
|
|
let refedMessagePortsCount = 0;
|
|
|
|
/**
 * A pair of entangled message ports, exposed as `port1` and `port2`.
 */
class MessageChannel {
  /** @type {MessagePort} */
  #port1;
  /** @type {MessagePort} */
  #port2;

  /** Creates both ports from a freshly created pair of entangled port ids. */
  constructor() {
    this[webidl.brand] = webidl.brand;
    const entangledIds = opCreateEntangledMessagePort();
    this.#port1 = createMessagePort(entangledIds[0]);
    this.#port2 = createMessagePort(entangledIds[1]);
  }

  /** @returns {MessagePort} the first port of the pair */
  get port1() {
    webidl.assertBranded(this, MessageChannelPrototype);
    return this.#port1;
  }

  /** @returns {MessagePort} the second port of the pair */
  get port2() {
    webidl.assertBranded(this, MessageChannelPrototype);
    return this.#port2;
  }

  /** Custom inspect hook: only `port1` and `port2` are shown. */
  [SymbolFor("Deno.privateCustomInspect")](inspect, inspectOptions) {
    const filteredView = createFilteredInspectProxy({
      object: this,
      evaluate: ObjectPrototypeIsPrototypeOf(MessageChannelPrototype, this),
      keys: ["port1", "port2"],
    });
    return inspect(filteredView, inspectOptions);
  }
}
|
|
|
|
webidl.configureInterface(MessageChannel);
|
|
// Cached prototype, used for brand checks and by the custom inspect hook.
const MessageChannelPrototype = MessageChannel.prototype;
|
|
|
|
// Key of the id that `createMessagePort` stores on a port and that is passed
// to the message port ops; `null` once the port is closed or transferred.
const _id = Symbol("id");
|
|
// Exported alias of `_id`.
const MessagePortIdSymbol = _id;
|
|
// Defined as `false` by the MessagePort constructor and exported.
// NOTE(review): presumably toggled by node's `receiveMessageOnPort` — confirm.
const MessagePortReceiveMessageOnPortSymbol = Symbol(
|
|
"MessagePortReceiveMessageOnPort",
|
|
);
|
|
// True while the receive loop started by `MessagePort.start()` is running.
const _enabled = Symbol("enabled");
|
|
// Whether this port is currently counted in `refedMessagePortsCount`.
const _refed = Symbol("refed");
|
|
// Per-port count of add/removeEventListener("message") calls.
const _messageEventListenerCount = Symbol("messageEventListenerCount");
|
|
// Key of an optional close callback, invoked at most once through
// `nodeWorkerThreadMaybeInvokeCloseCb`. Not assigned a function in this file.
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
|
|
// Flag recording that the close callback has already been invoked.
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
|
|
// Key of the MessagePort method that refs (true) or unrefs (false) a port.
export const refMessagePort = Symbol("refMessagePort");
|
|
/** It is used by 99_main.js and worker_threads to
|
|
* unref/ref on the global pollForMessages promise. */
|
|
export const unrefPollForMessages = Symbol("unrefPollForMessages");
|
|
|
|
/**
 * Builds a branded `MessagePort` around an existing port id without running
 * the (illegal) public constructor, and initializes its internal state.
 *
 * @param {number} id id stored on the port and handed to the message port ops
 * @returns {MessagePort}
 */
function createMessagePort(id) {
  const messagePort = webidl.createBranded(MessagePort);
  // Mark the port as a host object for the core serializer.
  messagePort[core.hostObjectBrand] = core.hostObjectBrand;
  setEventTargetData(messagePort);

  // Class field initializers are not applied by `createBranded`, so every
  // internal slot is set explicitly here.
  messagePort[_id] = id;
  messagePort[_enabled] = false;
  messagePort[_messageEventListenerCount] = 0;
  messagePort[_refed] = false;
  return messagePort;
}
|
|
|
|
/**
 * Invokes the port's node worker-thread close callback at most once.
 *
 * Does nothing when no callback function is installed under
 * `nodeWorkerThreadCloseCb` or when the callback was already invoked.
 *
 * @param {MessagePort} port
 */
function nodeWorkerThreadMaybeInvokeCloseCb(port) {
  if (
    typeof port[nodeWorkerThreadCloseCb] !== "function" ||
    port[nodeWorkerThreadCloseCbInvoked]
  ) {
    return;
  }
  // Set the flag *before* calling out: the callback may re-enter this
  // function (e.g. by closing the port again) or throw, and in both cases it
  // must not be run a second time.
  port[nodeWorkerThreadCloseCbInvoked] = true;
  port[nodeWorkerThreadCloseCb]();
}
|
|
|
|
// NOTE(review): `_isRefed` does not appear to be referenced anywhere else in
// this file and is not exported — confirm before removing.
const _isRefed = Symbol("isRefed");
|
|
// Key under which a port stores the promise of its in-flight
// `op_message_port_recv_message` call, so it can be refed/unrefed later.
const _dataPromise = Symbol("dataPromise");
|
|
|
|
/**
 * One end of a message channel.
 *
 * Instances are created through `createMessagePort`; calling the constructor
 * directly always ends in `webidl.illegalConstructor()`.
 */
class MessagePort extends EventTarget {
  /** @type {number | null} */
  [_id] = null;
  /** @type {boolean} */
  [_enabled] = false;
  [_refed] = false;
  /** @type {Promise<any> | undefined} */
  [_dataPromise] = undefined;
  [_messageEventListenerCount] = 0;

  constructor() {
    super();
    ObjectDefineProperty(this, MessagePortReceiveMessageOnPortSymbol, {
      __proto__: null,
      value: false,
      enumerable: false,
    });
    ObjectDefineProperty(this, nodeWorkerThreadCloseCb, {
      __proto__: null,
      value: null,
      enumerable: false,
    });
    ObjectDefineProperty(this, nodeWorkerThreadCloseCbInvoked, {
      __proto__: null,
      value: false,
      enumerable: false,
    });
    webidl.illegalConstructor();
  }

  /**
   * Serializes `message` (with its transfer list) and posts it to this port.
   *
   * The message is serialized, and its transferables taken, even when this
   * port is already closed; in that case nothing is sent.
   *
   * @param {any} message
   * @param {object[] | StructuredSerializeOptions} transferOrOptions
   * @throws {DOMException} `DataCloneError` when the port itself is in the
   *   transfer list or serialization fails
   */
  postMessage(message, transferOrOptions = { __proto__: null }) {
    webidl.assertBranded(this, MessagePortPrototype);
    const prefix = "Failed to execute 'postMessage' on 'MessagePort'";
    webidl.requiredArguments(arguments.length, 1, prefix);
    message = webidl.converters.any(message);
    let options;
    if (
      webidl.type(transferOrOptions) === "Object" &&
      transferOrOptions !== undefined &&
      transferOrOptions[SymbolIterator] !== undefined
    ) {
      // An iterable second argument is the legacy "transfer list" overload.
      const transfer = webidl.converters["sequence<object>"](
        transferOrOptions,
        prefix,
        "Argument 2",
      );
      options = { transfer };
    } else {
      options = webidl.converters.StructuredSerializeOptions(
        transferOrOptions,
        prefix,
        "Argument 2",
      );
    }
    const { transfer } = options;
    if (ArrayPrototypeIncludes(transfer, this)) {
      throw new DOMException("Can not transfer self", "DataCloneError");
    }
    const data = serializeJsMessageData(message, transfer);
    if (this[_id] === null) return;
    op_message_port_post_message(this[_id], data);
  }

  /**
   * Starts the asynchronous receive loop, unless it is already running.
   *
   * The loop repeatedly awaits `op_message_port_recv_message` and dispatches
   * a "message" event per received message. It ends when the port has no id
   * anymore, when the op is interrupted, or when the op yields `null`.
   */
  start() {
    webidl.assertBranded(this, MessagePortPrototype);
    if (this[_enabled]) return;
    (async () => {
      this[_enabled] = true;
      while (true) {
        if (this[_id] === null) break;
        let data;
        try {
          this[_dataPromise] = op_message_port_recv_message(
            this[_id],
          );
          // For ports with a node close callback, an unrefed port unrefs its
          // pending receive op.
          if (
            typeof this[nodeWorkerThreadCloseCb] === "function" &&
            !this[_refed]
          ) {
            core.unrefOpPromise(this[_dataPromise]);
          }
          data = await this[_dataPromise];
          this[_dataPromise] = undefined;
        } catch (err) {
          // The op has settled; don't keep the rejected promise around where
          // a later ref/unref call would pick it up.
          this[_dataPromise] = undefined;
          if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
            break;
          }
          nodeWorkerThreadMaybeInvokeCloseCb(this);
          throw err;
        }
        if (data === null) {
          nodeWorkerThreadMaybeInvokeCloseCb(this);
          break;
        }
        let message, transferables;
        try {
          const v = deserializeJsMessageData(data);
          message = v[0];
          transferables = v[1];
        } catch (err) {
          const event = new MessageEvent("messageerror", { data: err });
          setIsTrusted(event, true);
          this.dispatchEvent(event);
          // Only this message is lost. Leaving the loop here (as a `return`
          // would) stops the port for good, because `_enabled` stays `true`
          // and `start()` could never restart it.
          continue;
        }
        const event = new MessageEvent("message", {
          data: message,
          ports: ArrayPrototypeFilter(
            transferables,
            (t) => ObjectPrototypeIsPrototypeOf(MessagePortPrototype, t),
          ),
        });
        setIsTrusted(event, true);
        this.dispatchEvent(event);
      }
      this[_enabled] = false;
    })();
  }

  /**
   * Refs (`ref` truthy) or unrefs (`ref` falsy) this port: updates the
   * module-level refed-port counter and applies the same ref state to the
   * pending receive promise, if any. Repeated calls with the same state are
   * no-ops.
   *
   * @param {boolean} ref
   */
  [refMessagePort](ref) {
    if (ref) {
      if (!this[_refed]) {
        refedMessagePortsCount++;
        if (this[_dataPromise]) {
          core.refOpPromise(this[_dataPromise]);
        }
        this[_refed] = true;
      }
    } else {
      if (this[_refed]) {
        refedMessagePortsCount--;
        if (this[_dataPromise]) {
          core.unrefOpPromise(this[_dataPromise]);
        }
        this[_refed] = false;
      }
    }
  }

  /**
   * Closes the underlying resource (once), clears the port id and invokes
   * the node close callback if one is installed.
   */
  close() {
    webidl.assertBranded(this, MessagePortPrototype);
    if (this[_id] !== null) {
      core.close(this[_id]);
      this[_id] = null;
      nodeWorkerThreadMaybeInvokeCloseCb(this);
    }
  }

  /**
   * Removes an event listener; removing the last counted "message" listener
   * also marks the port as unrefed.
   *
   * The counter tracks calls, not actual registrations, so it is clamped at
   * zero: a removal without a matching add must not drive it negative, or a
   * later `addEventListener("message", ...)` would fail to ref the port.
   */
  removeEventListener(...args) {
    if (args[0] == "message" && this[_messageEventListenerCount] > 0) {
      if (--this[_messageEventListenerCount] === 0 && this[_refed]) {
        refedMessagePortsCount--;
        this[_refed] = false;
      }
    }
    super.removeEventListener(...new SafeArrayIterator(args));
  }

  /**
   * Adds an event listener; the first counted "message" listener also marks
   * the port as refed.
   */
  addEventListener(...args) {
    if (args[0] == "message") {
      if (++this[_messageEventListenerCount] === 1 && !this[_refed]) {
        refedMessagePortsCount++;
        this[_refed] = true;
      }
    }
    super.addEventListener(...new SafeArrayIterator(args));
  }

  /** Custom inspect hook: only the two event handler attributes are shown. */
  [SymbolFor("Deno.privateCustomInspect")](inspect, inspectOptions) {
    return inspect(
      createFilteredInspectProxy({
        object: this,
        evaluate: ObjectPrototypeIsPrototypeOf(MessagePortPrototype, this),
        keys: [
          "onmessage",
          "onmessageerror",
        ],
      }),
      inspectOptions,
    );
  }
}
|
|
|
|
// Registers the `onmessage` event handler attribute. The callback starts the
// port's receive loop.
// NOTE(review): presumably `defineEventHandler` runs this callback when the
// handler attribute is assigned — confirm in 02_event.js.
defineEventHandler(MessagePort.prototype, "message", function (self) {
  if (!self[nodeWorkerThreadCloseCb]) {
    self.start();
    return;
  }
  (async () => {
    // Delay `start()` until the end of this event loop turn, to give
    // `receiveMessageOnPort` a chance to receive a message first. This is
    // primarily to resolve an issue with a pattern used in `npm:piscina`
    // that results in an indefinite hang.
    await PromiseResolve();
    self.start();
  })();
});
defineEventHandler(MessagePort.prototype, "messageerror");

webidl.configureInterface(MessagePort);
// Cached prototype, used for brand checks throughout this module.
const MessagePortPrototype = MessagePort.prototype;
|
|
|
|
/**
 * Thin wrapper around `op_message_port_create_entangled`.
 *
 * @returns {[number, number]} the ids of the two newly created ports
 */
function opCreateEntangledMessagePort() {
  const entangledPortIds = op_message_port_create_entangled();
  return entangledPortIds;
}
|
|
|
|
/**
 * Turns serialized message data back into a JS value plus its transfer list.
 *
 * The returned transfer list has exactly one entry per serialized
 * transferable, in the original order: a `MessagePort` for "messagePort"
 * entries and the transferred buffer for "arrayBuffer" entries.
 *
 * @param {messagePort.MessageData} messageData
 * @returns {[any, object[]]} `[value, transferables]`
 * @throws {TypeError} on an unknown transferable kind
 */
function deserializeJsMessageData(messageData) {
  /** @type {object[]} */
  const transferables = [];
  // Positions in `transferables` that are reserved for array buffers.
  const arrayBufferIdsInTransferables = [];
  const transferredArrayBuffers = [];
  let options;

  if (messageData.transferables.length > 0) {
    const hostObjects = [];
    for (let i = 0; i < messageData.transferables.length; ++i) {
      const transferable = messageData.transferables[i];
      switch (transferable.kind) {
        case "messagePort": {
          const port = createMessagePort(transferable.data);
          ArrayPrototypePush(transferables, port);
          ArrayPrototypePush(hostObjects, port);
          break;
        }
        case "arrayBuffer": {
          ArrayPrototypePush(transferredArrayBuffers, transferable.data);
          // `push` returns the new length, so the placeholder that was just
          // appended lives at `length - 1`. Using the length itself would
          // leave a `null` hole and shift every buffer one slot too far.
          const index = ArrayPrototypePush(transferables, null) - 1;
          ArrayPrototypePush(arrayBufferIdsInTransferables, index);
          break;
        }
        default:
          throw new TypeError("Unreachable");
      }
    }

    options = {
      hostObjects,
      transferredArrayBuffers,
    };
  }

  const data = core.deserialize(messageData.data, options);

  // Fill the reserved slots from `transferredArrayBuffers`.
  // NOTE(review): presumably `core.deserialize` has replaced each entry of
  // `transferredArrayBuffers` with the materialized buffer by now — confirm.
  for (let i = 0; i < arrayBufferIdsInTransferables.length; ++i) {
    const id = arrayBufferIdsInTransferables[i];
    transferables[id] = transferredArrayBuffers[i];
  }

  return [data, transferables];
}
|
|
|
|
/**
 * Serializes `data` together with its transfer list.
 *
 * The whole transfer list is validated before anything is serialized or
 * modified, so a rejected call leaves every listed port entangled. Ports are
 * disentangled (their id cleared) only after the complete result was built.
 *
 * @param {any} data
 * @param {object[]} transferables array buffers and/or message ports
 * @returns {messagePort.MessageData}
 * @throws {DOMException} `DataCloneError` when a transferable is not an
 *   ArrayBuffer or MessagePort, an ArrayBuffer is already detached, a port is
 *   disentangled or listed twice, or serialization itself fails
 */
function serializeJsMessageData(data, transferables) {
  let options;
  const transferredArrayBuffers = [];
  if (transferables.length > 0) {
    const hostObjects = [];
    // `j` counts array buffers only; it is what the error message reports.
    for (let i = 0, j = 0; i < transferables.length; i++) {
      const t = transferables[i];
      if (isArrayBuffer(t)) {
        if (
          ArrayBufferPrototypeGetByteLength(t) === 0 &&
          isDetachedBuffer(t)
        ) {
          throw new DOMException(
            `ArrayBuffer at index ${j} is already detached`,
            "DataCloneError",
          );
        }
        j++;
        ArrayPrototypePush(transferredArrayBuffers, t);
      } else if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, t)) {
        webidl.assertBranded(t, MessagePortPrototype);
        // A port listed twice used to fail only on its second occurrence,
        // after the first one had already cleared its id. Reject it up front
        // with the same error instead.
        if (t[_id] === null || ArrayPrototypeIncludes(hostObjects, t)) {
          throw new DOMException(
            "Can not transfer disentangled message port",
            "DataCloneError",
          );
        }
        ArrayPrototypePush(hostObjects, t);
      } else {
        // Reject before serializing, so nothing has been touched yet.
        throw new DOMException("Value not transferable", "DataCloneError");
      }
    }

    options = {
      hostObjects,
      transferredArrayBuffers,
    };
  }

  const serializedData = core.serialize(data, options, (err) => {
    throw new DOMException(err, "DataCloneError");
  });

  /** @type {messagePort.Transferable[]} */
  const serializedTransferables = [];
  const portsToDisentangle = [];

  let arrayBufferI = 0;
  for (let i = 0; i < transferables.length; ++i) {
    const transferable = transferables[i];
    if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, transferable)) {
      const id = transferable[_id];
      // Re-checked because serializing `data` may run user code (getters)
      // that closes a port after the validation pass above.
      if (id === null) {
        throw new DOMException(
          "Can not transfer disentangled message port",
          "DataCloneError",
        );
      }
      ArrayPrototypePush(portsToDisentangle, transferable);
      ArrayPrototypePush(serializedTransferables, {
        kind: "messagePort",
        data: id,
      });
    } else {
      // Everything that is not a port was validated as an ArrayBuffer above.
      ArrayPrototypePush(serializedTransferables, {
        kind: "arrayBuffer",
        data: transferredArrayBuffers[arrayBufferI],
      });
      arrayBufferI++;
    }
  }

  // Commit: the transfer is accepted, so the local ports lose their ids.
  for (let i = 0; i < portsToDisentangle.length; ++i) {
    portsToDisentangle[i][_id] = null;
  }

  return {
    data: serializedData,
    transferables: serializedTransferables,
  };
}
|
|
|
|
// WebIDL dictionary converter for `StructuredSerializeOptions`. Its only
// member is `transfer`, a `sequence<object>`.
webidl.converters.StructuredSerializeOptions = webidl.createDictionaryConverter(
  "StructuredSerializeOptions",
  [
    {
      key: "transfer",
      converter: webidl.converters["sequence<object>"],
      // A getter, so that every read yields a fresh empty array.
      get defaultValue() {
        return [];
      },
    },
  ],
);
|
|
|
|
/**
 * Clones `value` by serializing it with `serializeJsMessageData` and
 * immediately deserializing the result.
 *
 * @param {any} value
 * @param {StructuredSerializeOptions} [options] may carry a `transfer` list
 * @returns {any} the deserialized copy
 */
function structuredClone(value, options) {
  const prefix = "Failed to execute 'structuredClone'";
  webidl.requiredArguments(arguments.length, 1, prefix);
  const { transfer } = webidl.converters.StructuredSerializeOptions(
    options,
    prefix,
    "Argument 2",
  );
  const serialized = serializeJsMessageData(value, transfer);
  // Only the value is returned; the rebuilt transfer list is dropped.
  const { 0: cloned } = deserializeJsMessageData(serialized);
  return cloned;
}
|
|
|
|
export {
|
|
deserializeJsMessageData,
|
|
MessageChannel,
|
|
MessagePort,
|
|
MessagePortIdSymbol,
|
|
MessagePortPrototype,
|
|
MessagePortReceiveMessageOnPortSymbol,
|
|
nodeWorkerThreadCloseCb,
|
|
refedMessagePortsCount,
|
|
serializeJsMessageData,
|
|
structuredClone,
|
|
};
|