1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

fix(ext/node) implement receiveMessageOnPort for node:worker_threads (#22766)

Implementation of `receiveMessageOnPort` for `node:worker_threads`

Fixes: #22702
This commit is contained in:
mash-graz 2024-03-11 00:23:06 +01:00 committed by GitHub
parent 16dbbfa64a
commit 80dbcd3ddf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 95 additions and 4 deletions

View file

@ -8,21 +8,25 @@ import {
op_host_recv_ctrl,
op_host_recv_message,
op_host_terminate_worker,
op_message_port_recv_message_sync,
op_require_read_closest_package_json,
} from "ext:core/ops";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import {
deserializeJsMessageData,
MessageChannel,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
serializeJsMessageData,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { log } from "ext:runtime/06_util.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter, once } from "node:events";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { isAbsolute, resolve } from "node:path";
const { ObjectPrototypeIsPrototypeOf } = primordials;
const {
Error,
Symbol,
@ -496,9 +500,24 @@ export function markAsUntransferable() {
export function moveMessagePortToContext() {
notImplemented("moveMessagePortToContext");
}
export function receiveMessageOnPort() {
notImplemented("receiveMessageOnPort");
/**
* @param { MessagePort } port
* @returns {object | undefined}
*/
export function receiveMessageOnPort(port: MessagePort): object | undefined {
if (!(ObjectPrototypeIsPrototypeOf(MessagePortPrototype, port))) {
const err = new TypeError(
'The "port" argument must be a MessagePort instance',
);
err["code"] = "ERR_INVALID_ARG_TYPE";
throw err;
}
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
return { message: deserializeJsMessageData(data)[0] };
}
export {
BroadcastChannel,
MessageChannel,

View file

@ -83,6 +83,7 @@ webidl.configureInterface(MessageChannel);
const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
const MessagePortIdSymbol = _id;
const _enabled = Symbol("enabled");
/**
@ -380,6 +381,7 @@ export {
deserializeJsMessageData,
MessageChannel,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
serializeJsMessageData,
structuredClone,

View file

@ -110,4 +110,10 @@ declare module "ext:deno_web/13_message_port.js" {
data: Uint8Array;
transferables: Transferable[];
}
const MessageChannel: typeof MessageChannel;
const MessagePort: typeof MessagePort;
const MessagePortIdSymbol: typeof MessagePortIdSymbol;
function deserializeJsMessageData(
messageData: messagePort.MessageData,
): [object, object[]];
}

View file

@ -46,6 +46,7 @@ pub use crate::message_port::create_entangled_message_port;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
use crate::message_port::op_message_port_recv_message_sync;
pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
@ -78,6 +79,7 @@ deno_core::extension!(deno_web,
op_message_port_create_entangled,
op_message_port_post_message,
op_message_port_recv_message,
op_message_port_recv_message_sync,
compression::op_compression_new,
compression::op_compression_write,
compression::op_compression_finish,

View file

@ -17,6 +17,7 @@ use deno_core::Resource;
use deno_core::ResourceId;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
@ -227,3 +228,22 @@ pub async fn op_message_port_recv_message(
let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
resource.port.recv(state).or_cancel(cancel).await?
}
#[op2]
#[serde]
pub fn op_message_port_recv_message_sync(
state: &mut OpState, // Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
let mut rx = resource.port.rx.borrow_mut();
match rx.try_recv() {
Ok((d, t)) => Ok(Some(JsMessageData {
data: d,
transferables: serialize_transferables(state, t),
})),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Ok(None),
}
}

View file

@ -104,6 +104,8 @@
"test-util.js",
"test-webcrypto-sign-verify.js",
"test-whatwg-url-properties.js",
// needs replace ".on" => ".addEventListener" in L29
"test-worker-message-port-receive-message.js",
"test-zlib-convenience-methods.js",
"test-zlib-empty-buffer.js",
"test-zlib-invalid-input.js",
@ -665,6 +667,7 @@
"test-whatwg-url-custom-tostringtag.js",
"test-whatwg-url-override-hostname.js",
"test-whatwg-url-properties.js",
"test-worker-message-port-receive-message.js",
"test-zlib-close-after-error.js",
"test-zlib-close-after-write.js",
"test-zlib-convenience-methods.js",

View file

@ -0,0 +1,40 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually.
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
const message1 = { hello: 'world' };
const message2 = { foo: 'bar' };
// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does
// when were using events.
assert.strictEqual(receiveMessageOnPort(port2), undefined);
port1.postMessage(message1);
port1.postMessage(message2);
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 });
assert.strictEqual(receiveMessageOnPort(port2), undefined);
assert.strictEqual(receiveMessageOnPort(port2), undefined);
// Make sure message handlers arent called.
port2.addEventListener('message', common.mustNotCall());
port1.postMessage(message1);
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
port1.close();
for (const value of [null, 0, -1, {}, []]) {
assert.throws(() => receiveMessageOnPort(value), {
name: 'TypeError',
code: 'ERR_INVALID_ARG_TYPE',
message: 'The "port" argument must be a MessagePort instance'
});
}

View file

@ -2710,7 +2710,6 @@ Total: 2998
- [parallel/test-worker-message-port-message-port-transferring.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-message-port-transferring.js)
- [parallel/test-worker-message-port-move.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-move.js)
- [parallel/test-worker-message-port-multiple-sharedarraybuffers.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-multiple-sharedarraybuffers.js)
- [parallel/test-worker-message-port-receive-message.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-receive-message.js)
- [parallel/test-worker-message-port-terminate-transfer-list.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-terminate-transfer-list.js)
- [parallel/test-worker-message-port-transfer-closed.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-transfer-closed.js)
- [parallel/test-worker-message-port-transfer-duplicate.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-transfer-duplicate.js)