1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

workers: basic event loop (#3828)

* establish basic event loop for workers
* make "self.close()" inside worker
* remove "runWorkerMessageLoop() - instead manually call global function 
  in Rust when message arrives. This is done in preparation for structured clone
* refactor "WorkerChannel" and use distinct structs for internal 
  and external channels;  "WorkerChannelsInternal" and "WorkerHandle"
* move "State.worker_channels_internal" to "Worker.internal_channels"
* add "WorkerEvent" enum for child->host communication; 
  currently "Message(Buf)" and  "Error(ErrBox)" variants are supported
* add tests for nested workers
* add tests for worker throwing error on startup
This commit is contained in:
Bartek Iwańczuk 2020-02-11 10:04:59 +01:00 committed by GitHub
parent 81905a867e
commit 79b3bc05d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 779 additions and 517 deletions

View file

@ -30,7 +30,7 @@ impl CompilerWorker {
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
ops::compiler::init(isolate, &state);
ops::web_worker::init(isolate, &state);
ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
ops::errors::init(isolate, &state);
// for compatibility with Worker scope, though unused at
// the moment

View file

@ -8,11 +8,15 @@ use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::GlobalState;
use crate::msg;
use crate::ops::worker_host::run_worker_loop;
use crate::ops::JsonResult;
use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
use crate::tokio_util::create_basic_runtime;
use crate::version;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
@ -288,13 +292,11 @@ impl TsCompiler {
true,
);
let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?;
if let Some(ref msg) = maybe_msg {
let json_str = std::str::from_utf8(msg).unwrap();
debug!("Message: {}", json_str);
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
return Err(ErrBox::from(diagnostics));
}
let msg = execute_in_thread(global_state.clone(), req_msg).await?;
let json_str = std::str::from_utf8(&msg).unwrap();
debug!("Message: {}", json_str);
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
return Err(ErrBox::from(diagnostics));
}
Ok(())
}
@ -376,13 +378,11 @@ impl TsCompiler {
let compiling_job = global_state
.progress
.add("Compile", &module_url.to_string());
let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?;
let msg = execute_in_thread(global_state.clone(), req_msg).await?;
if let Some(ref msg) = maybe_msg {
let json_str = std::str::from_utf8(msg).unwrap();
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
return Err(ErrBox::from(diagnostics));
}
let json_str = std::str::from_utf8(&msg).unwrap();
if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
return Err(ErrBox::from(diagnostics));
}
let compiled_module = ts_compiler.get_compiled_module(&source_file_.url)?;
drop(compiling_job);
@ -602,45 +602,45 @@ impl TsCompiler {
}
}
// TODO(bartlomieju): exactly same function is in `wasm.rs` - only difference
// it created WasmCompiler instead of TsCompiler - deduplicate
async fn execute_in_thread(
global_state: GlobalState,
req: Buf,
) -> Result<Option<Buf>, ErrBox> {
let (load_sender, load_receiver) =
tokio::sync::oneshot::channel::<Result<Option<Buf>, ErrBox>>();
std::thread::spawn(move || {
debug!(">>>>> compile_async START");
) -> Result<Buf, ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name("deno-ts-compiler".to_string());
let join_handle = builder.spawn(move || {
let mut worker = TsCompiler::setup_worker(global_state.clone());
let handle = worker.thread_safe_handle();
crate::tokio_util::run_basic(
async move {
if let Err(err) = handle.post_message(req).await {
load_sender.send(Err(err)).unwrap();
return;
}
if let Err(err) = (&mut *worker).await {
load_sender.send(Err(err)).unwrap();
return;
}
let maybe_msg = handle.get_message().await;
load_sender.send(Ok(maybe_msg)).unwrap();
debug!(">>>>> compile_sync END");
}
.boxed_local(),
);
});
load_receiver.await.unwrap()
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
let mut rt = create_basic_runtime();
run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
})?;
let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?;
let event = handle.get_event().await.expect("Compiler didn't respond");
let buf = match event {
WorkerEvent::Message(buf) => Ok(buf),
WorkerEvent::Error(error) => Err(error),
}?;
// Compiler worker finishes after one request
// so we should receive signal that channel was closed.
// Then close worker's channel and join the thread.
let event = handle.get_event().await;
assert!(event.is_none());
handle.sender.close_channel();
join_handle.join().unwrap();
Ok(buf)
}
async fn execute_in_thread_json(
req_msg: Buf,
global_state: GlobalState,
) -> JsonResult {
let maybe_msg = execute_in_thread(global_state, req_msg).await?;
let msg = maybe_msg.unwrap();
let msg = execute_in_thread(global_state, req_msg).await?;
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}

View file

@ -3,8 +3,13 @@ use super::compiler_worker::CompilerWorker;
use crate::compilers::CompiledModule;
use crate::file_fetcher::SourceFile;
use crate::global_state::GlobalState;
use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::*;
use crate::tokio_util::create_basic_runtime;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
use serde_derive::Deserialize;
@ -83,64 +88,67 @@ impl WasmCompiler {
if let Some(m) = maybe_cached {
return Ok(m);
}
let (load_sender, load_receiver) =
tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>();
std::thread::spawn(move || {
debug!(">>>>> wasm_compile_async START");
let base64_data = base64::encode(&source_file.source_code);
let mut worker = WasmCompiler::setup_worker(global_state);
let handle = worker.thread_safe_handle();
let url = source_file.url.clone();
let fut = async move {
let _ = handle
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.await;
if let Err(err) = (&mut *worker).await {
load_sender.send(Err(err)).unwrap();
return;
}
debug!("Sent message to worker");
let json_msg = handle.get_message().await.expect("not handled");
debug!("Received message from worker");
let module_info: WasmModuleInfo =
serde_json::from_slice(&json_msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
let code = wrap_wasm_code(
&base64_data,
&module_info.import_list,
&module_info.export_list,
);
debug!("Generated code: {}", &code);
let module = CompiledModule {
code,
name: url.to_string(),
};
{
cache_.lock().unwrap().insert(url.clone(), module.clone());
}
debug!("<<<<< wasm_compile_async END");
load_sender.send(Ok(module)).unwrap();
};
crate::tokio_util::run_basic(fut);
});
load_receiver.await.unwrap()
debug!(">>>>> wasm_compile_async START");
let base64_data = base64::encode(&source_file.source_code);
let url = source_file.url.clone();
let req_msg = serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes();
let msg = execute_in_thread(global_state.clone(), req_msg).await?;
debug!("Received message from worker");
let module_info: WasmModuleInfo = serde_json::from_slice(&msg).unwrap();
debug!("WASM module info: {:#?}", &module_info);
let code = wrap_wasm_code(
&base64_data,
&module_info.import_list,
&module_info.export_list,
);
debug!("Generated code: {}", &code);
let module = CompiledModule {
code,
name: url.to_string(),
};
{
cache_.lock().unwrap().insert(url.clone(), module.clone());
}
debug!("<<<<< wasm_compile_async END");
Ok(module)
}
}
async fn execute_in_thread(
global_state: GlobalState,
req: Buf,
) -> Result<Buf, ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name("deno-wasm-compiler".to_string());
let join_handle = builder.spawn(move || {
let mut worker = WasmCompiler::setup_worker(global_state);
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
let mut rt = create_basic_runtime();
run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
})?;
let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?;
let event = handle.get_event().await.expect("Compiler didn't respond");
let buf = match event {
WorkerEvent::Message(buf) => Ok(buf),
WorkerEvent::Error(error) => Err(error),
}?;
// Compiler worker finishes after one request
// so we should receive signal that channel was closed.
// Then close worker's channel and join the thread.
let event = handle.get_event().await;
assert!(event.is_none());
handle.sender.close_channel();
join_handle.join().unwrap();
Ok(buf)
}
fn build_single_import(index: usize, origin: &str) -> String {
let origin_json = serde_json::to_string(origin).unwrap();
format!(

View file

@ -40,10 +40,7 @@ import { Diagnostic } from "./diagnostics.ts";
import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts";
import { assert } from "./util.ts";
import * as util from "./util.ts";
import {
bootstrapWorkerRuntime,
runWorkerMessageLoop
} from "./runtime_worker.ts";
import { bootstrapWorkerRuntime } from "./runtime_worker.ts";
interface CompilerRequestCompile {
type: CompilerRequestType.Compile;
@ -340,13 +337,11 @@ async function wasmCompilerOnMessage({
function bootstrapTsCompilerRuntime(): void {
bootstrapWorkerRuntime("TS");
globalThis.onmessage = tsCompilerOnMessage;
runWorkerMessageLoop();
}
function bootstrapWasmCompilerRuntime(): void {
bootstrapWorkerRuntime("WASM");
globalThis.onmessage = wasmCompilerOnMessage;
runWorkerMessageLoop();
}
Object.defineProperties(globalThis, {

View file

@ -43,10 +43,10 @@ export let OP_REVOKE_PERMISSION: number;
export let OP_REQUEST_PERMISSION: number;
export let OP_CREATE_WORKER: number;
export let OP_HOST_POST_MESSAGE: number;
export let OP_HOST_CLOSE_WORKER: number;
export let OP_HOST_TERMINATE_WORKER: number;
export let OP_HOST_GET_MESSAGE: number;
export let OP_WORKER_POST_MESSAGE: number;
export let OP_WORKER_GET_MESSAGE: number;
export let OP_WORKER_CLOSE: number;
export let OP_RUN: number;
export let OP_RUN_STATUS: number;
export let OP_KILL: number;

View file

@ -118,7 +118,6 @@ declare global {
var bootstrapWorkerRuntime:
| ((name: string) => Promise<void> | void)
| undefined;
var runWorkerMessageLoop: (() => Promise<void> | void) | undefined;
var onerror:
| ((
msg: string,

View file

@ -37,7 +37,6 @@ declare const postMessage: typeof __workerMain.postMessage;
declare namespace __workerMain {
export let onmessage: (e: { data: any }) => void;
export function postMessage(data: any): void;
export function getMessage(): Promise<any>;
export function close(): void;
export const name: string;
}

View file

@ -1,9 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { bootstrapMainRuntime } from "./runtime_main.ts";
import {
bootstrapWorkerRuntime,
runWorkerMessageLoop
} from "./runtime_worker.ts";
import { bootstrapWorkerRuntime } from "./runtime_worker.ts";
Object.defineProperties(globalThis, {
bootstrapMainRuntime: {
@ -17,11 +14,5 @@ Object.defineProperties(globalThis, {
enumerable: false,
writable: false,
configurable: false
},
runWorkerMessageLoop: {
value: runWorkerMessageLoop,
enumerable: false,
writable: false,
configurable: false
}
});

View file

@ -3,12 +3,9 @@
// This module is the entry point for "worker" isolate, ie. the one
// that is created using `new Worker()` JS API.
//
// It provides two functions that should be called by Rust:
// It provides a single function that should be called by Rust:
// - `bootstrapWorkerRuntime` - must be called once, when Isolate is created.
// It sets up runtime by providing globals for `DedicatedWorkerScope`.
// - `runWorkerMessageLoop` - starts receiving messages from parent worker,
// can be called multiple times - eg. to restart worker execution after
// exception occurred and was handled by parent worker
/* eslint-disable @typescript-eslint/no-explicit-any */
import {
@ -20,13 +17,12 @@ import {
eventTargetProperties
} from "./globals.ts";
import * as dispatch from "./dispatch.ts";
import { sendAsync, sendSync } from "./dispatch_json.ts";
import { sendSync } from "./dispatch_json.ts";
import { log } from "./util.ts";
import { TextDecoder, TextEncoder } from "./text_encoding.ts";
import { TextEncoder } from "./text_encoding.ts";
import * as runtime from "./runtime.ts";
const encoder = new TextEncoder();
const decoder = new TextDecoder();
// TODO(bartlomieju): remove these funtions
// Stuff for workers
@ -39,62 +35,46 @@ export function postMessage(data: any): void {
sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
}
export async function getMessage(): Promise<any> {
log("getMessage");
const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
if (res.data != null) {
const dataIntArray = new Uint8Array(res.data);
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
} else {
return null;
}
}
let isClosing = false;
let hasBootstrapped = false;
export function close(): void {
if (isClosing) {
return;
}
isClosing = true;
sendSync(dispatch.OP_WORKER_CLOSE);
}
export async function runWorkerMessageLoop(): Promise<void> {
while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("runWorkerMessageLoop got null message. quitting.");
break;
}
export async function workerMessageRecvCallback(data: string): Promise<void> {
let result: void | Promise<void>;
const event = { data };
let result: void | Promise<void>;
const event = { data };
try {
if (!globalThis["onmessage"]) {
break;
}
try {
//
if (globalThis["onmessage"]) {
result = globalThis.onmessage!(event);
if (result && "then" in result) {
await result;
}
if (!globalThis["onmessage"]) {
break;
}
} catch (e) {
if (globalThis["onerror"]) {
const result = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e
);
if (result === true) {
continue;
}
}
throw e;
}
// TODO: run the rest of liteners
} catch (e) {
if (globalThis["onerror"]) {
const result = globalThis.onerror(
e.message,
e.fileName,
e.lineNumber,
e.columnNumber,
e
);
if (result === true) {
return;
}
}
throw e;
}
}
@ -102,8 +82,10 @@ export const workerRuntimeGlobalProperties = {
self: readOnly(globalThis),
onmessage: writable(onmessage),
onerror: writable(onerror),
// TODO: should be readonly?
close: nonEnumerable(close),
postMessage: writable(postMessage)
postMessage: writable(postMessage),
workerMessageRecvCallback: nonEnumerable(workerMessageRecvCallback)
};
/**

View file

@ -59,6 +59,7 @@ import "./write_file_test.ts";
import "./performance_test.ts";
import "./permissions_test.ts";
import "./version_test.ts";
import "./workers_test.ts";
import { runIfMain } from "../../std/testing/mod.ts";

View file

@ -38,19 +38,23 @@ function createWorker(
});
}
function hostTerminateWorker(id: number): void {
sendSync(dispatch.OP_HOST_TERMINATE_WORKER, { id });
}
function hostPostMessage(id: number, data: any): void {
const dataIntArray = encodeMessage(data);
sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray);
}
async function hostGetMessage(id: number): Promise<any> {
const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
interface WorkerEvent {
event: "error" | "msg" | "close";
data?: any;
error?: any;
}
if (res.data != null) {
return decodeMessage(new Uint8Array(res.data));
} else {
return null;
}
async function hostGetMessage(id: number): Promise<any> {
return await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
}
export interface Worker {
@ -72,6 +76,8 @@ export class WorkerImpl extends EventTarget implements Worker {
public onerror?: (e: any) => void;
public onmessage?: (data: any) => void;
public onmessageerror?: () => void;
private name: string;
private terminated = false;
constructor(specifier: string, options?: WorkerOptions) {
super();
@ -88,6 +94,7 @@ export class WorkerImpl extends EventTarget implements Worker {
);
}
this.name = options?.name ?? "unknown";
const hasSourceCode = false;
const sourceCode = new Uint8Array();
@ -139,42 +146,53 @@ export class WorkerImpl extends EventTarget implements Worker {
}
async poll(): Promise<void> {
while (!this.isClosing) {
const data = await hostGetMessage(this.id);
if (data == null) {
log("worker got null message. quitting.");
break;
}
if (this.onmessage) {
const event = { data };
this.onmessage(event);
}
}
while (!this.terminated) {
const event = await hostGetMessage(this.id);
/*
while (true) {
const result = await hostPollWorker(this.id);
// If terminate was called then we ignore all messages
if (this.terminated) {
return;
}
if (result.error) {
if (!this.handleError(result.error)) {
throw Error(result.error.message);
} else {
hostResumeWorker(this.id);
const type = event.type;
if (type === "msg") {
if (this.onmessage) {
const message = decodeMessage(new Uint8Array(event.data));
this.onmessage({ data: message });
}
} else {
this.isClosing = true;
hostCloseWorker(this.id);
break;
continue;
}
if (type === "error") {
if (!this.handleError(event.error)) {
throw Error(event.error.message);
}
continue;
}
if (type === "close") {
log(`Host got "close" message from worker: ${this.name}`);
this.terminated = true;
return;
}
throw new Error(`Unknown worker event: "${type}"`);
}
*/
}
postMessage(data: any): void {
if (this.terminated) {
return;
}
hostPostMessage(this.id, data);
}
terminate(): void {
throw new Error("Not yet implemented");
if (!this.terminated) {
this.terminated = true;
hostTerminateWorker(this.id);
}
}
}

84
cli/js/workers_test.ts Normal file
View file

@ -0,0 +1,84 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { test, assert, assertEquals } from "./test_util.ts";
export interface ResolvableMethods<T> {
resolve: (value?: T | PromiseLike<T>) => void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
reject: (reason?: any) => void;
}
export type Resolvable<T> = Promise<T> & ResolvableMethods<T>;
export function createResolvable<T>(): Resolvable<T> {
let methods: ResolvableMethods<T>;
const promise = new Promise<T>((resolve, reject): void => {
methods = { resolve, reject };
});
// TypeScript doesn't know that the Promise callback occurs synchronously
// therefore use of not null assertion (`!`)
return Object.assign(promise, methods!) as Resolvable<T>;
}
test(async function workersBasic(): Promise<void> {
const promise = createResolvable();
const jsWorker = new Worker("../tests/subdir/test_worker.js", {
type: "module",
name: "jsWorker"
});
const tsWorker = new Worker("../tests/subdir/test_worker.ts", {
type: "module",
name: "tsWorker"
});
tsWorker.onmessage = (e): void => {
assertEquals(e.data, "Hello World");
promise.resolve();
};
jsWorker.onmessage = (e): void => {
assertEquals(e.data, "Hello World");
tsWorker.postMessage("Hello World");
};
jsWorker.onerror = (e: Event): void => {
e.preventDefault();
jsWorker.postMessage("Hello World");
};
jsWorker.postMessage("Hello World");
await promise;
});
test(async function nestedWorker(): Promise<void> {
const promise = createResolvable();
const nestedWorker = new Worker("../tests/subdir/nested_worker.js", {
type: "module",
name: "nested"
});
nestedWorker.onmessage = (e): void => {
assert(e.data.type !== "error");
promise.resolve();
};
nestedWorker.postMessage("Hello World");
await promise;
});
test(async function workerThrowsWhenExecuting(): Promise<void> {
const promise = createResolvable();
const throwingWorker = new Worker("../tests/subdir/throwing_worker.js", {
type: "module"
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
throwingWorker.onerror = (e: any): void => {
e.preventDefault();
assertEquals(e.message, "Uncaught Error: Thrown error");
promise.resolve();
};
await promise;
});

View file

@ -8,6 +8,7 @@ use crate::version;
use crate::DenoSubcommand;
use deno_core::*;
use std::env;
use std::sync::atomic::Ordering;
/// BUILD_OS and BUILD_ARCH match the values in Deno.build. See js/build.ts.
#[cfg(target_os = "macos")]
@ -21,6 +22,7 @@ static BUILD_ARCH: &str = "x64";
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("start", s.core_op(json_op(s.stateful_op(op_start))));
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
fn op_start(
@ -47,3 +49,20 @@ fn op_start(
"arch": BUILD_ARCH,
})))
}
fn op_metrics(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let state = state.borrow();
let m = &state.metrics;
Ok(JsonOp::Sync(json!({
"opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
"opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
"bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
"bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
"bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
})))
}

View file

@ -1,65 +1,65 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::state::State;
use crate::worker::WorkerEvent;
use deno_core::*;
use futures;
use futures::future::FutureExt;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use std;
use std::convert::From;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
"worker_post_message",
s.core_op(json_op(s.stateful_op(op_worker_post_message))),
);
i.register_op(
"worker_get_message",
s.core_op(json_op(s.stateful_op(op_worker_get_message))),
);
pub fn web_worker_op<D>(
sender: mpsc::Sender<WorkerEvent>,
dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>
where
D: Fn(
&mpsc::Sender<WorkerEvent>,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox>,
{
move |args: Value, zero_copy: Option<ZeroCopyBuf>| -> Result<JsonOp, ErrBox> {
dispatcher(&sender, args, zero_copy)
}
}
/// Get message from host as guest worker
fn op_worker_get_message(
state: &State,
_args: Value,
_data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let state_ = state.clone();
let op = async move {
let fut = {
let state = state_.borrow();
state
.worker_channels_internal
.as_ref()
.unwrap()
.get_message()
};
let maybe_buf = fut.await;
debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf }))
};
Ok(JsonOp::Async(op.boxed_local()))
pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) {
i.register_op(
"worker_post_message",
s.core_op(json_op(web_worker_op(
sender.clone(),
op_worker_post_message,
))),
);
i.register_op(
"worker_close",
s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))),
);
}
/// Post message to host as guest worker
fn op_worker_post_message(
state: &State,
sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let state = state.borrow();
let fut = state
.worker_channels_internal
.as_ref()
.unwrap()
.post_message(d);
futures::executor::block_on(fut)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
let mut sender = sender.clone();
let fut = sender.send(WorkerEvent::Message(d));
futures::executor::block_on(fut).expect("Failed to post message to host");
Ok(JsonOp::Sync(json!({})))
}
/// Notify host that guest worker closes
fn op_worker_close(
sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
_data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let mut sender = sender.clone();
sender.close_channel();
Ok(JsonOp::Sync(json!({})))
}

View file

@ -1,21 +1,29 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::deno_error::bad_resource;
use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::deno_error::GetErrorKind;
use crate::fmt_errors::JSError;
use crate::futures::SinkExt;
use crate::global_state::GlobalState;
use crate::ops::json_op;
use crate::permissions::DenoPermissions;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
use crate::worker::WorkerChannelsExternal;
use crate::worker::Worker;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::*;
use futures;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use std;
use std::convert::From;
use std::sync::atomic::Ordering;
use std::task::Poll;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
@ -23,8 +31,8 @@ pub fn init(i: &mut Isolate, s: &State) {
s.core_op(json_op(s.stateful_op(op_create_worker))),
);
i.register_op(
"host_close_worker",
s.core_op(json_op(s.stateful_op(op_host_close_worker))),
"host_terminate_worker",
s.core_op(json_op(s.stateful_op(op_host_terminate_worker))),
);
i.register_op(
"host_post_message",
@ -34,7 +42,159 @@ pub fn init(i: &mut Isolate, s: &State) {
"host_get_message",
s.core_op(json_op(s.stateful_op(op_host_get_message))),
);
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
fn create_web_worker(
name: String,
global_state: GlobalState,
permissions: DenoPermissions,
specifier: ModuleSpecifier,
) -> Result<WebWorker, ErrBox> {
let state =
State::new_for_worker(global_state, Some(permissions), specifier)?;
let mut worker =
WebWorker::new(name.to_string(), startup_data::deno_isolate_init(), state);
let script = format!("bootstrapWorkerRuntime(\"{}\")", name);
worker.execute(&script)?;
Ok(worker)
}
// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
pub fn run_worker_loop(
rt: &mut tokio::runtime::Runtime,
worker: &mut Worker,
) -> Result<(), ErrBox> {
let mut worker_is_ready = false;
let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> {
if !worker_is_ready {
match worker.poll_unpin(cx) {
Poll::Ready(r) => {
if let Err(e) = r {
let mut sender = worker.internal_channels.sender.clone();
futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
.expect("Failed to post message to host");
}
worker_is_ready = true;
}
Poll::Pending => {}
}
}
let maybe_msg = {
match worker.internal_channels.receiver.poll_next_unpin(cx) {
Poll::Ready(r) => match r {
Some(msg) => {
let msg_str = String::from_utf8(msg.to_vec()).unwrap();
debug!("received message from host: {}", msg_str);
Some(msg_str)
}
None => {
debug!("channel closed by host, worker event loop shuts down");
return Poll::Ready(Ok(()));
}
},
Poll::Pending => None,
}
};
if let Some(msg) = maybe_msg {
// TODO: just add second value and then bind using rusty_v8
// to get structured clone/transfer working
let script = format!("workerMessageRecvCallback({})", msg);
worker
.execute(&script)
.expect("Failed to execute message cb");
// Let worker be polled again
worker_is_ready = false;
worker.waker.wake();
}
Poll::Pending
});
rt.block_on(fut)
}
// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
// TODO(bartlomieju): check if order of actions is aligned to Worker spec
fn run_worker_thread(
name: String,
global_state: GlobalState,
permissions: DenoPermissions,
specifier: ModuleSpecifier,
has_source_code: bool,
source_code: String,
) -> Result<WorkerHandle, ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name(format!("deno-worker-{}", name));
// TODO(bartlomieju): store JoinHandle as well
builder.spawn(move || {
// Any error inside this block is terminal:
// - JS worker is useless - meaning it throws an exception and can't do anything else,
// all action done upon it should be noops
// - newly spawned thread exits
let result =
create_web_worker(name, global_state, permissions, specifier.clone());
if let Err(err) = result {
handle_sender.send(Err(err)).unwrap();
return;
}
let mut worker = result.unwrap();
// Send thread safe handle to newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
// At this point the only method of communication with host
// is using `worker.internal_channels`.
//
// Host can already push messages and interact with worker.
//
// Next steps:
// - create tokio runtime
// - load provided module or code
// - start driving worker's event loop
let mut rt = create_basic_runtime();
// TODO: run with using select with terminate
// Execute provided source code immediately
let result = if has_source_code {
worker.execute(&source_code)
} else {
// TODO(bartlomieju): add "type": "classic", ie. ability to load
// script instead of module
let load_future = worker
.execute_mod_async(&specifier, None, false)
.boxed_local();
rt.block_on(load_future)
};
if let Err(e) = result {
let mut sender = worker.internal_channels.sender.clone();
futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
.expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye.
return;
}
// TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure
// that it actually terminates.
run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
})?;
handle_receiver.recv().unwrap()
}
#[derive(Deserialize)]
@ -61,72 +221,28 @@ fn op_create_worker(
let parent_state = state.clone();
let state = state.borrow();
let global_state = state.global_state.clone();
let child_permissions = state.permissions.clone();
let permissions = state.permissions.clone();
let referrer = state.main_module.to_string();
drop(state);
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerChannelsExternal, ErrBox>>(1);
// TODO(bartlomieju): Isn't this wrong?
let result = ModuleSpecifier::resolve_url_or_path(&specifier)?;
let module_specifier = if !has_source_code {
ModuleSpecifier::resolve_import(&specifier, &referrer)?
} else {
result
};
std::thread::spawn(move || {
let result = State::new_for_worker(
global_state,
Some(child_permissions), // by default share with parent
module_specifier.clone(),
);
if let Err(err) = result {
handle_sender.send(Err(err)).unwrap();
return;
}
let child_state = result.unwrap();
let worker_name = args_name.unwrap_or_else(|| {
// TODO(bartlomieju): change it to something more descriptive
format!("USER-WORKER-{}", specifier)
});
// TODO: add a new option to make child worker not sharing permissions
// with parent (aka .clone(), requests from child won't reflect in parent)
let mut worker = WebWorker::new(
worker_name.to_string(),
startup_data::deno_isolate_init(),
child_state,
);
let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name);
js_check(worker.execute(&script));
js_check(worker.execute("runWorkerMessageLoop()"));
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
// Has provided source code, execute immediately.
if has_source_code {
js_check(worker.execute(&source_code));
// FIXME(bartlomieju): runtime is not run in this case
return;
}
let fut = async move {
let r = worker
.execute_mod_async(&module_specifier, None, false)
.await;
if r.is_ok() {
let _ = (&mut *worker).await;
}
}
.boxed_local();
crate::tokio_util::run_basic(fut);
let module_specifier =
ModuleSpecifier::resolve_import(&specifier, &referrer)?;
let worker_name = args_name.unwrap_or_else(|| {
// TODO(bartlomieju): change it to something more descriptive
format!("USER-WORKER-{}", specifier)
});
let handle = handle_receiver.recv().unwrap()?;
let worker_id = parent_state.add_child_worker(handle);
let worker_handle = run_worker_thread(
worker_name,
global_state,
permissions,
module_specifier,
has_source_code,
source_code,
)?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
let worker_id = parent_state.add_child_worker(worker_handle);
Ok(JsonOp::Sync(json!({ "id": worker_id })))
}
@ -136,7 +252,7 @@ struct WorkerArgs {
id: i32,
}
fn op_host_close_worker(
fn op_host_terminate_worker(
state: &State,
args: Value,
_data: Option<ZeroCopyBuf>,
@ -144,23 +260,37 @@ fn op_host_close_worker(
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let mut state = state.borrow_mut();
let maybe_worker_handle = state.workers.remove(&id);
if let Some(worker_handle) = maybe_worker_handle {
let mut sender = worker_handle.sender.clone();
sender.close_channel();
let mut receiver =
futures::executor::block_on(worker_handle.receiver.lock());
receiver.close();
};
let worker_handle =
state.workers.remove(&id).expect("No worker handle found");
worker_handle.terminate();
Ok(JsonOp::Sync(json!({})))
}
#[derive(Deserialize)]
struct HostGetMessageArgs {
id: i32,
fn serialize_worker_event(event: WorkerEvent) -> Value {
match event {
WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
WorkerEvent::Error(error) => match error.kind() {
ErrorKind::JSError => {
let error = error.downcast::<JSError>().unwrap();
let exception: V8Exception = error.into();
json!({
"type": "error",
"error": {
"message": exception.message,
"fileName": exception.script_resource_name,
"lineNumber": exception.line_number,
"columnNumber": exception.start_column,
}
})
}
_ => json!({
"type": "error",
"error": {
"message": error.to_string(),
}
}),
},
}
}
/// Get message from guest worker as host
@ -169,59 +299,48 @@ fn op_host_get_message(
args: Value,
_data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetMessageArgs = serde_json::from_value(args)?;
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state = state.borrow();
// TODO: don't return bad resource anymore
let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
let fut = worker_handle.get_message();
let state_ = state.borrow();
let worker_handle = state_
.workers
.get(&id)
.expect("No worker handle found")
.clone();
let state_ = state.clone();
let op = async move {
let maybe_buf = fut.await;
Ok(json!({ "data": maybe_buf }))
let response = match worker_handle.get_event().await {
Some(event) => serialize_worker_event(event),
None => {
let mut state_ = state_.borrow_mut();
let mut handle =
state_.workers.remove(&id).expect("No worker handle found");
handle.sender.close_channel();
// TODO(bartlomieju): join thread handle here
json!({ "type": "close" })
}
};
Ok(response)
};
Ok(JsonOp::Async(op.boxed_local()))
}
#[derive(Deserialize)]
struct HostPostMessageArgs {
id: i32,
}
/// Post message to guest worker as host
fn op_host_post_message(
state: &State,
args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostPostMessageArgs = serde_json::from_value(args)?;
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
debug!("post message to worker {}", id);
let state = state.borrow();
// TODO: don't return bad resource anymore
let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
let worker_handle = state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({})))
}
fn op_metrics(
state: &State,
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let state = state.borrow();
let m = &state.metrics;
Ok(JsonOp::Sync(json!({
"opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
"opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
"bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
"bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
"bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
})))
}

View file

@ -8,8 +8,7 @@ use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::WorkerChannelsExternal;
use crate::worker::WorkerChannelsInternal;
use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::ErrBox;
@ -55,8 +54,7 @@ pub struct StateInner {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: GlobalTimer,
pub workers: HashMap<u32, WorkerChannelsExternal>,
pub worker_channels_internal: Option<WorkerChannelsInternal>,
pub workers: HashMap<u32, WorkerHandle>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<StdRng>,
@ -232,7 +230,6 @@ impl State {
import_map,
metrics: Metrics::default(),
global_timer: GlobalTimer::new(),
worker_channels_internal: None,
workers: HashMap::new(),
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
@ -269,7 +266,6 @@ impl State {
import_map: None,
metrics: Metrics::default(),
global_timer: GlobalTimer::new(),
worker_channels_internal: None,
workers: HashMap::new(),
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
@ -282,7 +278,7 @@ impl State {
Ok(Self(state))
}
pub fn add_child_worker(&self, handle: WorkerChannelsExternal) -> u32 {
pub fn add_child_worker(&self, handle: WorkerHandle) -> u32 {
let mut inner_state = self.borrow_mut();
let worker_id =
inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;

View file

@ -397,12 +397,10 @@ itest!(_026_redirect_javascript {
http_server: true,
});
/* TODO(ry) Disabled to get #3844 landed faster. Re-enable.
itest!(_026_workers {
args: "run --reload 026_workers.ts",
output: "026_workers.ts.out",
});
*/
itest!(workers_basic {
args: "run --reload workers_basic.ts",

View file

@ -0,0 +1,19 @@
// Specifier should be resolved relative to current file
const jsWorker = new Worker("./sibling_worker.js", {
type: "module",
name: "sibling"
});
jsWorker.onerror = _e => {
postMessage({ type: "error" });
};
jsWorker.onmessage = e => {
console.log("js worker on message");
postMessage({ type: "msg", text: e });
close();
};
onmessage = function(e) {
jsWorker.postMessage(e.data);
};

View file

@ -0,0 +1,4 @@
onmessage = e => {
postMessage(e.data);
close();
};

View file

@ -1,6 +1,5 @@
let thrown = false;
// TODO(bartlomieju): add test for throwing in web worker
if (self.name !== "jsWorker") {
throw Error(`Bad worker name: ${self.name}, expected jsWorker`);
}
@ -14,7 +13,6 @@ onmessage = function(e) {
}
postMessage(e.data);
close();
};

View file

@ -4,8 +4,6 @@ if (self.name !== "tsWorker") {
onmessage = function(e): void {
console.log(e.data);
postMessage(e.data);
close();
};

View file

@ -0,0 +1,2 @@
// This worker just throws error when it's being executed
throw Error("Thrown error");

View file

@ -1,30 +1,19 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
pub fn create_basic_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_io()
.enable_time()
.build()
.unwrap()
}
// TODO(ry) rename to run_local ?
pub fn run_basic<F, R>(future: F) -> R
where
F: std::future::Future<Output = R> + 'static,
{
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_io()
.enable_time()
.build()
.unwrap();
let mut rt = create_basic_runtime();
rt.block_on(future)
}
// TODO(ry) maybe replace with tokio::task::spawn_blocking
#[cfg(test)]
pub fn spawn_thread<F, R>(f: F) -> impl std::future::Future<Output = R>
where
F: 'static + Send + FnOnce() -> R,
R: 'static + Send,
{
let (sender, receiver) = tokio::sync::oneshot::channel::<R>();
std::thread::spawn(move || {
let result = f();
sender.send(result)
});
async { receiver.await.unwrap() }
}

View file

@ -29,7 +29,7 @@ impl WebWorker {
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
ops::web_worker::init(isolate, &state);
ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
ops::worker_host::init(isolate, &state);
ops::errors::init(isolate, &state);
ops::timers::init(isolate, &state);
@ -65,9 +65,12 @@ impl Future for WebWorker {
#[cfg(test)]
mod tests {
use super::*;
use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
fn create_test_worker() -> WebWorker {
let state = State::mock("./hello.js");
@ -77,77 +80,95 @@ mod tests {
state,
);
worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap();
worker.execute("runWorkerMessageLoop()").unwrap();
worker
}
#[test]
fn test_worker_messages() {
let mut worker = create_test_worker();
let source = r#"
onmessage = function(e) {
console.log("msg from main script", e.data);
if (e.data == "exit") {
delete self.onmessage;
return;
} else {
console.assert(e.data === "hi");
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<WorkerHandle>(1);
let join_handle = std::thread::spawn(move || {
let mut worker = create_test_worker();
let source = r#"
onmessage = function(e) {
console.log("msg from main script", e.data);
if (e.data == "exit") {
return close();
} else {
console.assert(e.data === "hi");
}
postMessage([1, 2, 3]);
console.log("after postMessage");
}
postMessage([1, 2, 3]);
console.log("after postMessage");
}
"#;
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
let _ = tokio_util::spawn_thread(move || {
tokio_util::run_basic(async move {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
let maybe_msg = handle.get_message().await;
assert!(maybe_msg.is_some());
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
let maybe_msg = handle.get_message().await;
assert!(maybe_msg.is_some());
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
let msg = json!("exit")
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = handle.post_message(msg).await;
assert!(r.is_ok());
})
"#;
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let mut rt = tokio_util::create_basic_runtime();
let r = run_worker_loop(&mut rt, &mut worker);
assert!(r.is_ok())
});
let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
let mut handle = handle_receiver.recv().unwrap();
tokio_util::run_basic(async move {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
let maybe_msg = handle.get_event().await;
assert!(maybe_msg.is_some());
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
let maybe_msg = handle.get_event().await;
assert!(maybe_msg.is_some());
match maybe_msg {
Some(WorkerEvent::Message(buf)) => {
assert_eq!(*buf, *b"[1,2,3]");
}
_ => unreachable!(),
}
let msg = json!("exit")
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = handle.post_message(msg).await;
assert!(r.is_ok());
let event = handle.get_event().await;
assert!(event.is_none());
handle.sender.close_channel();
});
join_handle.join().expect("Failed to join worker thread");
}
#[test]
fn removed_from_resource_table_on_close() {
let mut worker = create_test_worker();
let handle = worker.thread_safe_handle();
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<WorkerHandle>(1);
worker
.execute("onmessage = () => { delete self.onmessage; }")
.unwrap();
let worker_post_message_fut = tokio_util::spawn_thread(move || {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = futures::executor::block_on(handle.post_message(msg));
assert!(r.is_ok());
let join_handle = std::thread::spawn(move || {
let mut worker = create_test_worker();
worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let mut rt = tokio_util::create_basic_runtime();
let r = run_worker_loop(&mut rt, &mut worker);
assert!(r.is_ok())
});
let mut handle = handle_receiver.recv().unwrap();
tokio_util::run_basic(async move {
worker_post_message_fut.await;
let r = worker.await;
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
let event = handle.get_event().await;
assert!(event.is_none());
handle.sender.close_channel();
});
join_handle.join().expect("Failed to join worker thread");
}
}

View file

@ -24,76 +24,55 @@ use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
use url::Url;
/// Wraps mpsc channels so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
#[derive(Clone)]
pub struct WorkerChannels {
pub sender: mpsc::Sender<Buf>,
pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
/// Events that are sent to host from child
/// worker.
pub enum WorkerEvent {
Message(Buf),
Error(ErrBox),
}
impl WorkerChannels {
pub struct WorkerChannelsInternal {
pub sender: mpsc::Sender<WorkerEvent>,
pub receiver: mpsc::Receiver<Buf>,
}
#[derive(Clone)]
pub struct WorkerHandle {
pub sender: mpsc::Sender<Buf>,
pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
// terminate_channel
}
impl WorkerHandle {
pub fn terminate(&self) {
todo!()
}
/// Post message to worker as a host.
pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
let mut sender = self.sender.clone();
sender.send(buf).map_err(ErrBox::from).await
}
/// Get message from worker as a host.
pub fn get_message(&self) -> Pin<Box<dyn Future<Output = Option<Buf>>>> {
let receiver_mutex = self.receiver.clone();
async move {
let mut receiver = receiver_mutex.lock().await;
receiver.next().await
}
.boxed_local()
// TODO: should use `try_lock` and return error if
// more than one listener tries to get event
pub async fn get_event(&self) -> Option<WorkerEvent> {
let mut receiver = self.receiver.lock().await;
receiver.next().await
}
}
pub struct WorkerChannelsInternal(WorkerChannels);
impl Deref for WorkerChannelsInternal {
type Target = WorkerChannels;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for WorkerChannelsInternal {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[derive(Clone)]
pub struct WorkerChannelsExternal(WorkerChannels);
impl Deref for WorkerChannelsExternal {
type Target = WorkerChannels;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for WorkerChannelsExternal {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannelsInternal(WorkerChannels {
let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
let internal_channels = WorkerChannelsInternal {
sender: out_tx,
receiver: Arc::new(AsyncMutex::new(in_rx)),
});
let external_channels = WorkerChannelsExternal(WorkerChannels {
receiver: in_rx,
};
let external_channels = WorkerHandle {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
});
};
(internal_channels, external_channels)
}
@ -113,7 +92,9 @@ pub struct Worker {
pub name: String,
pub isolate: Box<deno_core::EsIsolate>,
pub state: State,
external_channels: WorkerChannelsExternal,
pub waker: AtomicWaker,
pub(crate) internal_channels: WorkerChannelsInternal,
external_channels: WorkerHandle,
}
impl Worker {
@ -127,15 +108,13 @@ impl Worker {
});
let (internal_channels, external_channels) = create_channels();
{
let mut state = state.borrow_mut();
state.worker_channels_internal = Some(internal_channels);
}
Self {
name,
isolate,
state,
waker: AtomicWaker::new(),
internal_channels,
external_channels,
}
}
@ -174,7 +153,7 @@ impl Worker {
}
/// Returns a way to communicate with the Worker from other threads.
pub fn thread_safe_handle(&self) -> WorkerChannelsExternal {
pub fn thread_safe_handle(&self) -> WorkerHandle {
self.external_channels.clone()
}
}
@ -184,8 +163,7 @@ impl Future for Worker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let waker = AtomicWaker::new();
waker.register(cx.waker());
inner.waker.register(cx.waker());
inner.isolate.poll_unpin(cx)
}
}
@ -224,7 +202,7 @@ impl MainWorker {
ops::signal::init(isolate, &state);
ops::timers::init(isolate, &state);
ops::worker_host::init(isolate, &state);
ops::web_worker::init(isolate, &state);
ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
}
Self(worker)
}

View file

@ -235,6 +235,51 @@ impl EsIsolate {
}
}
/// TODO(bartlomieju): copy-pasta to avoid problem with global handle attached
/// to ErrBox
pub fn mod_evaluate_dyn_import(
&mut self,
id: ModuleId,
) -> Result<(), ErrBox> {
let isolate = self.core_isolate.v8_isolate.as_ref().unwrap();
let mut locker = v8::Locker::new(isolate);
let mut hs = v8::HandleScope::new(locker.enter());
let scope = hs.enter();
assert!(!self.core_isolate.global_context.is_empty());
let context = self.core_isolate.global_context.get(scope).unwrap();
let mut cs = v8::ContextScope::new(scope, context);
let scope = cs.enter();
let info = self.modules.get_info(id).expect("ModuleInfo not found");
let mut module = info.handle.get(scope).expect("Empty module handle");
let mut status = module.get_status();
if status == v8::ModuleStatus::Instantiated {
let ok = module.evaluate(scope, context).is_some();
// Update status after evaluating.
status = module.get_status();
if ok {
assert!(
status == v8::ModuleStatus::Evaluated
|| status == v8::ModuleStatus::Errored
);
} else {
assert!(status == v8::ModuleStatus::Errored);
}
}
match status {
v8::ModuleStatus::Evaluated => Ok(()),
v8::ModuleStatus::Errored => {
let i = &mut self.core_isolate;
let exception = module.get_exception();
i.exception_to_err_result(scope, exception)
.map_err(|err| i.attach_handle_to_error(scope, err, exception))
}
other => panic!("Unexpected module status {:?}", other),
}
}
/// Evaluates an already instantiated ES module.
///
/// ErrBox can be downcast to a type that exposes additional information about
@ -274,7 +319,6 @@ impl EsIsolate {
let i = &mut self.core_isolate;
let exception = module.get_exception();
i.exception_to_err_result(scope, exception)
.map_err(|err| i.attach_handle_to_error(scope, err, exception))
}
other => panic!("Unexpected module status {:?}", other),
}
@ -425,7 +469,7 @@ impl EsIsolate {
// Load is done.
let module_id = load.root_module_id.unwrap();
self.mod_instantiate(module_id)?;
match self.mod_evaluate(module_id) {
match self.mod_evaluate_dyn_import(module_id) {
Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?,
Err(err) => self.dyn_import_error(dyn_import_id, err)?,
};