1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-03 12:58:54 -05:00

feat: transfer MessagePort between workers (#11076)

Add support for transferring `MessagePort`s between workers.
This commit is contained in:
Luca Casonato 2021-06-22 16:30:16 +02:00 committed by GitHub
parent 0a2ced5728
commit 6261c89e04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 382 additions and 355 deletions

View file

@ -386,7 +386,7 @@ declare class Worker extends EventTarget {
specifier: string | URL, specifier: string | URL,
options?: WorkerOptions, options?: WorkerOptions,
); );
postMessage(message: any, transfer: ArrayBuffer[]): void; postMessage(message: any, transfer: Transferable[]): void;
postMessage(message: any, options?: PostMessageOptions): void; postMessage(message: any, options?: PostMessageOptions): void;
addEventListener<K extends keyof WorkerEventMap>( addEventListener<K extends keyof WorkerEventMap>(
type: K, type: K,

View file

@ -68,7 +68,8 @@ declare class DedicatedWorkerGlobalScope extends WorkerGlobalScope {
| ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any) | ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any)
| null; | null;
close(): void; close(): void;
postMessage(message: any): void; postMessage(message: any, transfer: Transferable[]): void;
postMessage(message: any, options?: PostMessageOptions): void;
addEventListener<K extends keyof DedicatedWorkerGlobalScopeEventMap>( addEventListener<K extends keyof DedicatedWorkerGlobalScopeEventMap>(
type: K, type: K,
listener: ( listener: (
@ -105,7 +106,8 @@ declare var onmessageerror:
| ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any) | ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any)
| null; | null;
declare function close(): void; declare function close(): void;
declare function postMessage(message: any): void; declare function postMessage(message: any, transfer: Transferable[]): void;
declare function postMessage(message: any, options?: PostMessageOptions): void;
declare var navigator: WorkerNavigator; declare var navigator: WorkerNavigator;
declare var onerror: declare var onerror:
| ((this: DedicatedWorkerGlobalScope, ev: ErrorEvent) => any) | ((this: DedicatedWorkerGlobalScope, ev: ErrorEvent) => any)

View file

@ -125,7 +125,7 @@ fn create_web_worker_callback(
broadcast_channel: program_state.broadcast_channel.clone(), broadcast_channel: program_state.broadcast_channel.clone(),
}; };
let mut worker = WebWorker::from_options( let (mut worker, external_handle) = WebWorker::from_options(
args.name, args.name,
args.permissions, args.permissions,
args.main_module, args.main_module,
@ -151,7 +151,7 @@ fn create_web_worker_callback(
} }
worker.bootstrap(&options); worker.bootstrap(&options);
worker (worker, external_handle)
}) })
} }

View file

@ -0,0 +1,14 @@
const channel = new MessageChannel();
channel.port2.onmessage = (e) => {
channel.port2.postMessage(e.data === "2");
channel.port2.close();
};
self.postMessage("1", [channel.port1]);
self.onmessage = (e) => {
const port1 = e.ports[0];
port1.postMessage(e.data === "3");
port1.close();
};

View file

@ -1,3 +1,3 @@
[WILDCARD]error: Uncaught (in worker "") Cannot resolve module "file:///[WILDCARD]cli/tests/workers/doesnt_exist.js". [WILDCARD]error: Uncaught (in worker "") Cannot resolve module "file:///[WILDCARD]cli/tests/workers/doesnt_exist.js".
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll ([WILDCARD]) at Worker.#pollControl ([WILDCARD])

View file

@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag
at blob:null/[WILDCARD]:1:0 at blob:null/[WILDCARD]:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll (deno:runtime/js/11_workers.js:243:23) at Worker.#pollControl ([WILDCARD])

View file

@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag
at blob:null/[WILDCARD]:1:0 at blob:null/[WILDCARD]:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll (deno:runtime/js/11_workers.js:243:23) at Worker.#pollControl ([WILDCARD])

View file

@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag
at data:application/javascript;base64,[WILDCARD]:1:0 at data:application/javascript;base64,[WILDCARD]:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll (deno:runtime/js/11_workers.js:243:23) at Worker.#pollControl ([WILDCARD])

View file

@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag
at data:application/javascript;base64,aW1wb3J0ICJodHRwczovL2V4YW1wbGUuY29tL3NvbWUvZmlsZS50cyI7:1:0 at data:application/javascript;base64,aW1wb3J0ICJodHRwczovL2V4YW1wbGUuY29tL3NvbWUvZmlsZS50cyI7:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll (deno:runtime/js/11_workers.js:243:23) at Worker.#pollControl ([WILDCARD])

View file

@ -3,4 +3,4 @@ await import("https://example.com/some/file.ts");
^ ^
at async http://localhost:4545/cli/tests/workers/dynamic_remote.ts:2:1 at async http://localhost:4545/cli/tests/workers/dynamic_remote.ts:2:1
[WILDCARD]error: Uncaught (in promise) Error: Unhandled error event reached main worker. [WILDCARD]error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll (deno:runtime/js/11_workers.js:243:23) at Worker.#pollControl ([WILDCARD])

View file

@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag
at http://localhost:4545/cli/tests/workers/static_remote.ts:2:0 at http://localhost:4545/cli/tests/workers/static_remote.ts:2:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll (deno:runtime/js/11_workers.js:243:23) at Worker.#pollControl ([WILDCARD])

View file

@ -769,3 +769,56 @@ Deno.test({
worker.terminate(); worker.terminate();
}, },
}); });
Deno.test({
name: "worker with relative specifier",
fn: async function (): Promise<void> {
assertEquals(location.href, "http://127.0.0.1:4545/cli/tests/");
const promise = deferred();
const w = new Worker(
"./workers/test_worker.ts",
{ type: "module", name: "tsWorker" },
);
w.onmessage = (e): void => {
assertEquals(e.data, "Hello, world!");
promise.resolve();
};
w.postMessage("Hello, world!");
await promise;
w.terminate();
},
});
Deno.test({
name: "Send MessagePorts from / to workers",
fn: async function (): Promise<void> {
const result = deferred();
const worker = new Worker(
new URL("message_port.ts", import.meta.url).href,
{ type: "module" },
);
const channel = new MessageChannel();
worker.onmessage = (e) => {
assertEquals(e.data, "1");
assertEquals(e.ports.length, 1);
const port1 = e.ports[0];
port1.onmessage = (e) => {
assertEquals(e.data, true);
port1.close();
worker.postMessage("3", [channel.port1]);
};
port1.postMessage("2");
};
channel.port2.onmessage = (e) => {
assertEquals(e.data, true);
channel.port2.close();
result.resolve();
};
await result;
worker.terminate();
},
});

View file

@ -2,4 +2,4 @@
at foo ([WILDCARD]) at foo ([WILDCARD])
at [WILDCARD] at [WILDCARD]
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll ([WILDCARD]) at Worker.#pollControl ([WILDCARD])

View file

@ -2,4 +2,4 @@
at foo ([WILDCARD]) at foo ([WILDCARD])
at [WILDCARD] at [WILDCARD]
error: Uncaught (in promise) Error: Unhandled error event reached main worker. error: Uncaught (in promise) Error: Unhandled error event reached main worker.
at Worker.#poll ([WILDCARD]) at Worker.#pollControl ([WILDCARD])

View file

@ -2,7 +2,9 @@
mod message_port; mod message_port;
pub use crate::message_port::create_entangled_message_port;
pub use crate::message_port::JsMessageData; pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
use deno_core::error::bad_resource_id; use deno_core::error::bad_resource_id;
use deno_core::error::null_opbuf; use deno_core::error::null_opbuf;

View file

@ -23,7 +23,7 @@ type MessagePortMessage = (Vec<u8>, Vec<Transferable>);
pub struct MessagePort { pub struct MessagePort {
rx: RefCell<UnboundedReceiver<MessagePortMessage>>, rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
tx: UnboundedSender<MessagePortMessage>, tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
} }
impl MessagePort { impl MessagePort {
@ -37,7 +37,9 @@ impl MessagePort {
// Swallow the failed to send error. It means the channel was disentangled, // Swallow the failed to send error. It means the channel was disentangled,
// but not cleaned up. // but not cleaned up.
self.tx.send((data.data.to_vec(), transferables)).ok(); if let Some(tx) = &*self.tx.borrow() {
tx.send((data.data.to_vec(), transferables)).ok();
}
Ok(()) Ok(())
} }
@ -60,6 +62,13 @@ impl MessagePort {
} }
Ok(None) Ok(None)
} }
/// This forcefully disconnects the message port from its paired port. This
/// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
pub fn disentangle(&self) {
let mut tx = self.tx.borrow_mut();
tx.take();
}
} }
pub fn create_entangled_message_port() -> (MessagePort, MessagePort) { pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
@ -68,12 +77,12 @@ pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
let port1 = MessagePort { let port1 = MessagePort {
rx: RefCell::new(port1_rx), rx: RefCell::new(port1_rx),
tx: port1_tx, tx: RefCell::new(Some(port1_tx)),
}; };
let port2 = MessagePort { let port2 = MessagePort {
rx: RefCell::new(port2_rx), rx: RefCell::new(port2_rx),
tx: port2_tx, tx: RefCell::new(Some(port2_tx)),
}; };
(port1, port2) (port1, port2)
@ -204,5 +213,5 @@ pub async fn op_message_port_recv_message(
} }
}; };
let cancel = RcRef::map(resource.clone(), |r| &r.cancel); let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
resource.port.recv(state.clone()).or_cancel(cancel).await? resource.port.recv(state).or_cancel(cancel).await?
} }

View file

@ -3,10 +3,13 @@
((window) => { ((window) => {
const core = window.Deno.core; const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { Window } = window.__bootstrap.globalInterfaces; const { Window } = window.__bootstrap.globalInterfaces;
const { getLocationHref } = window.__bootstrap.location; const { getLocationHref } = window.__bootstrap.location;
const { log, pathFromURL } = window.__bootstrap.util; const { log, pathFromURL } = window.__bootstrap.util;
const { defineEventHandler } = window.__bootstrap.webUtil; const { defineEventHandler } = window.__bootstrap.webUtil;
const { deserializeJsMessageData, serializeJsMessageData } =
window.__bootstrap.messagePort;
function createWorker( function createWorker(
specifier, specifier,
@ -34,8 +37,12 @@
core.opSync("op_host_post_message", id, data); core.opSync("op_host_post_message", id, data);
} }
function hostGetMessage(id) { function hostRecvCtrl(id) {
return core.opAsync("op_host_get_message", id); return core.opAsync("op_host_recv_ctrl", id);
}
function hostRecvMessage(id) {
return core.opAsync("op_host_recv_message", id);
} }
/** /**
@ -187,18 +194,9 @@
options?.name, options?.name,
); );
this.#id = id; this.#id = id;
this.#poll(); this.#pollControl();
this.#pollMessages();
} }
#handleMessage(data) {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});
this.dispatchEvent(msgEvent);
}
#handleError(e) { #handleError(e) {
const event = new ErrorEvent("error", { const event = new ErrorEvent("error", {
cancelable: true, cancelable: true,
@ -219,9 +217,9 @@
return handled; return handled;
} }
#poll = async () => { #pollControl = async () => {
while (!this.#terminated) { while (!this.#terminated) {
const [type, data] = await hostGetMessage(this.#id); const [type, data] = await hostRecvCtrl(this.#id);
// If terminate was called then we ignore all messages // If terminate was called then we ignore all messages
if (this.#terminated) { if (this.#terminated) {
@ -229,11 +227,6 @@
} }
switch (type) { switch (type) {
case 0: { // Message
const msg = core.deserialize(data);
this.#handleMessage(msg);
break;
}
case 1: { // TerminalError case 1: { // TerminalError
this.#terminated = true; this.#terminated = true;
} /* falls through */ } /* falls through */
@ -262,19 +255,57 @@
} }
}; };
postMessage(message, transferOrOptions) { #pollMessages = async () => {
if (transferOrOptions) { while (!this.terminated) {
throw new Error( const data = await hostRecvMessage(this.#id);
"Not yet implemented: `transfer` and `options` are not supported.", if (data === null) break;
let message, transfer;
try {
const v = deserializeJsMessageData(data);
message = v[0];
transfer = v[1];
} catch (err) {
const event = new MessageEvent("messageerror", {
cancelable: false,
data: err,
});
this.dispatchEvent(event);
return;
}
const event = new MessageEvent("message", {
cancelable: false,
data: message,
ports: transfer,
});
this.dispatchEvent(event);
}
};
postMessage(message, transferOrOptions = {}) {
const prefix = "Failed to execute 'postMessage' on 'MessagePort'";
webidl.requiredArguments(arguments.length, 1, { prefix });
message = webidl.converters.any(message);
let options;
if (
webidl.type(transferOrOptions) === "Object" &&
transferOrOptions !== undefined &&
transferOrOptions[Symbol.iterator] !== undefined
) {
const transfer = webidl.converters["sequence<object>"](
transferOrOptions,
{ prefix, context: "Argument 2" },
); );
options = { transfer };
} else {
options = webidl.converters.PostMessageOptions(transferOrOptions, {
prefix,
context: "Argument 2",
});
} }
const { transfer } = options;
if (this.#terminated) { const data = serializeJsMessageData(message, transfer);
return; if (this.#terminated) return;
} hostPostMessage(this.#id, data);
const bufferMsg = core.serialize(message);
hostPostMessage(this.#id, bufferMsg);
} }
terminate() { terminate() {

View file

@ -42,6 +42,8 @@ delete Object.prototype.__proto__;
const errors = window.__bootstrap.errors.errors; const errors = window.__bootstrap.errors.errors;
const webidl = window.__bootstrap.webidl; const webidl = window.__bootstrap.webidl;
const { defineEventHandler } = window.__bootstrap.webUtil; const { defineEventHandler } = window.__bootstrap.webUtil;
const { deserializeJsMessageData, serializeJsMessageData } =
window.__bootstrap.messagePort;
let windowIsClosing = false; let windowIsClosing = false;
@ -77,9 +79,31 @@ delete Object.prototype.__proto__;
const onmessage = () => {}; const onmessage = () => {};
const onerror = () => {}; const onerror = () => {};
function postMessage(data) { function postMessage(message, transferOrOptions = {}) {
const dataIntArray = core.serialize(data); const prefix =
core.opSync("op_worker_post_message", null, dataIntArray); "Failed to execute 'postMessage' on 'DedicatedWorkerGlobalScope'";
webidl.requiredArguments(arguments.length, 1, { prefix });
message = webidl.converters.any(message);
let options;
if (
webidl.type(transferOrOptions) === "Object" &&
transferOrOptions !== undefined &&
transferOrOptions[Symbol.iterator] !== undefined
) {
const transfer = webidl.converters["sequence<object>"](
transferOrOptions,
{ prefix, context: "Argument 2" },
);
options = { transfer };
} else {
options = webidl.converters.PostMessageOptions(transferOrOptions, {
prefix,
context: "Argument 2",
});
}
const { transfer } = options;
const data = serializeJsMessageData(message, transfer);
core.opSync("op_worker_post_message", data);
} }
let isClosing = false; let isClosing = false;
@ -90,12 +114,16 @@ delete Object.prototype.__proto__;
globalDispatchEvent = globalThis.dispatchEvent.bind(globalThis); globalDispatchEvent = globalThis.dispatchEvent.bind(globalThis);
} }
while (!isClosing) { while (!isClosing) {
const bufferMsg = await core.opAsync("op_worker_get_message"); const data = await core.opAsync("op_worker_recv_message");
const data = core.deserialize(bufferMsg); if (data === null) break;
const v = deserializeJsMessageData(data);
const message = v[0];
const transfer = v[1];
const msgEvent = new MessageEvent("message", { const msgEvent = new MessageEvent("message", {
cancelable: false, cancelable: false,
data, data: message,
ports: transfer,
}); });
try { try {

View file

@ -1,15 +1,15 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::web_worker::WebWorkerInternalHandle; use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WorkerEvent; use crate::web_worker::WorkerControlEvent;
use deno_core::error::generic_error; use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::op_async; use deno_core::op_async;
use deno_core::op_sync; use deno_core::op_sync;
use deno_core::CancelFuture;
use deno_core::Extension; use deno_core::Extension;
use deno_core::OpState; use deno_core::OpState;
use deno_core::ZeroCopyBuf; use deno_web::JsMessageData;
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::rc::Rc;
@ -17,7 +17,7 @@ pub fn init() -> Extension {
Extension::builder() Extension::builder()
.ops(vec![ .ops(vec![
("op_worker_post_message", op_sync(op_worker_post_message)), ("op_worker_post_message", op_sync(op_worker_post_message)),
("op_worker_get_message", op_async(op_worker_get_message)), ("op_worker_recv_message", op_async(op_worker_recv_message)),
// Notify host that guest worker closes. // Notify host that guest worker closes.
("op_worker_close", op_sync(op_worker_close)), ("op_worker_close", op_sync(op_worker_close)),
// Notify host that guest worker has unhandled error. // Notify host that guest worker has unhandled error.
@ -31,30 +31,28 @@ pub fn init() -> Extension {
fn op_worker_post_message( fn op_worker_post_message(
state: &mut OpState, state: &mut OpState,
data: JsMessageData,
_: (), _: (),
buf: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let buf = buf.ok_or_else(null_opbuf)?;
let handle = state.borrow::<WebWorkerInternalHandle>().clone(); let handle = state.borrow::<WebWorkerInternalHandle>().clone();
handle handle.port.send(state, data)?;
.post_event(WorkerEvent::Message(buf))
.expect("Failed to post message to host");
Ok(()) Ok(())
} }
async fn op_worker_get_message( async fn op_worker_recv_message(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
_: (), _: (),
_: (), _: (),
) -> Result<ZeroCopyBuf, AnyError> { ) -> Result<Option<JsMessageData>, AnyError> {
let temp = { let handle = {
let a = state.borrow(); let state = state.borrow();
a.borrow::<WebWorkerInternalHandle>().clone() state.borrow::<WebWorkerInternalHandle>().clone()
}; };
handle
let maybe_data = temp.get_message().await; .port
.recv(state.clone())
Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty)) .or_cancel(handle.cancel)
.await?
} }
fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> { fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> {
@ -77,7 +75,7 @@ fn op_worker_unhandled_error(
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let sender = state.borrow::<WebWorkerInternalHandle>().clone(); let sender = state.borrow::<WebWorkerInternalHandle>().clone();
sender sender
.post_event(WorkerEvent::Error(generic_error(message))) .post_event(WorkerControlEvent::Error(generic_error(message)))
.expect("Failed to propagate error event to parent worker"); .expect("Failed to propagate error event to parent worker");
Ok(()) Ok(())
} }

View file

@ -12,25 +12,23 @@ use crate::permissions::UnaryPermission;
use crate::permissions::UnitPermission; use crate::permissions::UnitPermission;
use crate::permissions::WriteDescriptor; use crate::permissions::WriteDescriptor;
use crate::web_worker::run_web_worker; use crate::web_worker::run_web_worker;
use crate::web_worker::SendableWebWorkerHandle;
use crate::web_worker::WebWorker; use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle; use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WorkerEvent; use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId; use crate::web_worker::WorkerId;
use deno_core::error::custom_error; use deno_core::error::custom_error;
use deno_core::error::null_opbuf;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::op_async; use deno_core::op_async;
use deno_core::op_sync; use deno_core::op_sync;
use deno_core::serde::de; use deno_core::serde::de;
use deno_core::serde::de::SeqAccess; use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize; use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer; use deno_core::serde::Deserializer;
use deno_core::serde_json::json;
use deno_core::Extension; use deno_core::Extension;
use deno_core::ModuleSpecifier; use deno_core::ModuleSpecifier;
use deno_core::OpState; use deno_core::OpState;
use deno_core::ZeroCopyBuf; use deno_web::JsMessageData;
use log::debug; use log::debug;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
@ -51,8 +49,9 @@ pub struct CreateWebWorkerArgs {
pub use_deno_namespace: bool, pub use_deno_namespace: bool,
} }
pub type CreateWebWorkerCb = pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send; + Sync
+ Send;
/// A holder for callback that is used to create a new /// A holder for callback that is used to create a new
/// WebWorker. It's a struct instead of a type alias /// WebWorker. It's a struct instead of a type alias
@ -87,7 +86,8 @@ pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
op_sync(op_host_terminate_worker), op_sync(op_host_terminate_worker),
), ),
("op_host_post_message", op_sync(op_host_post_message)), ("op_host_post_message", op_sync(op_host_post_message)),
("op_host_get_message", op_async(op_host_get_message)), ("op_host_recv_ctrl", op_async(op_host_recv_ctrl)),
("op_host_recv_message", op_async(op_host_recv_message)),
]) ])
.build() .build()
} }
@ -458,8 +458,9 @@ fn op_create_worker(
let module_specifier = deno_core::resolve_url(&specifier)?; let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string()); let worker_name = args_name.unwrap_or_else(|| "".to_string());
let (handle_sender, handle_receiver) = let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<
std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1); Result<SendableWebWorkerHandle, AnyError>,
>(1);
// Setup new thread // Setup new thread
let thread_builder = let thread_builder =
@ -472,17 +473,18 @@ fn op_create_worker(
// all action done upon it should be noops // all action done upon it should be noops
// - newly spawned thread exits // - newly spawned thread exits
let worker = (create_module_loader.0)(CreateWebWorkerArgs { let (worker, external_handle) =
name: worker_name, (create_module_loader.0)(CreateWebWorkerArgs {
worker_id, name: worker_name,
parent_permissions, worker_id,
permissions: worker_permissions, parent_permissions,
main_module: module_specifier.clone(), permissions: worker_permissions,
use_deno_namespace, main_module: module_specifier.clone(),
}); use_deno_namespace,
});
// Send thread safe handle from newly created worker to host thread // Send thread safe handle from newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); handle_sender.send(Ok(external_handle)).unwrap();
drop(handle_sender); drop(handle_sender);
// At this point the only method of communication with host // At this point the only method of communication with host
@ -497,7 +499,7 @@ fn op_create_worker(
let worker_thread = WorkerThread { let worker_thread = WorkerThread {
join_handle, join_handle,
worker_handle, worker_handle: worker_handle.into(),
}; };
// At this point all interactions with worker happen using thread // At this point all interactions with worker happen using thread
@ -514,7 +516,7 @@ fn op_host_terminate_worker(
id: WorkerId, id: WorkerId,
_: (), _: (),
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let mut worker_thread = state let worker_thread = state
.borrow_mut::<WorkersTable>() .borrow_mut::<WorkersTable>()
.remove(&id) .remove(&id)
.expect("No worker handle found"); .expect("No worker handle found");
@ -527,52 +529,13 @@ fn op_host_terminate_worker(
Ok(()) Ok(())
} }
use deno_core::serde::Serialize;
use deno_core::serde::Serializer;
impl Serialize for WorkerEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let type_id = match &self {
WorkerEvent::Message(_) => 0_i32,
WorkerEvent::TerminalError(_) => 1_i32,
WorkerEvent::Error(_) => 2_i32,
WorkerEvent::Close => 3_i32,
};
match self {
WorkerEvent::Message(buf) => {
Serialize::serialize(&(type_id, buf), serializer)
}
WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
let value = match error.downcast_ref::<JsError>() {
Some(js_error) => json!({
"message": js_error.message,
"fileName": js_error.script_resource_name,
"lineNumber": js_error.line_number,
"columnNumber": js_error.start_column,
}),
None => json!({
"message": error.to_string(),
}),
};
Serialize::serialize(&(type_id, value), serializer)
}
_ => Serialize::serialize(&(type_id, ()), serializer),
}
}
}
/// Try to remove worker from workers table - NOTE: `Worker.terminate()` /// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in /// might have been called already meaning that we won't find worker in
/// table - in that case ignore. /// table - in that case ignore.
fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) { fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut(); let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>(); let workers = s.borrow_mut::<WorkersTable>();
if let Some(mut worker_thread) = workers.remove(&id) { if let Some(worker_thread) = workers.remove(&id) {
worker_thread.worker_handle.terminate(); worker_thread.worker_handle.terminate();
worker_thread worker_thread
.join_handle .join_handle
@ -582,12 +545,43 @@ fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
} }
} }
/// Get message from guest worker as host /// Get control event from guest worker as host
async fn op_host_get_message( async fn op_host_recv_ctrl(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
id: WorkerId, id: WorkerId,
_: (), _: (),
) -> Result<WorkerEvent, AnyError> { ) -> Result<WorkerControlEvent, AnyError> {
let worker_handle = {
let state = state.borrow();
let workers_table = state.borrow::<WorkersTable>();
let maybe_handle = workers_table.get(&id);
if let Some(handle) = maybe_handle {
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
return Ok(WorkerControlEvent::Close);
}
};
let maybe_event = worker_handle.get_control_event().await?;
if let Some(event) = maybe_event {
// Terminal error means that worker should be removed from worker table.
if let WorkerControlEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
return Ok(event);
}
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
Ok(WorkerControlEvent::Close)
}
async fn op_host_recv_message(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
) -> Result<Option<JsMessageData>, AnyError> {
let worker_handle = { let worker_handle = {
let s = state.borrow(); let s = state.borrow();
let workers_table = s.borrow::<WorkersTable>(); let workers_table = s.borrow::<WorkersTable>();
@ -596,37 +590,26 @@ async fn op_host_get_message(
handle.worker_handle.clone() handle.worker_handle.clone()
} else { } else {
// If handle was not found it means worker has already shutdown // If handle was not found it means worker has already shutdown
return Ok(WorkerEvent::Close); return Ok(None);
} }
}; };
worker_handle.port.recv(state).await
let maybe_event = worker_handle.get_event().await?;
if let Some(event) = maybe_event {
// Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
return Ok(event);
}
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
Ok(WorkerEvent::Close)
} }
/// Post message to guest worker as host /// Post message to guest worker as host
fn op_host_post_message( fn op_host_post_message(
state: &mut OpState, state: &mut OpState,
id: WorkerId, id: WorkerId,
data: Option<ZeroCopyBuf>, data: JsMessageData,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let msg = data.ok_or_else(null_opbuf)?;
debug!("post message to worker {}", id); debug!("post message to worker {}", id);
let worker_thread = state let worker_handle = {
.borrow::<WorkersTable>() let worker_thread = state
.get(&id) .borrow::<WorkersTable>()
.expect("No worker handle found"); .get(&id)
worker_thread.worker_handle.post_message(msg)?; .expect("No worker handle found");
worker_thread.worker_handle.clone()
};
worker_handle.port.send(state, data)?;
Ok(()) Ok(())
} }

View file

@ -8,6 +8,7 @@ use crate::permissions::Permissions;
use crate::tokio_util::create_basic_runtime; use crate::tokio_util::create_basic_runtime;
use deno_broadcast_channel::InMemoryBroadcastChannel; use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn; use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt; use deno_core::futures::future::FutureExt;
@ -18,6 +19,7 @@ use deno_core::serde::Serialize;
use deno_core::serde_json; use deno_core::serde_json;
use deno_core::serde_json::json; use deno_core::serde_json::json;
use deno_core::v8; use deno_core::v8;
use deno_core::CancelHandle;
use deno_core::Extension; use deno_core::Extension;
use deno_core::GetErrorClassFn; use deno_core::GetErrorClassFn;
use deno_core::JsErrorCreateFn; use deno_core::JsErrorCreateFn;
@ -26,8 +28,9 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader; use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier; use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions; use deno_core::RuntimeOptions;
use deno_core::ZeroCopyBuf; use deno_web::create_entangled_message_port;
use deno_web::BlobUrlStore; use deno_web::BlobUrlStore;
use deno_web::MessagePort;
use log::debug; use log::debug;
use std::cell::RefCell; use std::cell::RefCell;
use std::env; use std::env;
@ -38,7 +41,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
#[derive( #[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
@ -55,29 +57,62 @@ impl WorkerId {
} }
} }
type WorkerMessage = ZeroCopyBuf;
/// Events that are sent to host from child /// Events that are sent to host from child
/// worker. /// worker.
pub enum WorkerEvent { pub enum WorkerControlEvent {
Message(WorkerMessage),
Error(AnyError), Error(AnyError),
TerminalError(AnyError), TerminalError(AnyError),
Close, Close,
} }
use deno_core::serde::Serializer;
impl Serialize for WorkerControlEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let type_id = match &self {
WorkerControlEvent::TerminalError(_) => 1_i32,
WorkerControlEvent::Error(_) => 2_i32,
WorkerControlEvent::Close => 3_i32,
};
match self {
WorkerControlEvent::TerminalError(error)
| WorkerControlEvent::Error(error) => {
let value = match error.downcast_ref::<JsError>() {
Some(js_error) => json!({
"message": js_error.message,
"fileName": js_error.script_resource_name,
"lineNumber": js_error.line_number,
"columnNumber": js_error.start_column,
}),
None => json!({
"message": error.to_string(),
}),
};
Serialize::serialize(&(type_id, value), serializer)
}
_ => Serialize::serialize(&(type_id, ()), serializer),
}
}
}
// Channels used for communication with worker's parent // Channels used for communication with worker's parent
#[derive(Clone)] #[derive(Clone)]
pub struct WebWorkerInternalHandle { pub struct WebWorkerInternalHandle {
sender: mpsc::Sender<WorkerEvent>, sender: mpsc::Sender<WorkerControlEvent>,
receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>, pub port: Rc<MessagePort>,
pub cancel: Rc<CancelHandle>,
terminated: Arc<AtomicBool>, terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle, isolate_handle: v8::IsolateHandle,
} }
impl WebWorkerInternalHandle { impl WebWorkerInternalHandle {
/// Post WorkerEvent to parent as a worker /// Post WorkerEvent to parent as a worker
pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { pub fn post_event(&self, event: WorkerControlEvent) -> Result<(), AnyError> {
let mut sender = self.sender.clone(); let mut sender = self.sender.clone();
// If the channel is closed, // If the channel is closed,
// the worker must have terminated but the termination message has not yet been received. // the worker must have terminated but the termination message has not yet been received.
@ -91,13 +126,6 @@ impl WebWorkerInternalHandle {
Ok(()) Ok(())
} }
/// Get the WorkerEvent with lock
/// Panic if more than one listener tries to get event
pub async fn get_message(&self) -> Option<WorkerMessage> {
let mut receiver = self.receiver.borrow_mut();
receiver.next().await
}
/// Check if this worker is terminated or being terminated /// Check if this worker is terminated or being terminated
pub fn is_terminated(&self) -> bool { pub fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst) self.terminated.load(Ordering::SeqCst)
@ -106,6 +134,8 @@ impl WebWorkerInternalHandle {
/// Terminate the worker /// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel /// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(&mut self) { pub fn terminate(&mut self) {
self.cancel.cancel();
// This function can be called multiple times by whomever holds // This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so // the handle. However only a single "termination" should occur so
// we need a guard here. // we need a guard here.
@ -121,40 +151,52 @@ impl WebWorkerInternalHandle {
} }
} }
pub struct SendableWebWorkerHandle {
port: MessagePort,
receiver: mpsc::Receiver<WorkerControlEvent>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
impl From<SendableWebWorkerHandle> for WebWorkerHandle {
fn from(handle: SendableWebWorkerHandle) -> Self {
WebWorkerHandle {
receiver: Rc::new(RefCell::new(handle.receiver)),
port: Rc::new(handle.port),
terminated: handle.terminated,
isolate_handle: handle.isolate_handle,
}
}
}
/// This is the handle to the web worker that the parent thread uses to
/// communicate with the worker. It is created from a `SendableWebWorkerHandle`
/// which is sent to the parent thread from the worker thread where it is
/// created. The reason for this seperation is that the handle first needs to be
/// `Send` when transferring between threads, and then must be `Clone` when it
/// has arrived on the parent thread. It can not be both at once without large
/// amounts of Arc<Mutex> and other fun stuff.
#[derive(Clone)] #[derive(Clone)]
pub struct WebWorkerHandle { pub struct WebWorkerHandle {
sender: mpsc::Sender<WorkerMessage>, pub port: Rc<MessagePort>,
receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>, receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
terminated: Arc<AtomicBool>, terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle, isolate_handle: v8::IsolateHandle,
} }
impl WebWorkerHandle { impl WebWorkerHandle {
/// Post WorkerMessage to worker as a host
pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> {
let mut sender = self.sender.clone();
// If the channel is closed,
// the worker must have terminated but the termination message has not yet been recieved.
//
// Therefore just treat it as if the worker has terminated and return.
if sender.is_closed() {
self.terminated.store(true, Ordering::SeqCst);
return Ok(());
}
sender.try_send(buf)?;
Ok(())
}
/// Get the WorkerEvent with lock /// Get the WorkerEvent with lock
/// Return error if more than one listener tries to get event /// Return error if more than one listener tries to get event
pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> { pub async fn get_control_event(
let mut receiver = self.receiver.try_lock()?; &self,
) -> Result<Option<WorkerControlEvent>, AnyError> {
let mut receiver = self.receiver.borrow_mut();
Ok(receiver.next().await) Ok(receiver.next().await)
} }
/// Terminate the worker /// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel /// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(&mut self) { pub fn terminate(self) {
// This function can be called multiple times by whomever holds // This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so // the handle. However only a single "termination" should occur so
// we need a guard here. // we need a guard here.
@ -165,26 +207,26 @@ impl WebWorkerHandle {
self.isolate_handle.terminate_execution(); self.isolate_handle.terminate_execution();
} }
// Wake web worker by closing the channel self.port.disentangle();
self.sender.close_channel();
} }
} }
fn create_handles( fn create_handles(
isolate_handle: v8::IsolateHandle, isolate_handle: v8::IsolateHandle,
) -> (WebWorkerInternalHandle, WebWorkerHandle) { ) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1); let (parent_port, worker_port) = create_entangled_message_port();
let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1); let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
let terminated = Arc::new(AtomicBool::new(false)); let terminated = Arc::new(AtomicBool::new(false));
let internal_handle = WebWorkerInternalHandle { let internal_handle = WebWorkerInternalHandle {
sender: out_tx, sender: ctrl_tx,
receiver: Rc::new(RefCell::new(in_rx)), port: Rc::new(parent_port),
terminated: terminated.clone(), terminated: terminated.clone(),
isolate_handle: isolate_handle.clone(), isolate_handle: isolate_handle.clone(),
cancel: CancelHandle::new_rc(),
}; };
let external_handle = WebWorkerHandle { let external_handle = SendableWebWorkerHandle {
sender: in_tx, receiver: ctrl_rx,
receiver: Arc::new(AsyncMutex::new(out_rx)), port: worker_port,
terminated, terminated,
isolate_handle, isolate_handle,
}; };
@ -200,7 +242,6 @@ pub struct WebWorker {
pub js_runtime: JsRuntime, pub js_runtime: JsRuntime,
pub name: String, pub name: String,
internal_handle: WebWorkerInternalHandle, internal_handle: WebWorkerInternalHandle,
external_handle: WebWorkerHandle,
pub use_deno_namespace: bool, pub use_deno_namespace: bool,
pub main_module: ModuleSpecifier, pub main_module: ModuleSpecifier,
} }
@ -237,7 +278,7 @@ impl WebWorker {
main_module: ModuleSpecifier, main_module: ModuleSpecifier,
worker_id: WorkerId, worker_id: WorkerId,
options: &WebWorkerOptions, options: &WebWorkerOptions,
) -> Self { ) -> (Self, SendableWebWorkerHandle) {
// Permissions: many ops depend on this // Permissions: many ops depend on this
let unstable = options.unstable; let unstable = options.unstable;
let perm_ext = Extension::builder() let perm_ext = Extension::builder()
@ -333,15 +374,17 @@ impl WebWorker {
(internal_handle, external_handle) (internal_handle, external_handle)
}; };
Self { (
id: worker_id, Self {
js_runtime, id: worker_id,
name, js_runtime,
internal_handle, name,
internal_handle,
use_deno_namespace: options.use_deno_namespace,
main_module,
},
external_handle, external_handle,
use_deno_namespace: options.use_deno_namespace, )
main_module,
}
} }
pub fn bootstrap(&mut self, options: &WebWorkerOptions) { pub fn bootstrap(&mut self, options: &WebWorkerOptions) {
@ -419,11 +462,6 @@ impl WebWorker {
} }
} }
/// Returns a way to communicate with the Worker from other threads.
pub fn thread_safe_handle(&self) -> WebWorkerHandle {
self.external_handle.clone()
}
pub fn poll_event_loop( pub fn poll_event_loop(
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
@ -446,7 +484,7 @@ impl WebWorker {
print_worker_error(e.to_string(), &self.name); print_worker_error(e.to_string(), &self.name);
let handle = self.internal_handle.clone(); let handle = self.internal_handle.clone();
handle handle
.post_event(WorkerEvent::Error(e)) .post_event(WorkerControlEvent::Error(e))
.expect("Failed to post message to host"); .expect("Failed to post message to host");
return Poll::Pending; return Poll::Pending;
@ -513,7 +551,7 @@ pub fn run_web_worker(
if let Err(e) = result { if let Err(e) = result {
print_worker_error(e.to_string(), &name); print_worker_error(e.to_string(), &name);
internal_handle internal_handle
.post_event(WorkerEvent::TerminalError(e)) .post_event(WorkerControlEvent::TerminalError(e))
.expect("Failed to post message to host"); .expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye. // Failure to execute script is a terminal error, bye, bye.
@ -524,134 +562,3 @@ pub fn run_web_worker(
debug!("Worker thread shuts down {}", &name); debug!("Worker thread shuts down {}", &name);
result result
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::tokio_util;
fn create_test_web_worker() -> WebWorker {
let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap();
let module_loader = Rc::new(deno_core::NoopModuleLoader);
let create_web_worker_cb = Arc::new(|_| unreachable!());
let options = WebWorkerOptions {
args: vec![],
apply_source_maps: false,
debug_flag: false,
unstable: false,
ca_data: None,
user_agent: "x".to_string(),
seed: None,
module_loader,
create_web_worker_cb,
js_error_create_fn: None,
use_deno_namespace: false,
maybe_inspector_server: None,
runtime_version: "x".to_string(),
ts_version: "x".to_string(),
no_color: true,
get_error_class_fn: None,
blob_url_store: BlobUrlStore::default(),
broadcast_channel: InMemoryBroadcastChannel::default(),
};
let mut worker = WebWorker::from_options(
"TEST".to_string(),
Permissions::allow_all(),
main_module,
WorkerId(1),
&options,
);
worker.bootstrap(&options);
worker
}
#[tokio::test]
async fn test_worker_messages() {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
let join_handle = std::thread::spawn(move || {
let mut worker = create_test_web_worker();
let source = r#"
onmessage = function(e) {
console.log("msg from main script", e.data);
if (e.data == "exit") {
return close();
} else {
console.assert(e.data === "hi");
}
postMessage([1, 2, 3]);
console.log("after postMessage");
}
"#;
worker.execute_script("a", source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker.run_event_loop(false));
assert!(r.is_ok())
});
let mut handle = handle_receiver.recv().unwrap();
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some());
let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some());
match maybe_msg {
Some(WorkerEvent::Message(buf)) => {
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]);
}
_ => unreachable!(),
}
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded
let r = handle.post_message(msg.into());
assert!(r.is_ok());
let event = handle.get_event().await.unwrap();
assert!(event.is_none());
handle.sender.close_channel();
join_handle.join().expect("Failed to join worker thread");
}
#[tokio::test]
async fn removed_from_resource_table_on_close() {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
let join_handle = std::thread::spawn(move || {
let mut worker = create_test_web_worker();
worker
.execute_script("a", "onmessage = () => { close(); }")
.unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker.run_event_loop(false));
assert!(r.is_ok())
});
let mut handle = handle_receiver.recv().unwrap();
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let event = handle.get_event().await.unwrap();
assert!(event.is_none());
handle.sender.close_channel();
join_handle.join().expect("Failed to join worker thread");
}
}