diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs index 3252aae029..123c29abb9 100644 --- a/cli/compilers/compiler_worker.rs +++ b/cli/compilers/compiler_worker.rs @@ -30,7 +30,7 @@ impl CompilerWorker { let isolate = &mut worker.isolate; ops::runtime::init(isolate, &state); ops::compiler::init(isolate, &state); - ops::web_worker::init(isolate, &state); + ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); ops::errors::init(isolate, &state); // for compatibility with Worker scope, though unused at // the moment diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 91c2643450..d1fb3a6ace 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -8,11 +8,15 @@ use crate::file_fetcher::SourceFile; use crate::file_fetcher::SourceFileFetcher; use crate::global_state::GlobalState; use crate::msg; +use crate::ops::worker_host::run_worker_loop; use crate::ops::JsonResult; use crate::source_maps::SourceMapGetter; use crate::startup_data; use crate::state::*; +use crate::tokio_util::create_basic_runtime; use crate::version; +use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; @@ -288,13 +292,11 @@ impl TsCompiler { true, ); - let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?; - if let Some(ref msg) = maybe_msg { - let json_str = std::str::from_utf8(msg).unwrap(); - debug!("Message: {}", json_str); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - return Err(ErrBox::from(diagnostics)); - } + let msg = execute_in_thread(global_state.clone(), req_msg).await?; + let json_str = std::str::from_utf8(&msg).unwrap(); + debug!("Message: {}", json_str); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + return Err(ErrBox::from(diagnostics)); } Ok(()) } @@ -376,13 +378,11 @@ impl TsCompiler { let compiling_job = global_state .progress .add("Compile", &module_url.to_string()); - let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?; + let msg = execute_in_thread(global_state.clone(), req_msg).await?; - if let Some(ref msg) = maybe_msg { - let json_str = std::str::from_utf8(msg).unwrap(); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - return Err(ErrBox::from(diagnostics)); - } + let json_str = std::str::from_utf8(&msg).unwrap(); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + return Err(ErrBox::from(diagnostics)); } let compiled_module = ts_compiler.get_compiled_module(&source_file_.url)?; drop(compiling_job); @@ -602,45 +602,45 @@ impl TsCompiler { } } +// TODO(bartlomieju): exactly same function is in `wasm.rs` - only difference +// it created WasmCompiler instead of TsCompiler - deduplicate async fn execute_in_thread( global_state: GlobalState, req: Buf, -) -> Result, ErrBox> { - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::, ErrBox>>(); - std::thread::spawn(move || { - debug!(">>>>> compile_async START"); - +) -> Result { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::>(1); + let builder = + std::thread::Builder::new().name("deno-ts-compiler".to_string()); + let join_handle = builder.spawn(move || { let mut worker = TsCompiler::setup_worker(global_state.clone()); - let handle = worker.thread_safe_handle(); - - crate::tokio_util::run_basic( - async move { - if let Err(err) = handle.post_message(req).await { - load_sender.send(Err(err)).unwrap(); - return; - } - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - let maybe_msg = handle.get_message().await; - load_sender.send(Ok(maybe_msg)).unwrap(); - debug!(">>>>> compile_sync END"); - } - .boxed_local(), - ); - }); - - load_receiver.await.unwrap() + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + let mut rt = create_basic_runtime(); + run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + })?; + let mut handle = handle_receiver.recv().unwrap()?; + handle.post_message(req).await?; + let event = handle.get_event().await.expect("Compiler didn't respond"); + let buf = match event { + WorkerEvent::Message(buf) => Ok(buf), + WorkerEvent::Error(error) => Err(error), + }?; + // Compiler worker finishes after one request + // so we should receive signal that channel was closed. + // Then close worker's channel and join the thread. + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().unwrap(); + Ok(buf) } async fn execute_in_thread_json( req_msg: Buf, global_state: GlobalState, ) -> JsonResult { - let maybe_msg = execute_in_thread(global_state, req_msg).await?; - let msg = maybe_msg.unwrap(); + let msg = execute_in_thread(global_state, req_msg).await?; let json_str = std::str::from_utf8(&msg).unwrap(); Ok(json!(json_str)) } diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index c1c179f623..9bc9d2ab4d 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -3,8 +3,13 @@ use super::compiler_worker::CompilerWorker; use crate::compilers::CompiledModule; use crate::file_fetcher::SourceFile; use crate::global_state::GlobalState; +use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::*; +use crate::tokio_util::create_basic_runtime; +use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; +use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; use serde_derive::Deserialize; @@ -83,64 +88,67 @@ impl WasmCompiler { if let Some(m) = maybe_cached { return Ok(m); } - - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::>(); - - std::thread::spawn(move || { - debug!(">>>>> wasm_compile_async START"); - let base64_data = base64::encode(&source_file.source_code); - let mut worker = WasmCompiler::setup_worker(global_state); - let handle = worker.thread_safe_handle(); - let url = source_file.url.clone(); - - let fut = async move { - let _ = handle - .post_message( - serde_json::to_string(&base64_data) - .unwrap() - .into_boxed_str() - .into_boxed_bytes(), - ) - .await; - - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - - debug!("Sent message to worker"); - let json_msg = handle.get_message().await.expect("not handled"); - - debug!("Received message from worker"); - let module_info: WasmModuleInfo = - serde_json::from_slice(&json_msg).unwrap(); - - debug!("WASM module info: {:#?}", &module_info); - let code = wrap_wasm_code( - &base64_data, - &module_info.import_list, - &module_info.export_list, - ); - - debug!("Generated code: {}", &code); - let module = CompiledModule { - code, - name: url.to_string(), - }; - { - cache_.lock().unwrap().insert(url.clone(), module.clone()); - } - debug!("<<<<< wasm_compile_async END"); - load_sender.send(Ok(module)).unwrap(); - }; - - crate::tokio_util::run_basic(fut); - }); - load_receiver.await.unwrap() + debug!(">>>>> wasm_compile_async START"); + let base64_data = base64::encode(&source_file.source_code); + let url = source_file.url.clone(); + let req_msg = serde_json::to_string(&base64_data) + .unwrap() + .into_boxed_str() + .into_boxed_bytes(); + let msg = execute_in_thread(global_state.clone(), req_msg).await?; + debug!("Received message from worker"); + let module_info: WasmModuleInfo = serde_json::from_slice(&msg).unwrap(); + debug!("WASM module info: {:#?}", &module_info); + let code = wrap_wasm_code( + &base64_data, + &module_info.import_list, + &module_info.export_list, + ); + debug!("Generated code: {}", &code); + let module = CompiledModule { + code, + name: url.to_string(), + }; + { + cache_.lock().unwrap().insert(url.clone(), module.clone()); + } + debug!("<<<<< wasm_compile_async END"); + Ok(module) } } +async fn execute_in_thread( + global_state: GlobalState, + req: Buf, +) -> Result { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::>(1); + let builder = + std::thread::Builder::new().name("deno-wasm-compiler".to_string()); + let join_handle = builder.spawn(move || { + let mut worker = WasmCompiler::setup_worker(global_state); + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + let mut rt = create_basic_runtime(); + run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + })?; + let mut handle = handle_receiver.recv().unwrap()?; + handle.post_message(req).await?; + let event = handle.get_event().await.expect("Compiler didn't respond"); + let buf = match event { + WorkerEvent::Message(buf) => Ok(buf), + WorkerEvent::Error(error) => Err(error), + }?; + // Compiler worker finishes after one request + // so we should receive signal that channel was closed. + // Then close worker's channel and join the thread. + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().unwrap(); + Ok(buf) +} + fn build_single_import(index: usize, origin: &str) -> String { let origin_json = serde_json::to_string(origin).unwrap(); format!( diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts index bf0287efe4..4ca2887c61 100644 --- a/cli/js/compiler.ts +++ b/cli/js/compiler.ts @@ -40,10 +40,7 @@ import { Diagnostic } from "./diagnostics.ts"; import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts"; import { assert } from "./util.ts"; import * as util from "./util.ts"; -import { - bootstrapWorkerRuntime, - runWorkerMessageLoop -} from "./runtime_worker.ts"; +import { bootstrapWorkerRuntime } from "./runtime_worker.ts"; interface CompilerRequestCompile { type: CompilerRequestType.Compile; @@ -340,13 +337,11 @@ async function wasmCompilerOnMessage({ function bootstrapTsCompilerRuntime(): void { bootstrapWorkerRuntime("TS"); globalThis.onmessage = tsCompilerOnMessage; - runWorkerMessageLoop(); } function bootstrapWasmCompilerRuntime(): void { bootstrapWorkerRuntime("WASM"); globalThis.onmessage = wasmCompilerOnMessage; - runWorkerMessageLoop(); } Object.defineProperties(globalThis, { diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts index 1a6b6528db..4493d37710 100644 --- a/cli/js/dispatch.ts +++ b/cli/js/dispatch.ts @@ -43,10 +43,10 @@ export let OP_REVOKE_PERMISSION: number; export let OP_REQUEST_PERMISSION: number; export let OP_CREATE_WORKER: number; export let OP_HOST_POST_MESSAGE: number; -export let OP_HOST_CLOSE_WORKER: number; +export let OP_HOST_TERMINATE_WORKER: number; export let OP_HOST_GET_MESSAGE: number; export let OP_WORKER_POST_MESSAGE: number; -export let OP_WORKER_GET_MESSAGE: number; +export let OP_WORKER_CLOSE: number; export let OP_RUN: number; export let OP_RUN_STATUS: number; export let OP_KILL: number; diff --git a/cli/js/globals.ts b/cli/js/globals.ts index 7cce739d51..53eb696ac0 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -118,7 +118,6 @@ declare global { var bootstrapWorkerRuntime: | ((name: string) => Promise | void) | undefined; - var runWorkerMessageLoop: (() => Promise | void) | undefined; var onerror: | (( msg: string, diff --git a/cli/js/lib.deno.worker.d.ts b/cli/js/lib.deno.worker.d.ts index 07955345c8..3311d9457c 100644 --- a/cli/js/lib.deno.worker.d.ts +++ b/cli/js/lib.deno.worker.d.ts @@ -37,7 +37,6 @@ declare const postMessage: typeof __workerMain.postMessage; declare namespace __workerMain { export let onmessage: (e: { data: any }) => void; export function postMessage(data: any): void; - export function getMessage(): Promise; export function close(): void; export const name: string; } diff --git a/cli/js/main.ts b/cli/js/main.ts index b48277960e..fbebfefe42 100644 --- a/cli/js/main.ts +++ b/cli/js/main.ts @@ -1,9 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { bootstrapMainRuntime } from "./runtime_main.ts"; -import { - bootstrapWorkerRuntime, - runWorkerMessageLoop -} from "./runtime_worker.ts"; +import { bootstrapWorkerRuntime } from "./runtime_worker.ts"; Object.defineProperties(globalThis, { bootstrapMainRuntime: { @@ -17,11 +14,5 @@ Object.defineProperties(globalThis, { enumerable: false, writable: false, configurable: false - }, - runWorkerMessageLoop: { - value: runWorkerMessageLoop, - enumerable: false, - writable: false, - configurable: false } }); diff --git a/cli/js/runtime_worker.ts b/cli/js/runtime_worker.ts index 0dc65fdb62..a9ed8b9249 100644 --- a/cli/js/runtime_worker.ts +++ b/cli/js/runtime_worker.ts @@ -3,12 +3,9 @@ // This module is the entry point for "worker" isolate, ie. the one // that is created using `new Worker()` JS API. // -// It provides two functions that should be called by Rust: +// It provides a single function that should be called by Rust: // - `bootstrapWorkerRuntime` - must be called once, when Isolate is created. // It sets up runtime by providing globals for `DedicatedWorkerScope`. -// - `runWorkerMessageLoop` - starts receiving messages from parent worker, -// can be called multiple times - eg. to restart worker execution after -// exception occurred and was handled by parent worker /* eslint-disable @typescript-eslint/no-explicit-any */ import { @@ -20,13 +17,12 @@ import { eventTargetProperties } from "./globals.ts"; import * as dispatch from "./dispatch.ts"; -import { sendAsync, sendSync } from "./dispatch_json.ts"; +import { sendSync } from "./dispatch_json.ts"; import { log } from "./util.ts"; -import { TextDecoder, TextEncoder } from "./text_encoding.ts"; +import { TextEncoder } from "./text_encoding.ts"; import * as runtime from "./runtime.ts"; const encoder = new TextEncoder(); -const decoder = new TextDecoder(); // TODO(bartlomieju): remove these funtions // Stuff for workers @@ -39,62 +35,46 @@ export function postMessage(data: any): void { sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); } -export async function getMessage(): Promise { - log("getMessage"); - const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); - if (res.data != null) { - const dataIntArray = new Uint8Array(res.data); - const dataJson = decoder.decode(dataIntArray); - return JSON.parse(dataJson); - } else { - return null; - } -} - let isClosing = false; let hasBootstrapped = false; export function close(): void { + if (isClosing) { + return; + } + isClosing = true; + sendSync(dispatch.OP_WORKER_CLOSE); } -export async function runWorkerMessageLoop(): Promise { - while (!isClosing) { - const data = await getMessage(); - if (data == null) { - log("runWorkerMessageLoop got null message. quitting."); - break; - } +export async function workerMessageRecvCallback(data: string): Promise { + let result: void | Promise; + const event = { data }; - let result: void | Promise; - const event = { data }; - - try { - if (!globalThis["onmessage"]) { - break; - } + try { + // + if (globalThis["onmessage"]) { result = globalThis.onmessage!(event); if (result && "then" in result) { await result; } - if (!globalThis["onmessage"]) { - break; - } - } catch (e) { - if (globalThis["onerror"]) { - const result = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e - ); - if (result === true) { - continue; - } - } - throw e; } + + // TODO: run the rest of liteners + } catch (e) { + if (globalThis["onerror"]) { + const result = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e + ); + if (result === true) { + return; + } + } + throw e; } } @@ -102,8 +82,10 @@ export const workerRuntimeGlobalProperties = { self: readOnly(globalThis), onmessage: writable(onmessage), onerror: writable(onerror), + // TODO: should be readonly? close: nonEnumerable(close), - postMessage: writable(postMessage) + postMessage: writable(postMessage), + workerMessageRecvCallback: nonEnumerable(workerMessageRecvCallback) }; /** diff --git a/cli/js/unit_tests.ts b/cli/js/unit_tests.ts index 992169e55c..a6435d1831 100644 --- a/cli/js/unit_tests.ts +++ b/cli/js/unit_tests.ts @@ -59,6 +59,7 @@ import "./write_file_test.ts"; import "./performance_test.ts"; import "./permissions_test.ts"; import "./version_test.ts"; +import "./workers_test.ts"; import { runIfMain } from "../../std/testing/mod.ts"; diff --git a/cli/js/workers.ts b/cli/js/workers.ts index fb63a3260b..7b0c503365 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -38,19 +38,23 @@ function createWorker( }); } +function hostTerminateWorker(id: number): void { + sendSync(dispatch.OP_HOST_TERMINATE_WORKER, { id }); +} + function hostPostMessage(id: number, data: any): void { const dataIntArray = encodeMessage(data); sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray); } -async function hostGetMessage(id: number): Promise { - const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); +interface WorkerEvent { + event: "error" | "msg" | "close"; + data?: any; + error?: any; +} - if (res.data != null) { - return decodeMessage(new Uint8Array(res.data)); - } else { - return null; - } +async function hostGetMessage(id: number): Promise { + return await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); } export interface Worker { @@ -72,6 +76,8 @@ export class WorkerImpl extends EventTarget implements Worker { public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; + private name: string; + private terminated = false; constructor(specifier: string, options?: WorkerOptions) { super(); @@ -88,6 +94,7 @@ export class WorkerImpl extends EventTarget implements Worker { ); } + this.name = options?.name ?? "unknown"; const hasSourceCode = false; const sourceCode = new Uint8Array(); @@ -139,42 +146,53 @@ export class WorkerImpl extends EventTarget implements Worker { } async poll(): Promise { - while (!this.isClosing) { - const data = await hostGetMessage(this.id); - if (data == null) { - log("worker got null message. quitting."); - break; - } - if (this.onmessage) { - const event = { data }; - this.onmessage(event); - } - } + while (!this.terminated) { + const event = await hostGetMessage(this.id); - /* - while (true) { - const result = await hostPollWorker(this.id); + // If terminate was called then we ignore all messages + if (this.terminated) { + return; + } - if (result.error) { - if (!this.handleError(result.error)) { - throw Error(result.error.message); - } else { - hostResumeWorker(this.id); + const type = event.type; + + if (type === "msg") { + if (this.onmessage) { + const message = decodeMessage(new Uint8Array(event.data)); + this.onmessage({ data: message }); } - } else { - this.isClosing = true; - hostCloseWorker(this.id); - break; + continue; } + + if (type === "error") { + if (!this.handleError(event.error)) { + throw Error(event.error.message); + } + continue; + } + + if (type === "close") { + log(`Host got "close" message from worker: ${this.name}`); + this.terminated = true; + return; + } + + throw new Error(`Unknown worker event: "${type}"`); } - */ } postMessage(data: any): void { + if (this.terminated) { + return; + } + hostPostMessage(this.id, data); } terminate(): void { - throw new Error("Not yet implemented"); + if (!this.terminated) { + this.terminated = true; + hostTerminateWorker(this.id); + } } } diff --git a/cli/js/workers_test.ts b/cli/js/workers_test.ts new file mode 100644 index 0000000000..9cb4f4a07e --- /dev/null +++ b/cli/js/workers_test.ts @@ -0,0 +1,84 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { test, assert, assertEquals } from "./test_util.ts"; + +export interface ResolvableMethods { + resolve: (value?: T | PromiseLike) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reject: (reason?: any) => void; +} + +export type Resolvable = Promise & ResolvableMethods; + +export function createResolvable(): Resolvable { + let methods: ResolvableMethods; + const promise = new Promise((resolve, reject): void => { + methods = { resolve, reject }; + }); + // TypeScript doesn't know that the Promise callback occurs synchronously + // therefore use of not null assertion (`!`) + return Object.assign(promise, methods!) as Resolvable; +} + +test(async function workersBasic(): Promise { + const promise = createResolvable(); + const jsWorker = new Worker("../tests/subdir/test_worker.js", { + type: "module", + name: "jsWorker" + }); + const tsWorker = new Worker("../tests/subdir/test_worker.ts", { + type: "module", + name: "tsWorker" + }); + + tsWorker.onmessage = (e): void => { + assertEquals(e.data, "Hello World"); + promise.resolve(); + }; + + jsWorker.onmessage = (e): void => { + assertEquals(e.data, "Hello World"); + tsWorker.postMessage("Hello World"); + }; + + jsWorker.onerror = (e: Event): void => { + e.preventDefault(); + jsWorker.postMessage("Hello World"); + }; + + jsWorker.postMessage("Hello World"); + await promise; +}); + +test(async function nestedWorker(): Promise { + const promise = createResolvable(); + + const nestedWorker = new Worker("../tests/subdir/nested_worker.js", { + type: "module", + name: "nested" + }); + + nestedWorker.onmessage = (e): void => { + assert(e.data.type !== "error"); + promise.resolve(); + }; + + nestedWorker.postMessage("Hello World"); + await promise; +}); + +test(async function workerThrowsWhenExecuting(): Promise { + const promise = createResolvable(); + + const throwingWorker = new Worker("../tests/subdir/throwing_worker.js", { + type: "module" + }); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + throwingWorker.onerror = (e: any): void => { + e.preventDefault(); + assertEquals(e.message, "Uncaught Error: Thrown error"); + promise.resolve(); + }; + + await promise; +}); diff --git a/cli/ops/runtime.rs b/cli/ops/runtime.rs index a962f4e830..7773e461c7 100644 --- a/cli/ops/runtime.rs +++ b/cli/ops/runtime.rs @@ -8,6 +8,7 @@ use crate::version; use crate::DenoSubcommand; use deno_core::*; use std::env; +use std::sync::atomic::Ordering; /// BUILD_OS and BUILD_ARCH match the values in Deno.build. See js/build.ts. #[cfg(target_os = "macos")] @@ -21,6 +22,7 @@ static BUILD_ARCH: &str = "x64"; pub fn init(i: &mut Isolate, s: &State) { i.register_op("start", s.core_op(json_op(s.stateful_op(op_start)))); + i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); } fn op_start( @@ -47,3 +49,20 @@ fn op_start( "arch": BUILD_ARCH, }))) } + +fn op_metrics( + state: &State, + _args: Value, + _zero_copy: Option, +) -> Result { + let state = state.borrow(); + let m = &state.metrics; + + Ok(JsonOp::Sync(json!({ + "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, + "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, + "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, + "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, + "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 + }))) +} diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index ae6b10abc0..e22c0f2217 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -1,65 +1,65 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; -use crate::deno_error::DenoError; -use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::state::State; +use crate::worker::WorkerEvent; use deno_core::*; use futures; -use futures::future::FutureExt; +use futures::channel::mpsc; +use futures::sink::SinkExt; use std; use std::convert::From; -pub fn init(i: &mut Isolate, s: &State) { - i.register_op( - "worker_post_message", - s.core_op(json_op(s.stateful_op(op_worker_post_message))), - ); - i.register_op( - "worker_get_message", - s.core_op(json_op(s.stateful_op(op_worker_get_message))), - ); +pub fn web_worker_op( + sender: mpsc::Sender, + dispatcher: D, +) -> impl Fn(Value, Option) -> Result +where + D: Fn( + &mpsc::Sender, + Value, + Option, + ) -> Result, +{ + move |args: Value, zero_copy: Option| -> Result { + dispatcher(&sender, args, zero_copy) + } } -/// Get message from host as guest worker -fn op_worker_get_message( - state: &State, - _args: Value, - _data: Option, -) -> Result { - let state_ = state.clone(); - let op = async move { - let fut = { - let state = state_.borrow(); - state - .worker_channels_internal - .as_ref() - .unwrap() - .get_message() - }; - let maybe_buf = fut.await; - debug!("op_worker_get_message"); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed_local())) +pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender) { + i.register_op( + "worker_post_message", + s.core_op(json_op(web_worker_op( + sender.clone(), + op_worker_post_message, + ))), + ); + i.register_op( + "worker_close", + s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))), + ); } /// Post message to host as guest worker fn op_worker_post_message( - state: &State, + sender: &mpsc::Sender, _args: Value, data: Option, ) -> Result { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let state = state.borrow(); - let fut = state - .worker_channels_internal - .as_ref() - .unwrap() - .post_message(d); - futures::executor::block_on(fut) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - + let mut sender = sender.clone(); + let fut = sender.send(WorkerEvent::Message(d)); + futures::executor::block_on(fut).expect("Failed to post message to host"); + Ok(JsonOp::Sync(json!({}))) +} + +/// Notify host that guest worker closes +fn op_worker_close( + sender: &mpsc::Sender, + _args: Value, + _data: Option, +) -> Result { + let mut sender = sender.clone(); + sender.close_channel(); Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index fabe0b5e83..4f6f996ee5 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -1,21 +1,29 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use crate::deno_error::bad_resource; -use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; +use crate::deno_error::GetErrorKind; +use crate::fmt_errors::JSError; +use crate::futures::SinkExt; +use crate::global_state::GlobalState; use crate::ops::json_op; +use crate::permissions::DenoPermissions; use crate::startup_data; use crate::state::State; +use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; -use crate::worker::WorkerChannelsExternal; +use crate::worker::Worker; +use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; use deno_core::*; use futures; +use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFutureExt; +use futures::stream::StreamExt; use std; use std::convert::From; -use std::sync::atomic::Ordering; +use std::task::Poll; pub fn init(i: &mut Isolate, s: &State) { i.register_op( @@ -23,8 +31,8 @@ pub fn init(i: &mut Isolate, s: &State) { s.core_op(json_op(s.stateful_op(op_create_worker))), ); i.register_op( - "host_close_worker", - s.core_op(json_op(s.stateful_op(op_host_close_worker))), + "host_terminate_worker", + s.core_op(json_op(s.stateful_op(op_host_terminate_worker))), ); i.register_op( "host_post_message", @@ -34,7 +42,159 @@ pub fn init(i: &mut Isolate, s: &State) { "host_get_message", s.core_op(json_op(s.stateful_op(op_host_get_message))), ); - i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); +} + +fn create_web_worker( + name: String, + global_state: GlobalState, + permissions: DenoPermissions, + specifier: ModuleSpecifier, +) -> Result { + let state = + State::new_for_worker(global_state, Some(permissions), specifier)?; + + let mut worker = + WebWorker::new(name.to_string(), startup_data::deno_isolate_init(), state); + let script = format!("bootstrapWorkerRuntime(\"{}\")", name); + worker.execute(&script)?; + + Ok(worker) +} + +// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs` +pub fn run_worker_loop( + rt: &mut tokio::runtime::Runtime, + worker: &mut Worker, +) -> Result<(), ErrBox> { + let mut worker_is_ready = false; + + let fut = poll_fn(|cx| -> Poll> { + if !worker_is_ready { + match worker.poll_unpin(cx) { + Poll::Ready(r) => { + if let Err(e) = r { + let mut sender = worker.internal_channels.sender.clone(); + futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + .expect("Failed to post message to host"); + } + worker_is_ready = true; + } + Poll::Pending => {} + } + } + + let maybe_msg = { + match worker.internal_channels.receiver.poll_next_unpin(cx) { + Poll::Ready(r) => match r { + Some(msg) => { + let msg_str = String::from_utf8(msg.to_vec()).unwrap(); + debug!("received message from host: {}", msg_str); + Some(msg_str) + } + None => { + debug!("channel closed by host, worker event loop shuts down"); + return Poll::Ready(Ok(())); + } + }, + Poll::Pending => None, + } + }; + + if let Some(msg) = maybe_msg { + // TODO: just add second value and then bind using rusty_v8 + // to get structured clone/transfer working + let script = format!("workerMessageRecvCallback({})", msg); + worker + .execute(&script) + .expect("Failed to execute message cb"); + // Let worker be polled again + worker_is_ready = false; + worker.waker.wake(); + } + + Poll::Pending + }); + + rt.block_on(fut) +} + +// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs` +// TODO(bartlomieju): check if order of actions is aligned to Worker spec +fn run_worker_thread( + name: String, + global_state: GlobalState, + permissions: DenoPermissions, + specifier: ModuleSpecifier, + has_source_code: bool, + source_code: String, +) -> Result { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::>(1); + + let builder = + std::thread::Builder::new().name(format!("deno-worker-{}", name)); + // TODO(bartlomieju): store JoinHandle as well + builder.spawn(move || { + // Any error inside this block is terminal: + // - JS worker is useless - meaning it throws an exception and can't do anything else, + // all action done upon it should be noops + // - newly spawned thread exits + let result = + create_web_worker(name, global_state, permissions, specifier.clone()); + + if let Err(err) = result { + handle_sender.send(Err(err)).unwrap(); + return; + } + + let mut worker = result.unwrap(); + // Send thread safe handle to newly created worker to host thread + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + + // At this point the only method of communication with host + // is using `worker.internal_channels`. + // + // Host can already push messages and interact with worker. + // + // Next steps: + // - create tokio runtime + // - load provided module or code + // - start driving worker's event loop + + let mut rt = create_basic_runtime(); + + // TODO: run with using select with terminate + + // Execute provided source code immediately + let result = if has_source_code { + worker.execute(&source_code) + } else { + // TODO(bartlomieju): add "type": "classic", ie. ability to load + // script instead of module + let load_future = worker + .execute_mod_async(&specifier, None, false) + .boxed_local(); + + rt.block_on(load_future) + }; + + if let Err(e) = result { + let mut sender = worker.internal_channels.sender.clone(); + futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + .expect("Failed to post message to host"); + + // Failure to execute script is a terminal error, bye, bye. + return; + } + + // TODO(bartlomieju): this thread should return result of event loop + // that means that we should store JoinHandle to thread to ensure + // that it actually terminates. + run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + })?; + + handle_receiver.recv().unwrap() } #[derive(Deserialize)] @@ -61,72 +221,28 @@ fn op_create_worker( let parent_state = state.clone(); let state = state.borrow(); let global_state = state.global_state.clone(); - let child_permissions = state.permissions.clone(); + let permissions = state.permissions.clone(); let referrer = state.main_module.to_string(); drop(state); - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::>(1); - - // TODO(bartlomieju): Isn't this wrong? - let result = ModuleSpecifier::resolve_url_or_path(&specifier)?; - let module_specifier = if !has_source_code { - ModuleSpecifier::resolve_import(&specifier, &referrer)? - } else { - result - }; - - std::thread::spawn(move || { - let result = State::new_for_worker( - global_state, - Some(child_permissions), // by default share with parent - module_specifier.clone(), - ); - if let Err(err) = result { - handle_sender.send(Err(err)).unwrap(); - return; - } - let child_state = result.unwrap(); - let worker_name = args_name.unwrap_or_else(|| { - // TODO(bartlomieju): change it to something more descriptive - format!("USER-WORKER-{}", specifier) - }); - - // TODO: add a new option to make child worker not sharing permissions - // with parent (aka .clone(), requests from child won't reflect in parent) - let mut worker = WebWorker::new( - worker_name.to_string(), - startup_data::deno_isolate_init(), - child_state, - ); - let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name); - js_check(worker.execute(&script)); - js_check(worker.execute("runWorkerMessageLoop()")); - - handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); - - // Has provided source code, execute immediately. - if has_source_code { - js_check(worker.execute(&source_code)); - // FIXME(bartlomieju): runtime is not run in this case - return; - } - - let fut = async move { - let r = worker - .execute_mod_async(&module_specifier, None, false) - .await; - if r.is_ok() { - let _ = (&mut *worker).await; - } - } - .boxed_local(); - - crate::tokio_util::run_basic(fut); + let module_specifier = + ModuleSpecifier::resolve_import(&specifier, &referrer)?; + let worker_name = args_name.unwrap_or_else(|| { + // TODO(bartlomieju): change it to something more descriptive + format!("USER-WORKER-{}", specifier) }); - let handle = handle_receiver.recv().unwrap()?; - let worker_id = parent_state.add_child_worker(handle); + let worker_handle = run_worker_thread( + worker_name, + global_state, + permissions, + module_specifier, + has_source_code, + source_code, + )?; + // At this point all interactions with worker happen using thread + // safe handler returned from previous function call + let worker_id = parent_state.add_child_worker(worker_handle); Ok(JsonOp::Sync(json!({ "id": worker_id }))) } @@ -136,7 +252,7 @@ struct WorkerArgs { id: i32, } -fn op_host_close_worker( +fn op_host_terminate_worker( state: &State, args: Value, _data: Option, @@ -144,23 +260,37 @@ fn op_host_close_worker( let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let mut state = state.borrow_mut(); - - let maybe_worker_handle = state.workers.remove(&id); - if let Some(worker_handle) = maybe_worker_handle { - let mut sender = worker_handle.sender.clone(); - sender.close_channel(); - - let mut receiver = - futures::executor::block_on(worker_handle.receiver.lock()); - receiver.close(); - }; - + let worker_handle = + state.workers.remove(&id).expect("No worker handle found"); + worker_handle.terminate(); Ok(JsonOp::Sync(json!({}))) } -#[derive(Deserialize)] -struct HostGetMessageArgs { - id: i32, +fn serialize_worker_event(event: WorkerEvent) -> Value { + match event { + WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), + WorkerEvent::Error(error) => match error.kind() { + ErrorKind::JSError => { + let error = error.downcast::().unwrap(); + let exception: V8Exception = error.into(); + json!({ + "type": "error", + "error": { + "message": exception.message, + "fileName": exception.script_resource_name, + "lineNumber": exception.line_number, + "columnNumber": exception.start_column, + } + }) + } + _ => json!({ + "type": "error", + "error": { + "message": error.to_string(), + } + }), + }, + } } /// Get message from guest worker as host @@ -169,59 +299,48 @@ fn op_host_get_message( args: Value, _data: Option, ) -> Result { - let args: HostGetMessageArgs = serde_json::from_value(args)?; + let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - - let state = state.borrow(); - // TODO: don't return bad resource anymore - let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?; - let fut = worker_handle.get_message(); + let state_ = state.borrow(); + let worker_handle = state_ + .workers + .get(&id) + .expect("No worker handle found") + .clone(); + let state_ = state.clone(); let op = async move { - let maybe_buf = fut.await; - Ok(json!({ "data": maybe_buf })) + let response = match worker_handle.get_event().await { + Some(event) => serialize_worker_event(event), + None => { + let mut state_ = state_.borrow_mut(); + let mut handle = + state_.workers.remove(&id).expect("No worker handle found"); + handle.sender.close_channel(); + // TODO(bartlomieju): join thread handle here + json!({ "type": "close" }) + } + }; + Ok(response) }; Ok(JsonOp::Async(op.boxed_local())) } -#[derive(Deserialize)] -struct HostPostMessageArgs { - id: i32, -} - /// Post message to guest worker as host fn op_host_post_message( state: &State, args: Value, data: Option, ) -> Result { - let args: HostPostMessageArgs = serde_json::from_value(args)?; + let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); debug!("post message to worker {}", id); let state = state.borrow(); - // TODO: don't return bad resource anymore - let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?; + let worker_handle = state.workers.get(&id).expect("No worker handle found"); let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); futures::executor::block_on(fut)?; Ok(JsonOp::Sync(json!({}))) } - -fn op_metrics( - state: &State, - _args: Value, - _zero_copy: Option, -) -> Result { - let state = state.borrow(); - let m = &state.metrics; - - Ok(JsonOp::Sync(json!({ - "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, - "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, - "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, - "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, - "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 - }))) -} diff --git a/cli/state.rs b/cli/state.rs index 4e2f47e62a..b9ef62053b 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -8,8 +8,7 @@ use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; -use crate::worker::WorkerChannelsExternal; -use crate::worker::WorkerChannelsInternal; +use crate::worker::WorkerHandle; use deno_core::Buf; use deno_core::CoreOp; use deno_core::ErrBox; @@ -55,8 +54,7 @@ pub struct StateInner { pub import_map: Option, pub metrics: Metrics, pub global_timer: GlobalTimer, - pub workers: HashMap, - pub worker_channels_internal: Option, + pub workers: HashMap, pub next_worker_id: AtomicUsize, pub start_time: Instant, pub seeded_rng: Option, @@ -232,7 +230,6 @@ impl State { import_map, metrics: Metrics::default(), global_timer: GlobalTimer::new(), - worker_channels_internal: None, workers: HashMap::new(), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), @@ -269,7 +266,6 @@ impl State { import_map: None, metrics: Metrics::default(), global_timer: GlobalTimer::new(), - worker_channels_internal: None, workers: HashMap::new(), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), @@ -282,7 +278,7 @@ impl State { Ok(Self(state)) } - pub fn add_child_worker(&self, handle: WorkerChannelsExternal) -> u32 { + pub fn add_child_worker(&self, handle: WorkerHandle) -> u32 { let mut inner_state = self.borrow_mut(); let worker_id = inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index dd242a32a8..a42dd439e3 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -397,12 +397,10 @@ itest!(_026_redirect_javascript { http_server: true, }); -/* TODO(ry) Disabled to get #3844 landed faster. Re-enable. itest!(_026_workers { args: "run --reload 026_workers.ts", output: "026_workers.ts.out", }); -*/ itest!(workers_basic { args: "run --reload workers_basic.ts", diff --git a/cli/tests/subdir/nested_worker.js b/cli/tests/subdir/nested_worker.js new file mode 100644 index 0000000000..b0acd70d75 --- /dev/null +++ b/cli/tests/subdir/nested_worker.js @@ -0,0 +1,19 @@ +// Specifier should be resolved relative to current file +const jsWorker = new Worker("./sibling_worker.js", { + type: "module", + name: "sibling" +}); + +jsWorker.onerror = _e => { + postMessage({ type: "error" }); +}; + +jsWorker.onmessage = e => { + console.log("js worker on message"); + postMessage({ type: "msg", text: e }); + close(); +}; + +onmessage = function(e) { + jsWorker.postMessage(e.data); +}; diff --git a/cli/tests/subdir/sibling_worker.js b/cli/tests/subdir/sibling_worker.js new file mode 100644 index 0000000000..0e91141ce4 --- /dev/null +++ b/cli/tests/subdir/sibling_worker.js @@ -0,0 +1,4 @@ +onmessage = e => { + postMessage(e.data); + close(); +}; diff --git a/cli/tests/subdir/test_worker.js b/cli/tests/subdir/test_worker.js index f0d9fbed63..70e1d8b731 100644 --- a/cli/tests/subdir/test_worker.js +++ b/cli/tests/subdir/test_worker.js @@ -1,6 +1,5 @@ let thrown = false; -// TODO(bartlomieju): add test for throwing in web worker if (self.name !== "jsWorker") { throw Error(`Bad worker name: ${self.name}, expected jsWorker`); } @@ -14,7 +13,6 @@ onmessage = function(e) { } postMessage(e.data); - close(); }; diff --git a/cli/tests/subdir/test_worker.ts b/cli/tests/subdir/test_worker.ts index bc3f358f85..2ea8f9214c 100644 --- a/cli/tests/subdir/test_worker.ts +++ b/cli/tests/subdir/test_worker.ts @@ -4,8 +4,6 @@ if (self.name !== "tsWorker") { onmessage = function(e): void { console.log(e.data); - postMessage(e.data); - close(); }; diff --git a/cli/tests/subdir/throwing_worker.js b/cli/tests/subdir/throwing_worker.js new file mode 100644 index 0000000000..56ee4ff887 --- /dev/null +++ b/cli/tests/subdir/throwing_worker.js @@ -0,0 +1,2 @@ +// This worker just throws error when it's being executed +throw Error("Thrown error"); diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index e5878cdf7b..0e1257da7b 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -1,30 +1,19 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +pub fn create_basic_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_io() + .enable_time() + .build() + .unwrap() +} + // TODO(ry) rename to run_local ? pub fn run_basic(future: F) -> R where F: std::future::Future + 'static, { - let mut rt = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_io() - .enable_time() - .build() - .unwrap(); + let mut rt = create_basic_runtime(); rt.block_on(future) } - -// TODO(ry) maybe replace with tokio::task::spawn_blocking -#[cfg(test)] -pub fn spawn_thread(f: F) -> impl std::future::Future -where - F: 'static + Send + FnOnce() -> R, - R: 'static + Send, -{ - let (sender, receiver) = tokio::sync::oneshot::channel::(); - std::thread::spawn(move || { - let result = f(); - sender.send(result) - }); - async { receiver.await.unwrap() } -} diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 05e3184d91..c0a712aed6 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -29,7 +29,7 @@ impl WebWorker { { let isolate = &mut worker.isolate; ops::runtime::init(isolate, &state); - ops::web_worker::init(isolate, &state); + ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); ops::worker_host::init(isolate, &state); ops::errors::init(isolate, &state); ops::timers::init(isolate, &state); @@ -65,9 +65,12 @@ impl Future for WebWorker { #[cfg(test)] mod tests { use super::*; + use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::State; use crate::tokio_util; + use crate::worker::WorkerEvent; + use crate::worker::WorkerHandle; fn create_test_worker() -> WebWorker { let state = State::mock("./hello.js"); @@ -77,77 +80,95 @@ mod tests { state, ); worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap(); - worker.execute("runWorkerMessageLoop()").unwrap(); worker } - #[test] fn test_worker_messages() { - let mut worker = create_test_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - delete self.onmessage; - return; - } else { - console.assert(e.data === "hi"); + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_worker(); + let source = r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + return close(); + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute(source).unwrap(); - - let handle = worker.thread_safe_handle(); - let _ = tokio_util::spawn_thread(move || { - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; - assert!(r.is_ok()); - - let maybe_msg = handle.get_message().await; - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()).await; - assert!(r.is_ok()); - - let maybe_msg = handle.get_message().await; - assert!(maybe_msg.is_some()); - assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg).await; - assert!(r.is_ok()); - }) + "#; + worker.execute(source).unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let mut rt = tokio_util::create_basic_runtime(); + let r = run_worker_loop(&mut rt, &mut worker); + assert!(r.is_ok()) }); - let r = tokio_util::run_basic(worker); - assert!(r.is_ok()) + let mut handle = handle_receiver.recv().unwrap(); + + tokio_util::run_basic(async move { + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()).await; + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await; + assert!(maybe_msg.is_some()); + + let r = handle.post_message(msg.clone()).await; + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await; + assert!(maybe_msg.is_some()); + match maybe_msg { + Some(WorkerEvent::Message(buf)) => { + assert_eq!(*buf, *b"[1,2,3]"); + } + _ => unreachable!(), + } + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = handle.post_message(msg).await; + assert!(r.is_ok()); + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + }); + join_handle.join().expect("Failed to join worker thread"); } #[test] fn removed_from_resource_table_on_close() { - let mut worker = create_test_worker(); - let handle = worker.thread_safe_handle(); + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::(1); - worker - .execute("onmessage = () => { delete self.onmessage; }") - .unwrap(); - - let worker_post_message_fut = tokio_util::spawn_thread(move || { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = futures::executor::block_on(handle.post_message(msg)); - assert!(r.is_ok()); + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_worker(); + worker.execute("onmessage = () => { close(); }").unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let mut rt = tokio_util::create_basic_runtime(); + let r = run_worker_loop(&mut rt, &mut worker); + assert!(r.is_ok()) }); + let mut handle = handle_receiver.recv().unwrap(); + tokio_util::run_basic(async move { - worker_post_message_fut.await; - let r = worker.await; + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()).await; assert!(r.is_ok()); + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); }); + join_handle.join().expect("Failed to join worker thread"); } } diff --git a/cli/worker.rs b/cli/worker.rs index 20b8b80210..b804ff449a 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -24,76 +24,55 @@ use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; use url::Url; -/// Wraps mpsc channels so they can be referenced -/// from ops and used to facilitate parent-child communication -/// for workers. -#[derive(Clone)] -pub struct WorkerChannels { - pub sender: mpsc::Sender, - pub receiver: Arc>>, +/// Events that are sent to host from child +/// worker. +pub enum WorkerEvent { + Message(Buf), + Error(ErrBox), } -impl WorkerChannels { +pub struct WorkerChannelsInternal { + pub sender: mpsc::Sender, + pub receiver: mpsc::Receiver, +} + +#[derive(Clone)] +pub struct WorkerHandle { + pub sender: mpsc::Sender, + pub receiver: Arc>>, + // terminate_channel +} + +impl WorkerHandle { + pub fn terminate(&self) { + todo!() + } + /// Post message to worker as a host. pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { let mut sender = self.sender.clone(); sender.send(buf).map_err(ErrBox::from).await } - /// Get message from worker as a host. - pub fn get_message(&self) -> Pin>>> { - let receiver_mutex = self.receiver.clone(); - - async move { - let mut receiver = receiver_mutex.lock().await; - receiver.next().await - } - .boxed_local() + // TODO: should use `try_lock` and return error if + // more than one listener tries to get event + pub async fn get_event(&self) -> Option { + let mut receiver = self.receiver.lock().await; + receiver.next().await } } -pub struct WorkerChannelsInternal(WorkerChannels); - -impl Deref for WorkerChannelsInternal { - type Target = WorkerChannels; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for WorkerChannelsInternal { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -#[derive(Clone)] -pub struct WorkerChannelsExternal(WorkerChannels); - -impl Deref for WorkerChannelsExternal { - type Target = WorkerChannels; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for WorkerChannelsExternal { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) { +fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { let (in_tx, in_rx) = mpsc::channel::(1); - let (out_tx, out_rx) = mpsc::channel::(1); - let internal_channels = WorkerChannelsInternal(WorkerChannels { + let (out_tx, out_rx) = mpsc::channel::(1); + let internal_channels = WorkerChannelsInternal { sender: out_tx, - receiver: Arc::new(AsyncMutex::new(in_rx)), - }); - let external_channels = WorkerChannelsExternal(WorkerChannels { + receiver: in_rx, + }; + let external_channels = WorkerHandle { sender: in_tx, receiver: Arc::new(AsyncMutex::new(out_rx)), - }); + }; (internal_channels, external_channels) } @@ -113,7 +92,9 @@ pub struct Worker { pub name: String, pub isolate: Box, pub state: State, - external_channels: WorkerChannelsExternal, + pub waker: AtomicWaker, + pub(crate) internal_channels: WorkerChannelsInternal, + external_channels: WorkerHandle, } impl Worker { @@ -127,15 +108,13 @@ impl Worker { }); let (internal_channels, external_channels) = create_channels(); - { - let mut state = state.borrow_mut(); - state.worker_channels_internal = Some(internal_channels); - } Self { name, isolate, state, + waker: AtomicWaker::new(), + internal_channels, external_channels, } } @@ -174,7 +153,7 @@ impl Worker { } /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WorkerChannelsExternal { + pub fn thread_safe_handle(&self) -> WorkerHandle { self.external_channels.clone() } } @@ -184,8 +163,7 @@ impl Future for Worker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let inner = self.get_mut(); - let waker = AtomicWaker::new(); - waker.register(cx.waker()); + inner.waker.register(cx.waker()); inner.isolate.poll_unpin(cx) } } @@ -224,7 +202,7 @@ impl MainWorker { ops::signal::init(isolate, &state); ops::timers::init(isolate, &state); ops::worker_host::init(isolate, &state); - ops::web_worker::init(isolate, &state); + ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); } Self(worker) } diff --git a/core/es_isolate.rs b/core/es_isolate.rs index 295fe00ed9..aeb6e318a0 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -235,6 +235,51 @@ impl EsIsolate { } } + /// TODO(bartlomieju): copy-pasta to avoid problem with global handle attached + /// to ErrBox + pub fn mod_evaluate_dyn_import( + &mut self, + id: ModuleId, + ) -> Result<(), ErrBox> { + let isolate = self.core_isolate.v8_isolate.as_ref().unwrap(); + let mut locker = v8::Locker::new(isolate); + let mut hs = v8::HandleScope::new(locker.enter()); + let scope = hs.enter(); + assert!(!self.core_isolate.global_context.is_empty()); + let context = self.core_isolate.global_context.get(scope).unwrap(); + let mut cs = v8::ContextScope::new(scope, context); + let scope = cs.enter(); + + let info = self.modules.get_info(id).expect("ModuleInfo not found"); + let mut module = info.handle.get(scope).expect("Empty module handle"); + let mut status = module.get_status(); + + if status == v8::ModuleStatus::Instantiated { + let ok = module.evaluate(scope, context).is_some(); + // Update status after evaluating. + status = module.get_status(); + if ok { + assert!( + status == v8::ModuleStatus::Evaluated + || status == v8::ModuleStatus::Errored + ); + } else { + assert!(status == v8::ModuleStatus::Errored); + } + } + + match status { + v8::ModuleStatus::Evaluated => Ok(()), + v8::ModuleStatus::Errored => { + let i = &mut self.core_isolate; + let exception = module.get_exception(); + i.exception_to_err_result(scope, exception) + .map_err(|err| i.attach_handle_to_error(scope, err, exception)) + } + other => panic!("Unexpected module status {:?}", other), + } + } + /// Evaluates an already instantiated ES module. /// /// ErrBox can be downcast to a type that exposes additional information about @@ -274,7 +319,6 @@ impl EsIsolate { let i = &mut self.core_isolate; let exception = module.get_exception(); i.exception_to_err_result(scope, exception) - .map_err(|err| i.attach_handle_to_error(scope, err, exception)) } other => panic!("Unexpected module status {:?}", other), } @@ -425,7 +469,7 @@ impl EsIsolate { // Load is done. let module_id = load.root_module_id.unwrap(); self.mod_instantiate(module_id)?; - match self.mod_evaluate(module_id) { + match self.mod_evaluate_dyn_import(module_id) { Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?, Err(err) => self.dyn_import_error(dyn_import_id, err)?, };