diff --git a/cli/bench/main.rs b/cli/bench/main.rs index b98a9d141e..e15f76277e 100644 --- a/cli/bench/main.rs +++ b/cli/bench/main.rs @@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option)] = &[ &[ "run", "--allow-read", - "cli/tests/workers_large_message_bench.ts", + "cli/tests/workers/bench_large_message.ts", ], None, ), diff --git a/cli/tests/workers_large_message_bench.ts b/cli/tests/workers/bench_large_message.ts similarity index 66% rename from cli/tests/workers_large_message_bench.ts rename to cli/tests/workers/bench_large_message.ts index 9cda5a40d6..53076e7113 100644 --- a/cli/tests/workers_large_message_bench.ts +++ b/cli/tests/workers/bench_large_message.ts @@ -1,14 +1,10 @@ // Copyright 2020 the Deno authors. All rights reserved. MIT license. -// deno-lint-ignore-file - -import { deferred } from "../../test_util/std/async/deferred.ts"; - -function oneWorker(i: any): Promise { +function oneWorker(i: number) { return new Promise((resolve) => { let countDown = 10; const worker = new Worker( - new URL("workers/large_message_worker.js", import.meta.url).href, + new URL("worker_large_message.js", import.meta.url).href, { type: "module" }, ); worker.onmessage = (e): void => { @@ -23,8 +19,8 @@ function oneWorker(i: any): Promise { }); } -function bench(): Promise { - let promises = []; +function bench() { + const promises = []; for (let i = 0; i < 50; i++) { promises.push(oneWorker(i)); } diff --git a/cli/tests/workers/racy_worker.js b/cli/tests/workers/racy_worker.js index 83756b791d..0f66c62785 100644 --- a/cli/tests/workers/racy_worker.js +++ b/cli/tests/workers/racy_worker.js @@ -1,21 +1,25 @@ // See issue for details // https://github.com/denoland/deno/issues/4080 // -// After first call to `postMessage() this worker schedules -// [close(), postMessage()] ops on the same turn of microtask queue -// (because message is rather big). -// Only single `postMessage()` call should make it -// to host, ie. after calling `close()` no more code should be run. +// After first received message, this worker schedules +// [assert(), close(), assert()] ops on the same turn of microtask queue +// All tasks after close should not make it -setTimeout(() => { - close(); -}, 50); - -while (true) { - await new Promise((done) => { +onmessage = async function () { + let stage = 0; + await new Promise((_) => { setTimeout(() => { - postMessage({ buf: new Array(999999) }); - done(); - }); + if (stage !== 0) throw "Unexpected stage"; + stage = 1; + }, 50); + setTimeout(() => { + if (stage !== 1) throw "Unexpected stage"; + stage = 2; + postMessage("DONE"); + close(); + }, 50); + setTimeout(() => { + throw "This should not be run"; + }, 50); }); -} +}; diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts index c3ccebfbb9..41988d204a 100644 --- a/cli/tests/workers/test.ts +++ b/cli/tests/workers/test.ts @@ -198,15 +198,12 @@ Deno.test({ ); racyWorker.onmessage = (e): void => { - assertEquals(e.data.buf.length, 999999); - racyWorker.onmessage = (_e): void => { - throw new Error("unreachable"); - }; setTimeout(() => { promise.resolve(); }, 100); }; + racyWorker.postMessage("START"); await promise; }, }); @@ -726,3 +723,38 @@ Deno.test({ worker.terminate(); }, }); + +Deno.test({ + name: "structured cloning postMessage", + fn: async function (): Promise { + const result = deferred(); + const worker = new Worker( + new URL("worker_structured_cloning.ts", import.meta.url).href, + { type: "module" }, + ); + + worker.onmessage = (e): void => { + // self field should reference itself (circular ref) + const value = e.data.self.self.self; + + // fields a and b refer to the same array + assertEquals(value.a, ["a", true, 432]); + assertEquals(value.a, ["a", true, 432]); + value.b[0] = "b"; + value.a[2] += 5; + assertEquals(value.a, ["b", true, 437]); + assertEquals(value.b, ["b", true, 437]); + + const len = value.c.size; + value.c.add(1); // This value is already in the set. + value.c.add(2); + assertEquals(len + 1, value.c.size); + + result.resolve(); + }; + + worker.postMessage("START"); + await result; + worker.terminate(); + }, +}); diff --git a/cli/tests/workers/large_message_worker.js b/cli/tests/workers/worker_large_message.js similarity index 100% rename from cli/tests/workers/large_message_worker.js rename to cli/tests/workers/worker_large_message.js diff --git a/cli/tests/workers/worker_structured_cloning.ts b/cli/tests/workers/worker_structured_cloning.ts new file mode 100644 index 0000000000..eb1719a9a8 --- /dev/null +++ b/cli/tests/workers/worker_structured_cloning.ts @@ -0,0 +1,15 @@ +// More info on structured cloning can be found here: +// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm + +self.onmessage = () => { + const arr = ["a", true, 432]; + const set = new Set([1, 3, 5, 7, 9]); + const selfReference = { + a: arr, + b: arr, + c: set, + }; + // deno-lint-ignore no-explicit-any + (selfReference as any).self = selfReference; + self.postMessage(selfReference); +}; diff --git a/core/bindings.rs b/core/bindings.rs index edf115d275..f6c94b335c 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -553,7 +553,7 @@ fn deserialize( match value { Some(deserialized) => rv.set(deserialized), None => { - let msg = v8::String::new(scope, "string too long").unwrap(); + let msg = v8::String::new(scope, "could not deserialize value").unwrap(); let exception = v8::Exception::range_error(scope, msg); scope.throw_exception(exception); } diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 508dd46d48..dca83c8181 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -39,26 +39,8 @@ return core.opAsync("op_host_get_message", id); } - const encoder = new TextEncoder(); const decoder = new TextDecoder(); - function encodeMessage(data) { - const dataJson = JSON.stringify(data); - return encoder.encode(dataJson); - } - - function decodeMessage(dataIntArray) { - // Temporary solution until structured clone arrives in v8. - // Current clone is made by parsing json to byte array and from byte array back to json. - // In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined. - // Thats why this special is statement is needed. - if (dataIntArray.length == 0) { - return undefined; - } - const dataJson = decoder.decode(dataIntArray); - return JSON.parse(dataJson); - } - /** * @param {string} permission * @return {boolean} @@ -211,18 +193,7 @@ this.#poll(); } - #handleMessage = (msgData) => { - let data; - try { - data = decodeMessage(new Uint8Array(msgData)); - } catch (e) { - const msgErrorEvent = new MessageEvent("messageerror", { - cancelable: false, - data, - }); - return; - } - + #handleMessage = (data) => { const msgEvent = new MessageEvent("message", { cancelable: false, data, @@ -253,56 +224,44 @@ #poll = async () => { while (!this.#terminated) { - const event = await hostGetMessage(this.#id); + const [type, data] = await hostGetMessage(this.#id); // If terminate was called then we ignore all messages if (this.#terminated) { return; } - const type = event.type; - - if (type === "terminalError") { - this.#terminated = true; - if (!this.#handleError(event.error)) { - if (globalThis instanceof Window) { - throw new Error("Unhandled error event reached main worker."); - } else { - core.opSync( - "op_host_unhandled_error", - event.error.message, - ); - } + switch (type) { + case 0: { // Message + const msg = core.deserialize(data); + this.#handleMessage(msg); + break; } - continue; - } - - if (type === "msg") { - this.#handleMessage(event.data); - continue; - } - - if (type === "error") { - if (!this.#handleError(event.error)) { - if (globalThis instanceof Window) { - throw new Error("Unhandled error event reached main worker."); - } else { - core.opSync( - "op_host_unhandled_error", - event.error.message, - ); + case 1: { // TerminalError + this.#terminated = true; + } /* falls through */ + case 2: { // Error + if (!this.#handleError(data)) { + if (globalThis instanceof Window) { + throw new Error("Unhandled error event reached main worker."); + } else { + core.opSync( + "op_worker_unhandled_error", + data.message, + ); + } } + break; + } + case 3: { // Close + log(`Host got "close" message from worker: ${this.#name}`); + this.#terminated = true; + return; + } + default: { + throw new Error(`Unknown worker event: "${type}"`); } - continue; } - - if (type === "close") { - log(`Host got "close" message from worker: ${this.#name}`); - this.#terminated = true; - return; - } - - throw new Error(`Unknown worker event: "${type}"`); } }; @@ -317,7 +276,8 @@ return; } - hostPostMessage(this.#id, encodeMessage(message)); + const bufferMsg = core.serialize(message); + hostPostMessage(this.#id, bufferMsg); } terminate() { diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index d2926bb1f8..082c83593e 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -67,7 +67,7 @@ delete Object.prototype.__proto__; } isClosing = true; - opCloseWorker(); + core.opSync("op_worker_close"); } // TODO(bartlomieju): remove these functions @@ -76,68 +76,64 @@ delete Object.prototype.__proto__; const onerror = () => {}; function postMessage(data) { - const dataJson = JSON.stringify(data); - const dataIntArray = encoder.encode(dataJson); - opPostMessage(dataIntArray); + const dataIntArray = core.serialize(data); + core.opSync("op_worker_post_message", null, dataIntArray); } let isClosing = false; - async function workerMessageRecvCallback(data) { - const msgEvent = new MessageEvent("message", { - cancelable: false, - data, - }); + async function pollForMessages() { + while (!isClosing) { + const bufferMsg = await core.opAsync("op_worker_get_message"); + const data = core.deserialize(bufferMsg); - try { - if (globalThis["onmessage"]) { - const result = globalThis.onmessage(msgEvent); - if (result && "then" in result) { - await result; - } - } - globalThis.dispatchEvent(msgEvent); - } catch (e) { - let handled = false; - - const errorEvent = new ErrorEvent("error", { - cancelable: true, - message: e.message, - lineno: e.lineNumber ? e.lineNumber + 1 : undefined, - colno: e.columnNumber ? e.columnNumber + 1 : undefined, - filename: e.fileName, - error: null, + const msgEvent = new MessageEvent("message", { + cancelable: false, + data, }); - if (globalThis["onerror"]) { - const ret = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e, - ); - handled = ret === true; - } + try { + if (globalThis.onmessage) { + await globalThis.onmessage(msgEvent); + } + globalThis.dispatchEvent(msgEvent); + } catch (e) { + let handled = false; - globalThis.dispatchEvent(errorEvent); - if (errorEvent.defaultPrevented) { - handled = true; - } + const errorEvent = new ErrorEvent("error", { + cancelable: true, + message: e.message, + lineno: e.lineNumber ? e.lineNumber + 1 : undefined, + colno: e.columnNumber ? e.columnNumber + 1 : undefined, + filename: e.fileName, + error: null, + }); - if (!handled) { - throw e; + if (globalThis["onerror"]) { + const ret = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e, + ); + handled = ret === true; + } + + globalThis.dispatchEvent(errorEvent); + if (errorEvent.defaultPrevented) { + handled = true; + } + + if (!handled) { + core.opSync( + "op_worker_unhandled_error", + e.message, + ); + } } } } - function opPostMessage(data) { - core.opSync("op_worker_post_message", null, data); - } - - function opCloseWorker() { - core.opSync("op_worker_close"); - } - function opMainModule() { return core.opSync("op_main_module"); } @@ -395,7 +391,6 @@ delete Object.prototype.__proto__; // TODO(bartlomieju): should be readonly? close: util.nonEnumerable(workerClose), postMessage: util.writable(postMessage), - workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback), }; let hasBootstrapped = false; @@ -506,6 +501,8 @@ delete Object.prototype.__proto__; location.setLocationHref(locationHref); registerErrors(); + pollForMessages(); + const internalSymbol = Symbol("Deno.internal"); const finalDenoNs = { diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index 1689b25870..e3ede869da 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -1,41 +1,85 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::web_worker::WebWorkerHandle; +use crate::web_worker::WebWorkerInternalHandle; use crate::web_worker::WorkerEvent; +use deno_core::error::generic_error; use deno_core::error::null_opbuf; -use deno_core::futures::channel::mpsc; +use deno_core::error::AnyError; +use deno_core::op_async; use deno_core::op_sync; use deno_core::Extension; +use deno_core::OpState; use deno_core::ZeroCopyBuf; +use std::cell::RefCell; +use std::rc::Rc; pub fn init() -> Extension { Extension::builder() .ops(vec![ - ( - "op_worker_post_message", - op_sync(move |state, _args: (), buf: Option| { - let buf = buf.ok_or_else(null_opbuf)?; - let msg_buf: Box<[u8]> = (*buf).into(); - let mut sender = state.borrow::>().clone(); - sender - .try_send(WorkerEvent::Message(msg_buf)) - .expect("Failed to post message to host"); - Ok(()) - }), - ), + ("op_worker_post_message", op_sync(op_worker_post_message)), + ("op_worker_get_message", op_async(op_worker_get_message)), // Notify host that guest worker closes. + ("op_worker_close", op_sync(op_worker_close)), + // Notify host that guest worker has unhandled error. ( - "op_worker_close", - op_sync(move |state, _: (), _: ()| { - // Notify parent that we're finished - let mut sender = state.borrow::>().clone(); - sender.close_channel(); - // Terminate execution of current worker - let handle = state.borrow::(); - handle.terminate(); - Ok(()) - }), + "op_worker_unhandled_error", + op_sync(op_worker_unhandled_error), ), ]) .build() } + +fn op_worker_post_message( + state: &mut OpState, + _: (), + buf: Option, +) -> Result<(), AnyError> { + let buf = buf.ok_or_else(null_opbuf)?; + let handle = state.borrow::().clone(); + handle + .post_event(WorkerEvent::Message(buf)) + .expect("Failed to post message to host"); + Ok(()) +} + +async fn op_worker_get_message( + state: Rc>, + _: (), + _: (), +) -> Result { + let temp = { + let a = state.borrow(); + a.borrow::().clone() + }; + + let maybe_data = temp.get_message().await; + + Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty)) +} + +#[allow(clippy::unnecessary_wraps)] +fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> { + // Notify parent that we're finished + let mut handle = state.borrow_mut::().clone(); + + handle.terminate(); + Ok(()) +} + +/// A worker that encounters an uncaught error will pass this error +/// to its parent worker using this op. The parent worker will use +/// this same op to pass the error to its own parent (in case +/// `e.preventDefault()` was not called in `worker.onerror`). This +/// is done until the error reaches the root/ main worker. +#[allow(clippy::unnecessary_wraps)] +fn op_worker_unhandled_error( + state: &mut OpState, + message: String, + _: (), +) -> Result<(), AnyError> { + let sender = state.borrow::().clone(); + sender + .post_event(WorkerEvent::Error(generic_error(message))) + .expect("Failed to propagate error event to parent worker"); + Ok(()) +} diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index f8d03850d8..a5698fa6e7 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; use crate::web_worker::WorkerEvent; +use crate::web_worker::WorkerId; use deno_core::error::custom_error; -use deno_core::error::generic_error; use deno_core::error::null_opbuf; use deno_core::error::AnyError; use deno_core::error::JsError; -use deno_core::futures::channel::mpsc; use deno_core::op_async; use deno_core::op_sync; use deno_core::serde::de; @@ -28,7 +27,6 @@ use deno_core::serde::de::SeqAccess; use deno_core::serde::Deserialize; use deno_core::serde::Deserializer; use deno_core::serde_json::json; -use deno_core::serde_json::Value; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; @@ -46,7 +44,7 @@ use std::thread::JoinHandle; pub struct CreateWebWorkerArgs { pub name: String, - pub worker_id: u32, + pub worker_id: WorkerId, pub parent_permissions: Permissions, pub permissions: Permissions, pub main_module: ModuleSpecifier, @@ -68,13 +66,9 @@ pub struct WorkerThread { worker_handle: WebWorkerHandle, } -pub type WorkersTable = HashMap; -pub type WorkerId = u32; +pub type WorkersTable = HashMap; -pub fn init( - is_main_worker: bool, - create_web_worker_cb: Arc, -) -> Extension { +pub fn init(create_web_worker_cb: Arc) -> Extension { Extension::builder() .state(move |state| { state.put::(WorkersTable::default()); @@ -94,20 +88,6 @@ pub fn init( ), ("op_host_post_message", op_sync(op_host_post_message)), ("op_host_get_message", op_async(op_host_get_message)), - ( - "op_host_unhandled_error", - op_sync(move |state, message: String, _: ()| { - if is_main_worker { - return Err(generic_error("Cannot be called from main worker.")); - } - - let mut sender = state.borrow::>().clone(); - sender - .try_send(WorkerEvent::Error(generic_error(message))) - .expect("Failed to propagate error event to parent worker"); - Ok(true) - }), - ), ]) .build() } @@ -473,7 +453,7 @@ fn op_create_worker( let worker_id = state.take::(); let create_module_loader = state.take::(); state.put::(create_module_loader.clone()); - state.put::(worker_id + 1); + state.put::(worker_id.next().unwrap()); let module_specifier = deno_core::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); @@ -483,7 +463,7 @@ fn op_create_worker( // Setup new thread let thread_builder = - std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); + std::thread::Builder::new().name(format!("{}", worker_id)); // Spawn it let join_handle = thread_builder.spawn(move || { @@ -501,7 +481,7 @@ fn op_create_worker( use_deno_namespace, }); - // Send thread safe handle to newly created worker to host thread + // Send thread safe handle from newly created worker to host thread handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); @@ -512,6 +492,7 @@ fn op_create_worker( run_web_worker(worker, module_specifier, maybe_source_code) })?; + // Receive WebWorkerHandle from newly created worker let worker_handle = handle_receiver.recv().unwrap()?; let worker_thread = WorkerThread { @@ -534,7 +515,7 @@ fn op_host_terminate_worker( id: WorkerId, _: (), ) -> Result<(), AnyError> { - let worker_thread = state + let mut worker_thread = state .borrow_mut::() .remove(&id) .expect("No worker handle found"); @@ -547,54 +528,53 @@ fn op_host_terminate_worker( Ok(()) } -fn serialize_worker_event(event: WorkerEvent) -> Value { - match event { - WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), - WorkerEvent::TerminalError(error) => match error.downcast::() { - Ok(js_error) => json!({ - "type": "terminalError", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "terminalError", - "error": { - "message": error.to_string(), - } - }), - }, - WorkerEvent::Error(error) => match error.downcast::() { - Ok(js_error) => json!({ - "type": "error", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "error", - "error": { - "message": error.to_string(), - } - }), - }, +use deno_core::serde::Serialize; +use deno_core::serde::Serializer; + +impl Serialize for WorkerEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let type_id = match &self { + WorkerEvent::Message(_) => 0_i32, + WorkerEvent::TerminalError(_) => 1_i32, + WorkerEvent::Error(_) => 2_i32, + WorkerEvent::Close => 3_i32, + }; + + match self { + WorkerEvent::Message(buf) => { + Serialize::serialize(&(type_id, buf), serializer) + } + WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => { + let value = match error.downcast_ref::() { + Some(js_error) => json!({ + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + }), + None => json!({ + "message": error.to_string(), + }), + }; + + Serialize::serialize(&(type_id, value), serializer) + } + _ => Serialize::serialize(&(type_id, ()), serializer), + } } } /// Try to remove worker from workers table - NOTE: `Worker.terminate()` /// might have been called already meaning that we won't find worker in /// table - in that case ignore. -fn try_remove_and_close(state: Rc>, id: u32) { +fn try_remove_and_close(state: Rc>, id: WorkerId) { let mut s = state.borrow_mut(); let workers = s.borrow_mut::(); if let Some(mut worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.sender.close_channel(); + worker_thread.worker_handle.terminate(); worker_thread .join_handle .join() @@ -608,7 +588,7 @@ async fn op_host_get_message( state: Rc>, id: WorkerId, _: (), -) -> Result { +) -> Result { let worker_handle = { let s = state.borrow(); let workers_table = s.borrow::(); @@ -617,7 +597,7 @@ async fn op_host_get_message( handle.worker_handle.clone() } else { // If handle was not found it means worker has already shutdown - return Ok(json!({ "type": "close" })); + return Ok(WorkerEvent::Close); } }; @@ -627,12 +607,12 @@ async fn op_host_get_message( if let WorkerEvent::TerminalError(_) = &event { try_remove_and_close(state, id); } - return Ok(serialize_worker_event(event)); + return Ok(event); } // If there was no event from worker it means it has already been closed. try_remove_and_close(state, id); - Ok(json!({ "type": "close" })) + Ok(WorkerEvent::Close) } /// Post message to guest worker as host @@ -641,8 +621,7 @@ fn op_host_post_message( id: WorkerId, data: Option, ) -> Result<(), AnyError> { - let data = data.ok_or_else(null_opbuf)?; - let msg = Vec::from(&*data).into_boxed_slice(); + let msg = data.ok_or_else(null_opbuf)?; debug!("post message to worker {}", id); let worker_thread = state diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 690b6fb58a..5b731a0f51 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -13,7 +13,8 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; -use deno_core::futures::task::AtomicWaker; +use deno_core::serde::Deserialize; +use deno_core::serde::Serialize; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -22,12 +23,16 @@ use deno_core::Extension; use deno_core::GetErrorClassFn; use deno_core::JsErrorCreateFn; use deno_core::JsRuntime; +use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; +use deno_core::ZeroCopyBuf; use deno_file::BlobUrlStore; use log::debug; +use std::cell::RefCell; use std::env; +use std::fmt; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -36,38 +41,98 @@ use std::task::Context; use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, +)] +pub struct WorkerId(u32); +impl fmt::Display for WorkerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "worker-{}", self.0) + } +} +impl WorkerId { + pub fn next(&self) -> Option { + self.0.checked_add(1).map(WorkerId) + } +} + +type WorkerMessage = ZeroCopyBuf; + /// Events that are sent to host from child /// worker. pub enum WorkerEvent { - Message(Box<[u8]>), + Message(WorkerMessage), Error(AnyError), TerminalError(AnyError), + Close, } -pub struct WorkerChannelsInternal { - pub sender: mpsc::Sender, - pub receiver: mpsc::Receiver>, +// Channels used for communication with worker's parent +#[derive(Clone)] +pub struct WebWorkerInternalHandle { + sender: mpsc::Sender, + receiver: Rc>>, + terminated: Arc, + isolate_handle: v8::IsolateHandle, +} + +impl WebWorkerInternalHandle { + /// Post WorkerEvent to parent as a worker + pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { + let mut sender = self.sender.clone(); + // If the channel is closed, + // the worker must have terminated but the termination message has not yet been received. + // + // Therefore just treat it as if the worker has terminated and return. + if sender.is_closed() { + self.terminated.store(true, Ordering::SeqCst); + return Ok(()); + } + sender.try_send(event)?; + Ok(()) + } + + /// Get the WorkerEvent with lock + /// Panic if more than one listener tries to get event + pub async fn get_message(&self) -> Option { + let mut receiver = self.receiver.borrow_mut(); + receiver.next().await + } + + /// Check if this worker is terminated or being terminated + pub fn is_terminated(&self) -> bool { + self.terminated.load(Ordering::SeqCst) + } + + /// Terminate the worker + /// This function will set terminated to true, terminate the isolate and close the message channel + pub fn terminate(&mut self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::SeqCst); + + if !already_terminated { + // Stop javascript execution + self.isolate_handle.terminate_execution(); + } + + // Wake parent by closing the channel + self.sender.close_channel(); + } } -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. #[derive(Clone)] pub struct WebWorkerHandle { - pub sender: mpsc::Sender>, - pub receiver: Arc>>, - terminate_tx: mpsc::Sender<()>, + sender: mpsc::Sender, + receiver: Arc>>, terminated: Arc, isolate_handle: v8::IsolateHandle, } impl WebWorkerHandle { - /// Post message to worker as a host. - pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> { + /// Post WorkerMessage to worker as a host + pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> { let mut sender = self.sender.clone(); // If the channel is closed, // the worker must have terminated but the termination message has not yet been recieved. @@ -81,47 +146,50 @@ impl WebWorkerHandle { Ok(()) } - /// Get the event with lock. + /// Get the WorkerEvent with lock /// Return error if more than one listener tries to get event pub async fn get_event(&self) -> Result, AnyError> { let mut receiver = self.receiver.try_lock()?; Ok(receiver.next().await) } - pub fn terminate(&self) { + /// Terminate the worker + /// This function will set terminated to true, terminate the isolate and close the message channel + pub fn terminate(&mut self) { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. let already_terminated = self.terminated.swap(true, Ordering::SeqCst); if !already_terminated { + // Stop javascript execution self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); } + + // Wake web worker by closing the channel + self.sender.close_channel(); } } -fn create_channels( +fn create_handles( isolate_handle: v8::IsolateHandle, - terminate_tx: mpsc::Sender<()>, -) -> (WorkerChannelsInternal, WebWorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::>(1); +) -> (WebWorkerInternalHandle, WebWorkerHandle) { + let (in_tx, in_rx) = mpsc::channel::(1); let (out_tx, out_rx) = mpsc::channel::(1); - let internal_channels = WorkerChannelsInternal { + let terminated = Arc::new(AtomicBool::new(false)); + let internal_handle = WebWorkerInternalHandle { sender: out_tx, - receiver: in_rx, + receiver: Rc::new(RefCell::new(in_rx)), + terminated: terminated.clone(), + isolate_handle: isolate_handle.clone(), }; - let external_channels = WebWorkerHandle { + let external_handle = WebWorkerHandle { sender: in_tx, receiver: Arc::new(AsyncMutex::new(out_rx)), - terminated: Arc::new(AtomicBool::new(false)), - terminate_tx, + terminated, isolate_handle, }; - (internal_channels, external_channels) + (internal_handle, external_handle) } /// This struct is an implementation of `Worker` Web API @@ -129,17 +197,12 @@ fn create_channels( /// Each `WebWorker` is either a child of `MainWorker` or other /// `WebWorker`. pub struct WebWorker { - id: u32, + id: WorkerId, inspector: Option>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. - pub(crate) internal_channels: WorkerChannelsInternal, pub js_runtime: JsRuntime, pub name: String, - waker: AtomicWaker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, + internal_handle: WebWorkerInternalHandle, + external_handle: WebWorkerHandle, pub use_deno_namespace: bool, pub main_module: ModuleSpecifier, } @@ -174,7 +237,7 @@ impl WebWorker { name: String, permissions: Permissions, main_module: ModuleSpecifier, - worker_id: u32, + worker_id: WorkerId, options: &WebWorkerOptions, ) -> Self { // Permissions: many ops depend on this @@ -218,7 +281,7 @@ impl WebWorker { let runtime_exts = vec![ ops::web_worker::init(), ops::runtime::init(main_module.clone()), - ops::worker_host::init(false, options.create_web_worker_cb.clone()), + ops::worker_host::init(options.create_web_worker_cb.clone()), ops::io::init(), ]; @@ -264,38 +327,24 @@ impl WebWorker { None }; - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - let isolate_handle = js_runtime.v8_isolate().thread_safe_handle(); - let (internal_channels, handle) = - create_channels(isolate_handle, terminate_tx); - - let mut worker = Self { - id: worker_id, - inspector, - internal_channels, - js_runtime, - name, - waker: AtomicWaker::new(), - event_loop_idle: false, - terminate_rx, - handle, - use_deno_namespace: options.use_deno_namespace, - main_module, - }; - - // Setup worker-dependant OpState and return worker - { - let handle = worker.thread_safe_handle(); - let sender = worker.internal_channels.sender.clone(); - let js_runtime = &mut worker.js_runtime; + let (internal_handle, external_handle) = { + let handle = js_runtime.v8_isolate().thread_safe_handle(); + let (internal_handle, external_handle) = create_handles(handle); let op_state = js_runtime.op_state(); let mut op_state = op_state.borrow_mut(); + op_state.put(internal_handle.clone()); + (internal_handle, external_handle) + }; - // Required by runtime::ops::worker_host/web_worker - op_state.put(handle); - op_state.put(sender); - - worker + Self { + id: worker_id, + inspector, + js_runtime, + name, + internal_handle, + external_handle, + use_deno_namespace: options.use_deno_namespace, + main_module, } } @@ -321,7 +370,7 @@ impl WebWorker { // Instead of using name for log we use `worker-${id}` because // WebWorkers can have empty string as name. let script = format!( - "bootstrap.workerRuntime({}, \"{}\", {}, \"worker-{}\")", + "bootstrap.workerRuntime({}, \"{}\", {}, \"{}\")", runtime_options_str, self.name, options.use_deno_namespace, self.id ); self @@ -338,12 +387,20 @@ impl WebWorker { self.js_runtime.execute(url.as_str(), js_source) } + /// Loads and instantiates specified JavaScript module. + pub async fn preload_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result { + self.js_runtime.load_module(module_specifier, None).await + } + /// Loads, instantiates and executes specified JavaScript module. pub async fn execute_module( &mut self, module_specifier: &ModuleSpecifier, ) -> Result<(), AnyError> { - let id = self.js_runtime.load_module(module_specifier, None).await?; + let id = self.preload_module(module_specifier).await?; let mut receiver = self.js_runtime.mod_evaluate(id); tokio::select! { @@ -357,8 +414,8 @@ impl WebWorker { } event_loop_result = self.run_event_loop() => { - if self.has_been_terminated() { - return Ok(()); + if self.internal_handle.is_terminated() { + return Ok(()); } event_loop_result?; let maybe_result = receiver.next().await; @@ -370,82 +427,44 @@ impl WebWorker { /// Returns a way to communicate with the Worker from other threads. pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } - - pub fn has_been_terminated(&self) -> bool { - self.handle.terminated.load(Ordering::SeqCst) + self.external_handle.clone() } pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll> { - if self.has_been_terminated() { + // If awakened because we are terminating, just return Ok + if self.internal_handle.is_terminated() { return Poll::Ready(Ok(())); } - if !self.event_loop_idle { - let poll_result = { - // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) - }; - - if let Poll::Ready(r) = poll_result { - if self.has_been_terminated() { + // We always poll the inspector if it exists. + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + match self.js_runtime.poll_event_loop(cx) { + Poll::Ready(r) => { + // If js ended because we are terminating, just return Ok + if self.internal_handle.is_terminated() { return Poll::Ready(Ok(())); } + // In case of an error, pass to parent without terminating worker if let Err(e) = r { print_worker_error(e.to_string(), &self.name); - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) + let handle = self.internal_handle.clone(); + handle + .post_event(WorkerEvent::Error(e)) .expect("Failed to post message to host"); - } - self.event_loop_idle = true; - } - } - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - - let maybe_msg_poll_result = - self.internal_channels.receiver.poll_next_unpin(cx); - - if let Poll::Ready(maybe_msg) = maybe_msg_poll_result { - let msg = - maybe_msg.expect("Received `None` instead of message in worker"); - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js" - // so it's dimmed in stack trace instead of using "__anonymous__" - if let Err(e) = self.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.has_been_terminated() { - return Poll::Ready(Ok(())); + return Poll::Pending; } - // Otherwise forward error to host - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); + panic!( + "coding error: either js is polling or the worker is terminated" + ); } - - // Let event loop be polled again - self.event_loop_idle = false; - self.waker.wake(); + Poll::Pending => Poll::Pending, } - - Poll::Pending } pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { @@ -495,18 +514,18 @@ pub fn run_web_worker( rt.block_on(load_future) }; - let mut sender = worker.internal_channels.sender.clone(); + let internal_handle = worker.internal_handle.clone(); // If sender is closed it means that worker has already been closed from // within using "globalThis.close()" - if sender.is_closed() { + if internal_handle.is_terminated() { return Ok(()); } if let Err(e) = result { print_worker_error(e.to_string(), &name); - sender - .try_send(WorkerEvent::TerminalError(e)) + internal_handle + .post_event(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. @@ -522,7 +541,6 @@ pub fn run_web_worker( mod tests { use super::*; use crate::tokio_util; - use deno_core::serde_json::json; fn create_test_web_worker() -> WebWorker { let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap(); @@ -554,7 +572,7 @@ mod tests { "TEST".to_string(), Permissions::allow_all(), main_module, - 1, + WorkerId(1), &options, ); worker.bootstrap(&options); @@ -589,30 +607,30 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await.unwrap(); assert!(maybe_msg.is_some()); - let r = handle.post_message(msg.clone()); + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await.unwrap(); assert!(maybe_msg.is_some()); match maybe_msg { Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]); } _ => unreachable!(), } - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded + let r = handle.post_message(msg.into()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); assert!(event.is_none()); @@ -636,8 +654,9 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); assert!(event.is_none()); diff --git a/runtime/worker.rs b/runtime/worker.rs index ab54e2153b..c75f09dc81 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -113,7 +113,7 @@ impl MainWorker { metrics::init(), // Runtime ops ops::runtime::init(main_module), - ops::worker_host::init(true, options.create_web_worker_cb.clone()), + ops::worker_host::init(options.create_web_worker_cb.clone()), ops::fs_events::init(), ops::fs::init(), ops::http::init(), diff --git a/serde_v8/src/magic/buffer.rs b/serde_v8/src/magic/buffer.rs index 893bf35e1c..1fcfffc723 100644 --- a/serde_v8/src/magic/buffer.rs +++ b/serde_v8/src/magic/buffer.rs @@ -1,9 +1,9 @@ use rusty_v8 as v8; -use std::cell::Cell; use std::fmt; use std::ops::Deref; use std::ops::DerefMut; +use std::sync::Mutex; use super::zero_copy_buf::ZeroCopyBuf; @@ -11,7 +11,7 @@ use super::zero_copy_buf::ZeroCopyBuf; // allowing us to use a single type for familiarity pub enum MagicBuffer { FromV8(ZeroCopyBuf), - ToV8(Cell>>), + ToV8(Mutex>>), } impl MagicBuffer { @@ -21,6 +21,10 @@ impl MagicBuffer { ) -> Self { Self::FromV8(ZeroCopyBuf::new(scope, view)) } + + pub fn empty() -> Self { + MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice()))) + } } impl Clone for MagicBuffer { @@ -65,7 +69,7 @@ impl DerefMut for MagicBuffer { impl From> for MagicBuffer { fn from(buf: Box<[u8]>) -> Self { - MagicBuffer::ToV8(Cell::new(Some(buf))) + MagicBuffer::ToV8(Mutex::new(Some(buf))) } } @@ -88,8 +92,11 @@ impl serde::Serialize for MagicBuffer { let mut s = serializer.serialize_struct(BUF_NAME, 1)?; let boxed: Box<[u8]> = match self { - Self::FromV8(_) => unreachable!(), - Self::ToV8(x) => x.take().expect("MagicBuffer was empty"), + Self::FromV8(buf) => { + let value: &[u8] = &buf; + value.into() + } + Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"), }; let hack: [usize; 2] = unsafe { std::mem::transmute(boxed) }; let f1: u64 = hack[0] as u64;