1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

fix(ext/node): support MessagePort in WorkerOptions.workerData (#22950)

This commit fixes passing `MessagePort` instances to
`WorkerOptions.workerData`.

Before they were not serialized and deserialized properly when spawning
a worker thread.

Closes https://github.com/denoland/deno/issues/22935
This commit is contained in:
Bartek Iwańczuk 2024-03-16 00:59:18 +00:00 committed by GitHub
parent ebbc897b69
commit 92576fdcfd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 92 additions and 12 deletions

View file

@ -43,12 +43,15 @@ pub use crate::blob::BlobStore;
pub use crate::blob::InMemoryBlobPart;
pub use crate::message_port::create_entangled_message_port;
pub use crate::message_port::deserialize_js_transferables;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
use crate::message_port::op_message_port_recv_message_sync;
pub use crate::message_port::serialize_transferables;
pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
pub use crate::message_port::Transferable;
use crate::timers::op_defer;
use crate::timers::op_now;

View file

@ -22,7 +22,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
enum Transferable {
pub enum Transferable {
MessagePort(MessagePort),
ArrayBuffer(u32),
}
@ -140,7 +140,7 @@ pub enum JsTransferable {
ArrayBuffer(u32),
}
fn deserialize_js_transferables(
pub fn deserialize_js_transferables(
state: &mut OpState,
js_transferables: Vec<JsTransferable>,
) -> Result<Vec<Transferable>, AnyError> {
@ -165,7 +165,7 @@ fn deserialize_js_transferables(
Ok(transferables)
}
fn serialize_transferables(
pub fn serialize_transferables(
state: &mut OpState,
transferables: Vec<Transferable>,
) -> Vec<JsTransferable> {
@ -189,8 +189,8 @@ fn serialize_transferables(
#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
data: DetachedBuffer,
transferables: Vec<JsTransferable>,
pub data: DetachedBuffer,
pub transferables: Vec<JsTransferable>,
}
#[op2]
@ -208,7 +208,6 @@ pub fn op_message_port_post_message(
}
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
resource.port.send(state, data)
}

View file

@ -11,6 +11,7 @@ use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WebWorkerType;
use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use crate::web_worker::WorkerMetadata;
use crate::worker::FormatJsErrorFn;
use deno_core::error::AnyError;
use deno_core::op2;
@ -19,6 +20,7 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_web::deserialize_js_transferables;
use deno_web::JsMessageData;
use log::debug;
use std::cell::RefCell;
@ -36,7 +38,7 @@ pub struct CreateWebWorkerArgs {
pub main_module: ModuleSpecifier,
pub worker_type: WebWorkerType,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<JsMessageData>,
pub maybe_worker_metadata: Option<WorkerMetadata>,
}
pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
@ -175,7 +177,16 @@ fn op_create_worker(
// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
let transferables =
deserialize_js_transferables(state, data.transferables)?;
Some(WorkerMetadata {
buffer: data.data,
transferables,
})
} else {
None
};
// Spawn it
thread_builder.spawn(move || {
// Any error inside this block is terminal:

View file

@ -27,6 +27,7 @@ use deno_core::serde_json::json;
use deno_core::v8;
use deno_core::CancelHandle;
use deno_core::CompiledWasmModuleStore;
use deno_core::DetachedBuffer;
use deno_core::Extension;
use deno_core::FeatureChecker;
use deno_core::GetErrorClassFn;
@ -47,9 +48,11 @@ use deno_kv::dynamic::MultiBackendDbHandler;
use deno_terminal::colors;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
use deno_web::serialize_transferables;
use deno_web::BlobStore;
use deno_web::JsMessageData;
use deno_web::MessagePort;
use deno_web::Transferable;
use log::debug;
use std::cell::RefCell;
use std::fmt;
@ -61,6 +64,11 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
pub struct WorkerMetadata {
pub buffer: DetachedBuffer,
pub transferables: Vec<Transferable>,
}
static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
@ -343,7 +351,7 @@ pub struct WebWorker {
has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
maybe_worker_metadata: Option<JsMessageData>,
maybe_worker_metadata: Option<WorkerMetadata>,
}
pub struct WebWorkerOptions {
@ -371,7 +379,7 @@ pub struct WebWorkerOptions {
pub feature_checker: Arc<FeatureChecker>,
pub strace_ops: Option<Vec<String>>,
pub close_on_idle: bool,
pub maybe_worker_metadata: Option<JsMessageData>,
pub maybe_worker_metadata: Option<WorkerMetadata>,
}
impl WebWorker {
@ -622,7 +630,8 @@ impl WebWorker {
}
pub fn bootstrap(&mut self, options: &BootstrapOptions) {
self.js_runtime.op_state().borrow_mut().put(options.clone());
let op_state = self.js_runtime.op_state();
op_state.borrow_mut().put(options.clone());
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
{
@ -633,7 +642,16 @@ impl WebWorker {
let undefined = v8::undefined(scope);
let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
if let Some(data) = self.maybe_worker_metadata.take() {
worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap();
let js_transferables = serialize_transferables(
&mut op_state.borrow_mut(),
data.transferables,
);
let js_message_data = JsMessageData {
data: data.buffer,
transferables: js_transferables,
};
worker_data =
deno_core::serde_v8::to_v8(scope, js_message_data).unwrap();
}
let name_str: v8::Local<v8::Value> =
v8::String::new(scope, &self.name).unwrap().into();

View file

@ -238,3 +238,52 @@ Deno.test({
},
sanitizeResources: false,
});
Deno.test({
name: "[worker_threads] Worker workerData with MessagePort",
async fn() {
const { port1: mainPort, port2: workerPort } = new workerThreads
.MessageChannel();
const deferred = Promise.withResolvers<void>();
const worker = new workerThreads.Worker(
`
import {
isMainThread,
MessageChannel,
parentPort,
receiveMessageOnPort,
Worker,
workerData,
} from "node:worker_threads";
parentPort.on("message", (msg) => {
console.log("message from main", msg);
parentPort.postMessage("Hello from worker on parentPort!");
workerData.workerPort.postMessage("Hello from worker on workerPort!");
});
`,
{
eval: true,
workerData: { workerPort },
transferList: [workerPort],
},
);
worker.on("message", (data) => {
assertEquals(data, "Hello from worker on parentPort!");
// TODO(bartlomieju): it would be better to use `mainPort.on("message")`,
// but we currently don't support it.
// https://github.com/denoland/deno/issues/22951
// Wait a bit so the message can arrive.
setTimeout(() => {
const msg = workerThreads.receiveMessageOnPort(mainPort)!.message;
assertEquals(msg, "Hello from worker on workerPort!");
deferred.resolve();
}, 500);
});
worker.postMessage("Hello from parent");
await deferred.promise;
await worker.terminate();
mainPort.close();
},
});