1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

feat(runtime/worker): Structured cloning worker message passing (#9323)

This commit upgrade "Worker.postMessage()" implementation to use 
structured clone algorithm instead of non-spec compliant JSON serialization.
This commit is contained in:
Tim Ramlot 2021-05-11 21:09:09 +02:00 committed by GitHub
parent 0d319161bc
commit 635253bd3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 457 additions and 404 deletions

View file

@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
&[
"run",
"--allow-read",
"cli/tests/workers_large_message_bench.ts",
"cli/tests/workers/bench_large_message.ts",
],
None,
),

View file

@ -1,14 +1,10 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.
// deno-lint-ignore-file
import { deferred } from "../../test_util/std/async/deferred.ts";
function oneWorker(i: any): Promise<void> {
function oneWorker(i: number) {
return new Promise<void>((resolve) => {
let countDown = 10;
const worker = new Worker(
new URL("workers/large_message_worker.js", import.meta.url).href,
new URL("worker_large_message.js", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
@ -23,8 +19,8 @@ function oneWorker(i: any): Promise<void> {
});
}
function bench(): Promise<any> {
let promises = [];
function bench() {
const promises = [];
for (let i = 0; i < 50; i++) {
promises.push(oneWorker(i));
}

View file

@ -1,21 +1,25 @@
// See issue for details
// https://github.com/denoland/deno/issues/4080
//
// After first call to `postMessage() this worker schedules
// [close(), postMessage()] ops on the same turn of microtask queue
// (because message is rather big).
// Only single `postMessage()` call should make it
// to host, ie. after calling `close()` no more code should be run.
// After first received message, this worker schedules
// [assert(), close(), assert()] ops on the same turn of microtask queue
// All tasks after close should not make it
onmessage = async function () {
let stage = 0;
await new Promise((_) => {
setTimeout(() => {
if (stage !== 0) throw "Unexpected stage";
stage = 1;
}, 50);
setTimeout(() => {
if (stage !== 1) throw "Unexpected stage";
stage = 2;
postMessage("DONE");
close();
}, 50);
while (true) {
await new Promise((done) => {
setTimeout(() => {
postMessage({ buf: new Array(999999) });
done();
throw "This should not be run";
}, 50);
});
});
}
};

View file

@ -198,15 +198,12 @@ Deno.test({
);
racyWorker.onmessage = (e): void => {
assertEquals(e.data.buf.length, 999999);
racyWorker.onmessage = (_e): void => {
throw new Error("unreachable");
};
setTimeout(() => {
promise.resolve();
}, 100);
};
racyWorker.postMessage("START");
await promise;
},
});
@ -726,3 +723,38 @@ Deno.test({
worker.terminate();
},
});
Deno.test({
name: "structured cloning postMessage",
fn: async function (): Promise<void> {
const result = deferred();
const worker = new Worker(
new URL("worker_structured_cloning.ts", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
// self field should reference itself (circular ref)
const value = e.data.self.self.self;
// fields a and b refer to the same array
assertEquals(value.a, ["a", true, 432]);
assertEquals(value.a, ["a", true, 432]);
value.b[0] = "b";
value.a[2] += 5;
assertEquals(value.a, ["b", true, 437]);
assertEquals(value.b, ["b", true, 437]);
const len = value.c.size;
value.c.add(1); // This value is already in the set.
value.c.add(2);
assertEquals(len + 1, value.c.size);
result.resolve();
};
worker.postMessage("START");
await result;
worker.terminate();
},
});

View file

@ -0,0 +1,15 @@
// More info on structured cloning can be found here:
// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
self.onmessage = () => {
const arr = ["a", true, 432];
const set = new Set([1, 3, 5, 7, 9]);
const selfReference = {
a: arr,
b: arr,
c: set,
};
// deno-lint-ignore no-explicit-any
(selfReference as any).self = selfReference;
self.postMessage(selfReference);
};

View file

@ -553,7 +553,7 @@ fn deserialize(
match value {
Some(deserialized) => rv.set(deserialized),
None => {
let msg = v8::String::new(scope, "string too long").unwrap();
let msg = v8::String::new(scope, "could not deserialize value").unwrap();
let exception = v8::Exception::range_error(scope, msg);
scope.throw_exception(exception);
}

View file

@ -39,26 +39,8 @@
return core.opAsync("op_host_get_message", id);
}
const encoder = new TextEncoder();
const decoder = new TextDecoder();
function encodeMessage(data) {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}
function decodeMessage(dataIntArray) {
// Temporary solution until structured clone arrives in v8.
// Current clone is made by parsing json to byte array and from byte array back to json.
// In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined.
// Thats why this special is statement is needed.
if (dataIntArray.length == 0) {
return undefined;
}
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}
/**
* @param {string} permission
* @return {boolean}
@ -211,18 +193,7 @@
this.#poll();
}
#handleMessage = (msgData) => {
let data;
try {
data = decodeMessage(new Uint8Array(msgData));
} catch (e) {
const msgErrorEvent = new MessageEvent("messageerror", {
cancelable: false,
data,
});
return;
}
#handleMessage = (data) => {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
@ -253,57 +224,45 @@
#poll = async () => {
while (!this.#terminated) {
const event = await hostGetMessage(this.#id);
const [type, data] = await hostGetMessage(this.#id);
// If terminate was called then we ignore all messages
if (this.#terminated) {
return;
}
const type = event.type;
if (type === "terminalError") {
switch (type) {
case 0: { // Message
const msg = core.deserialize(data);
this.#handleMessage(msg);
break;
}
case 1: { // TerminalError
this.#terminated = true;
if (!this.#handleError(event.error)) {
} /* falls through */
case 2: { // Error
if (!this.#handleError(data)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
event.error.message,
"op_worker_unhandled_error",
data.message,
);
}
}
continue;
break;
}
if (type === "msg") {
this.#handleMessage(event.data);
continue;
}
if (type === "error") {
if (!this.#handleError(event.error)) {
if (globalThis instanceof Window) {
throw new Error("Unhandled error event reached main worker.");
} else {
core.opSync(
"op_host_unhandled_error",
event.error.message,
);
}
}
continue;
}
if (type === "close") {
case 3: { // Close
log(`Host got "close" message from worker: ${this.#name}`);
this.#terminated = true;
return;
}
default: {
throw new Error(`Unknown worker event: "${type}"`);
}
}
}
};
postMessage(message, transferOrOptions) {
@ -317,7 +276,8 @@
return;
}
hostPostMessage(this.#id, encodeMessage(message));
const bufferMsg = core.serialize(message);
hostPostMessage(this.#id, bufferMsg);
}
terminate() {

View file

@ -67,7 +67,7 @@ delete Object.prototype.__proto__;
}
isClosing = true;
opCloseWorker();
core.opSync("op_worker_close");
}
// TODO(bartlomieju): remove these functions
@ -76,24 +76,24 @@ delete Object.prototype.__proto__;
const onerror = () => {};
function postMessage(data) {
const dataJson = JSON.stringify(data);
const dataIntArray = encoder.encode(dataJson);
opPostMessage(dataIntArray);
const dataIntArray = core.serialize(data);
core.opSync("op_worker_post_message", null, dataIntArray);
}
let isClosing = false;
async function workerMessageRecvCallback(data) {
async function pollForMessages() {
while (!isClosing) {
const bufferMsg = await core.opAsync("op_worker_get_message");
const data = core.deserialize(bufferMsg);
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
});
try {
if (globalThis["onmessage"]) {
const result = globalThis.onmessage(msgEvent);
if (result && "then" in result) {
await result;
}
if (globalThis.onmessage) {
await globalThis.onmessage(msgEvent);
}
globalThis.dispatchEvent(msgEvent);
} catch (e) {
@ -125,17 +125,13 @@ delete Object.prototype.__proto__;
}
if (!handled) {
throw e;
core.opSync(
"op_worker_unhandled_error",
e.message,
);
}
}
}
function opPostMessage(data) {
core.opSync("op_worker_post_message", null, data);
}
function opCloseWorker() {
core.opSync("op_worker_close");
}
function opMainModule() {
@ -395,7 +391,6 @@ delete Object.prototype.__proto__;
// TODO(bartlomieju): should be readonly?
close: util.nonEnumerable(workerClose),
postMessage: util.writable(postMessage),
workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback),
};
let hasBootstrapped = false;
@ -506,6 +501,8 @@ delete Object.prototype.__proto__;
location.setLocationHref(locationHref);
registerErrors();
pollForMessages();
const internalSymbol = Symbol("Deno.internal");
const finalDenoNs = {

View file

@ -1,41 +1,85 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WorkerEvent;
use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::futures::channel::mpsc;
use deno_core::error::AnyError;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::ZeroCopyBuf;
use std::cell::RefCell;
use std::rc::Rc;
pub fn init() -> Extension {
Extension::builder()
.ops(vec![
(
"op_worker_post_message",
op_sync(move |state, _args: (), buf: Option<ZeroCopyBuf>| {
let buf = buf.ok_or_else(null_opbuf)?;
let msg_buf: Box<[u8]> = (*buf).into();
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
sender
.try_send(WorkerEvent::Message(msg_buf))
.expect("Failed to post message to host");
Ok(())
}),
),
("op_worker_post_message", op_sync(op_worker_post_message)),
("op_worker_get_message", op_async(op_worker_get_message)),
// Notify host that guest worker closes.
("op_worker_close", op_sync(op_worker_close)),
// Notify host that guest worker has unhandled error.
(
"op_worker_close",
op_sync(move |state, _: (), _: ()| {
// Notify parent that we're finished
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
sender.close_channel();
// Terminate execution of current worker
let handle = state.borrow::<WebWorkerHandle>();
handle.terminate();
Ok(())
}),
"op_worker_unhandled_error",
op_sync(op_worker_unhandled_error),
),
])
.build()
}
fn op_worker_post_message(
state: &mut OpState,
_: (),
buf: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
let buf = buf.ok_or_else(null_opbuf)?;
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
handle
.post_event(WorkerEvent::Message(buf))
.expect("Failed to post message to host");
Ok(())
}
async fn op_worker_get_message(
state: Rc<RefCell<OpState>>,
_: (),
_: (),
) -> Result<ZeroCopyBuf, AnyError> {
let temp = {
let a = state.borrow();
a.borrow::<WebWorkerInternalHandle>().clone()
};
let maybe_data = temp.get_message().await;
Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty))
}
#[allow(clippy::unnecessary_wraps)]
fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> {
// Notify parent that we're finished
let mut handle = state.borrow_mut::<WebWorkerInternalHandle>().clone();
handle.terminate();
Ok(())
}
/// A worker that encounters an uncaught error will pass this error
/// to its parent worker using this op. The parent worker will use
/// this same op to pass the error to its own parent (in case
/// `e.preventDefault()` was not called in `worker.onerror`). This
/// is done until the error reaches the root/ main worker.
#[allow(clippy::unnecessary_wraps)]
fn op_worker_unhandled_error(
state: &mut OpState,
message: String,
_: (),
) -> Result<(), AnyError> {
let sender = state.borrow::<WebWorkerInternalHandle>().clone();
sender
.post_event(WorkerEvent::Error(generic_error(message)))
.expect("Failed to propagate error event to parent worker");
Ok(())
}

View file

@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WorkerEvent;
use crate::web_worker::WorkerId;
use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::de;
@ -28,7 +27,6 @@ use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
@ -46,7 +44,7 @@ use std::thread::JoinHandle;
pub struct CreateWebWorkerArgs {
pub name: String,
pub worker_id: u32,
pub worker_id: WorkerId,
pub parent_permissions: Permissions,
pub permissions: Permissions,
pub main_module: ModuleSpecifier,
@ -68,13 +66,9 @@ pub struct WorkerThread {
worker_handle: WebWorkerHandle,
}
pub type WorkersTable = HashMap<u32, WorkerThread>;
pub type WorkerId = u32;
pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
pub fn init(
is_main_worker: bool,
create_web_worker_cb: Arc<CreateWebWorkerCb>,
) -> Extension {
pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
Extension::builder()
.state(move |state| {
state.put::<WorkersTable>(WorkersTable::default());
@ -94,20 +88,6 @@ pub fn init(
),
("op_host_post_message", op_sync(op_host_post_message)),
("op_host_get_message", op_async(op_host_get_message)),
(
"op_host_unhandled_error",
op_sync(move |state, message: String, _: ()| {
if is_main_worker {
return Err(generic_error("Cannot be called from main worker."));
}
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
sender
.try_send(WorkerEvent::Error(generic_error(message)))
.expect("Failed to propagate error event to parent worker");
Ok(true)
}),
),
])
.build()
}
@ -473,7 +453,7 @@ fn op_create_worker(
let worker_id = state.take::<WorkerId>();
let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
state.put::<WorkerId>(worker_id + 1);
state.put::<WorkerId>(worker_id.next().unwrap());
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
@ -483,7 +463,7 @@ fn op_create_worker(
// Setup new thread
let thread_builder =
std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
std::thread::Builder::new().name(format!("{}", worker_id));
// Spawn it
let join_handle = thread_builder.spawn(move || {
@ -501,7 +481,7 @@ fn op_create_worker(
use_deno_namespace,
});
// Send thread safe handle to newly created worker to host thread
// Send thread safe handle from newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
@ -512,6 +492,7 @@ fn op_create_worker(
run_web_worker(worker, module_specifier, maybe_source_code)
})?;
// Receive WebWorkerHandle from newly created worker
let worker_handle = handle_receiver.recv().unwrap()?;
let worker_thread = WorkerThread {
@ -534,7 +515,7 @@ fn op_host_terminate_worker(
id: WorkerId,
_: (),
) -> Result<(), AnyError> {
let worker_thread = state
let mut worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
@ -547,54 +528,53 @@ fn op_host_terminate_worker(
Ok(())
}
fn serialize_worker_event(event: WorkerEvent) -> Value {
match event {
WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() {
Ok(js_error) => json!({
"type": "terminalError",
"error": {
use deno_core::serde::Serialize;
use deno_core::serde::Serializer;
impl Serialize for WorkerEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let type_id = match &self {
WorkerEvent::Message(_) => 0_i32,
WorkerEvent::TerminalError(_) => 1_i32,
WorkerEvent::Error(_) => 2_i32,
WorkerEvent::Close => 3_i32,
};
match self {
WorkerEvent::Message(buf) => {
Serialize::serialize(&(type_id, buf), serializer)
}
WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
let value = match error.downcast_ref::<JsError>() {
Some(js_error) => json!({
"message": js_error.message,
"fileName": js_error.script_resource_name,
"lineNumber": js_error.line_number,
"columnNumber": js_error.start_column,
}
}),
Err(error) => json!({
"type": "terminalError",
"error": {
None => json!({
"message": error.to_string(),
}
}),
},
WorkerEvent::Error(error) => match error.downcast::<JsError>() {
Ok(js_error) => json!({
"type": "error",
"error": {
"message": js_error.message,
"fileName": js_error.script_resource_name,
"lineNumber": js_error.line_number,
"columnNumber": js_error.start_column,
};
Serialize::serialize(&(type_id, value), serializer)
}
}),
Err(error) => json!({
"type": "error",
"error": {
"message": error.to_string(),
_ => Serialize::serialize(&(type_id, ()), serializer),
}
}),
},
}
}
/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in
/// table - in that case ignore.
fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) {
fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
if let Some(mut worker_thread) = workers.remove(&id) {
worker_thread.worker_handle.sender.close_channel();
worker_thread.worker_handle.terminate();
worker_thread
.join_handle
.join()
@ -608,7 +588,7 @@ async fn op_host_get_message(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
) -> Result<Value, AnyError> {
) -> Result<WorkerEvent, AnyError> {
let worker_handle = {
let s = state.borrow();
let workers_table = s.borrow::<WorkersTable>();
@ -617,7 +597,7 @@ async fn op_host_get_message(
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
return Ok(json!({ "type": "close" }));
return Ok(WorkerEvent::Close);
}
};
@ -627,12 +607,12 @@ async fn op_host_get_message(
if let WorkerEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
return Ok(serialize_worker_event(event));
return Ok(event);
}
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
Ok(json!({ "type": "close" }))
Ok(WorkerEvent::Close)
}
/// Post message to guest worker as host
@ -641,8 +621,7 @@ fn op_host_post_message(
id: WorkerId,
data: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
let data = data.ok_or_else(null_opbuf)?;
let msg = Vec::from(&*data).into_boxed_slice();
let msg = data.ok_or_else(null_opbuf)?;
debug!("post message to worker {}", id);
let worker_thread = state

View file

@ -13,7 +13,8 @@ use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker;
use deno_core::serde::Deserialize;
use deno_core::serde::Serialize;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::url::Url;
@ -22,12 +23,16 @@ use deno_core::Extension;
use deno_core::GetErrorClassFn;
use deno_core::JsErrorCreateFn;
use deno_core::JsRuntime;
use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::ZeroCopyBuf;
use deno_file::BlobUrlStore;
use log::debug;
use std::cell::RefCell;
use std::env;
use std::fmt;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@ -36,38 +41,98 @@ use std::task::Context;
use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct WorkerId(u32);
impl fmt::Display for WorkerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "worker-{}", self.0)
}
}
impl WorkerId {
pub fn next(&self) -> Option<WorkerId> {
self.0.checked_add(1).map(WorkerId)
}
}
type WorkerMessage = ZeroCopyBuf;
/// Events that are sent to host from child
/// worker.
pub enum WorkerEvent {
Message(Box<[u8]>),
Message(WorkerMessage),
Error(AnyError),
TerminalError(AnyError),
Close,
}
pub struct WorkerChannelsInternal {
pub sender: mpsc::Sender<WorkerEvent>,
pub receiver: mpsc::Receiver<Box<[u8]>>,
// Channels used for communication with worker's parent
#[derive(Clone)]
pub struct WebWorkerInternalHandle {
sender: mpsc::Sender<WorkerEvent>,
receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
impl WebWorkerInternalHandle {
/// Post WorkerEvent to parent as a worker
pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> {
let mut sender = self.sender.clone();
// If the channel is closed,
// the worker must have terminated but the termination message has not yet been received.
//
// Therefore just treat it as if the worker has terminated and return.
if sender.is_closed() {
self.terminated.store(true, Ordering::SeqCst);
return Ok(());
}
sender.try_send(event)?;
Ok(())
}
/// Get the WorkerEvent with lock
/// Panic if more than one listener tries to get event
pub async fn get_message(&self) -> Option<WorkerMessage> {
let mut receiver = self.receiver.borrow_mut();
receiver.next().await
}
/// Check if this worker is terminated or being terminated
pub fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst)
}
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(&mut self) {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
// Stop javascript execution
self.isolate_handle.terminate_execution();
}
// Wake parent by closing the channel
self.sender.close_channel();
}
}
/// Wrapper for `WorkerHandle` that adds functionality
/// for terminating workers.
///
/// This struct is used by host as well as worker itself.
///
/// Host uses it to communicate with worker and terminate it,
/// while worker uses it only to finish execution on `self.close()`.
#[derive(Clone)]
pub struct WebWorkerHandle {
pub sender: mpsc::Sender<Box<[u8]>>,
pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
terminate_tx: mpsc::Sender<()>,
sender: mpsc::Sender<WorkerMessage>,
receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
impl WebWorkerHandle {
/// Post message to worker as a host.
pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> {
/// Post WorkerMessage to worker as a host
pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> {
let mut sender = self.sender.clone();
// If the channel is closed,
// the worker must have terminated but the termination message has not yet been recieved.
@ -81,47 +146,50 @@ impl WebWorkerHandle {
Ok(())
}
/// Get the event with lock.
/// Get the WorkerEvent with lock
/// Return error if more than one listener tries to get event
pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> {
let mut receiver = self.receiver.try_lock()?;
Ok(receiver.next().await)
}
pub fn terminate(&self) {
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(&mut self) {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
// Stop javascript execution
self.isolate_handle.terminate_execution();
let mut sender = self.terminate_tx.clone();
// This call should be infallible hence the `expect`.
// This might change in the future.
sender.try_send(()).expect("Failed to terminate");
}
// Wake web worker by closing the channel
self.sender.close_channel();
}
}
fn create_channels(
fn create_handles(
isolate_handle: v8::IsolateHandle,
terminate_tx: mpsc::Sender<()>,
) -> (WorkerChannelsInternal, WebWorkerHandle) {
let (in_tx, in_rx) = mpsc::channel::<Box<[u8]>>(1);
) -> (WebWorkerInternalHandle, WebWorkerHandle) {
let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1);
let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
let internal_channels = WorkerChannelsInternal {
let terminated = Arc::new(AtomicBool::new(false));
let internal_handle = WebWorkerInternalHandle {
sender: out_tx,
receiver: in_rx,
receiver: Rc::new(RefCell::new(in_rx)),
terminated: terminated.clone(),
isolate_handle: isolate_handle.clone(),
};
let external_channels = WebWorkerHandle {
let external_handle = WebWorkerHandle {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
terminated: Arc::new(AtomicBool::new(false)),
terminate_tx,
terminated,
isolate_handle,
};
(internal_channels, external_channels)
(internal_handle, external_handle)
}
/// This struct is an implementation of `Worker` Web API
@ -129,17 +197,12 @@ fn create_channels(
/// Each `WebWorker` is either a child of `MainWorker` or other
/// `WebWorker`.
pub struct WebWorker {
id: u32,
id: WorkerId,
inspector: Option<Box<DenoInspector>>,
// Following fields are pub because they are accessed
// when creating a new WebWorker instance.
pub(crate) internal_channels: WorkerChannelsInternal,
pub js_runtime: JsRuntime,
pub name: String,
waker: AtomicWaker,
event_loop_idle: bool,
terminate_rx: mpsc::Receiver<()>,
handle: WebWorkerHandle,
internal_handle: WebWorkerInternalHandle,
external_handle: WebWorkerHandle,
pub use_deno_namespace: bool,
pub main_module: ModuleSpecifier,
}
@ -174,7 +237,7 @@ impl WebWorker {
name: String,
permissions: Permissions,
main_module: ModuleSpecifier,
worker_id: u32,
worker_id: WorkerId,
options: &WebWorkerOptions,
) -> Self {
// Permissions: many ops depend on this
@ -218,7 +281,7 @@ impl WebWorker {
let runtime_exts = vec![
ops::web_worker::init(),
ops::runtime::init(main_module.clone()),
ops::worker_host::init(false, options.create_web_worker_cb.clone()),
ops::worker_host::init(options.create_web_worker_cb.clone()),
ops::io::init(),
];
@ -264,38 +327,24 @@ impl WebWorker {
None
};
let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1);
let isolate_handle = js_runtime.v8_isolate().thread_safe_handle();
let (internal_channels, handle) =
create_channels(isolate_handle, terminate_tx);
let mut worker = Self {
id: worker_id,
inspector,
internal_channels,
js_runtime,
name,
waker: AtomicWaker::new(),
event_loop_idle: false,
terminate_rx,
handle,
use_deno_namespace: options.use_deno_namespace,
main_module,
};
// Setup worker-dependant OpState and return worker
{
let handle = worker.thread_safe_handle();
let sender = worker.internal_channels.sender.clone();
let js_runtime = &mut worker.js_runtime;
let (internal_handle, external_handle) = {
let handle = js_runtime.v8_isolate().thread_safe_handle();
let (internal_handle, external_handle) = create_handles(handle);
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put(internal_handle.clone());
(internal_handle, external_handle)
};
// Required by runtime::ops::worker_host/web_worker
op_state.put(handle);
op_state.put(sender);
worker
Self {
id: worker_id,
inspector,
js_runtime,
name,
internal_handle,
external_handle,
use_deno_namespace: options.use_deno_namespace,
main_module,
}
}
@ -321,7 +370,7 @@ impl WebWorker {
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
let script = format!(
"bootstrap.workerRuntime({}, \"{}\", {}, \"worker-{}\")",
"bootstrap.workerRuntime({}, \"{}\", {}, \"{}\")",
runtime_options_str, self.name, options.use_deno_namespace, self.id
);
self
@ -338,12 +387,20 @@ impl WebWorker {
self.js_runtime.execute(url.as_str(), js_source)
}
/// Loads and instantiates specified JavaScript module.
pub async fn preload_module(
&mut self,
module_specifier: &ModuleSpecifier,
) -> Result<ModuleId, AnyError> {
self.js_runtime.load_module(module_specifier, None).await
}
/// Loads, instantiates and executes specified JavaScript module.
pub async fn execute_module(
&mut self,
module_specifier: &ModuleSpecifier,
) -> Result<(), AnyError> {
let id = self.js_runtime.load_module(module_specifier, None).await?;
let id = self.preload_module(module_specifier).await?;
let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
@ -357,7 +414,7 @@ impl WebWorker {
}
event_loop_result = self.run_event_loop() => {
if self.has_been_terminated() {
if self.internal_handle.is_terminated() {
return Ok(());
}
event_loop_result?;
@ -370,82 +427,44 @@ impl WebWorker {
/// Returns a way to communicate with the Worker from other threads.
pub fn thread_safe_handle(&self) -> WebWorkerHandle {
self.handle.clone()
}
pub fn has_been_terminated(&self) -> bool {
self.handle.terminated.load(Ordering::SeqCst)
self.external_handle.clone()
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
if self.has_been_terminated() {
// If awakened because we are terminating, just return Ok
if self.internal_handle.is_terminated() {
return Poll::Ready(Ok(()));
}
if !self.event_loop_idle {
let poll_result = {
// We always poll the inspector if it exists.
let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
self.waker.register(cx.waker());
self.js_runtime.poll_event_loop(cx)
};
if let Poll::Ready(r) = poll_result {
if self.has_been_terminated() {
match self.js_runtime.poll_event_loop(cx) {
Poll::Ready(r) => {
// If js ended because we are terminating, just return Ok
if self.internal_handle.is_terminated() {
return Poll::Ready(Ok(()));
}
// In case of an error, pass to parent without terminating worker
if let Err(e) = r {
print_worker_error(e.to_string(), &self.name);
let mut sender = self.internal_channels.sender.clone();
sender
.try_send(WorkerEvent::Error(e))
let handle = self.internal_handle.clone();
handle
.post_event(WorkerEvent::Error(e))
.expect("Failed to post message to host");
}
self.event_loop_idle = true;
}
return Poll::Pending;
}
if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
// terminate_rx should never be closed
assert!(r.is_some());
return Poll::Ready(Ok(()));
panic!(
"coding error: either js is polling or the worker is terminated"
);
}
let maybe_msg_poll_result =
self.internal_channels.receiver.poll_next_unpin(cx);
if let Poll::Ready(maybe_msg) = maybe_msg_poll_result {
let msg =
maybe_msg.expect("Received `None` instead of message in worker");
let msg = String::from_utf8(msg.to_vec()).unwrap();
let script = format!("workerMessageRecvCallback({})", msg);
// TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js"
// so it's dimmed in stack trace instead of using "__anonymous__"
if let Err(e) = self.execute(&script) {
// If execution was terminated during message callback then
// just ignore it
if self.has_been_terminated() {
return Poll::Ready(Ok(()));
Poll::Pending => Poll::Pending,
}
// Otherwise forward error to host
let mut sender = self.internal_channels.sender.clone();
sender
.try_send(WorkerEvent::Error(e))
.expect("Failed to post message to host");
}
// Let event loop be polled again
self.event_loop_idle = false;
self.waker.wake();
}
Poll::Pending
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
@ -495,18 +514,18 @@ pub fn run_web_worker(
rt.block_on(load_future)
};
let mut sender = worker.internal_channels.sender.clone();
let internal_handle = worker.internal_handle.clone();
// If sender is closed it means that worker has already been closed from
// within using "globalThis.close()"
if sender.is_closed() {
if internal_handle.is_terminated() {
return Ok(());
}
if let Err(e) = result {
print_worker_error(e.to_string(), &name);
sender
.try_send(WorkerEvent::TerminalError(e))
internal_handle
.post_event(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye.
@ -522,7 +541,6 @@ pub fn run_web_worker(
mod tests {
use super::*;
use crate::tokio_util;
use deno_core::serde_json::json;
fn create_test_web_worker() -> WebWorker {
let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap();
@ -554,7 +572,7 @@ mod tests {
"TEST".to_string(),
Permissions::allow_all(),
main_module,
1,
WorkerId(1),
&options,
);
worker.bootstrap(&options);
@ -589,30 +607,30 @@ mod tests {
let mut handle = handle_receiver.recv().unwrap();
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone());
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some());
let r = handle.post_message(msg.clone());
let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some());
match maybe_msg {
Some(WorkerEvent::Message(buf)) => {
assert_eq!(*buf, *b"[1,2,3]");
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]);
}
_ => unreachable!(),
}
let msg = json!("exit")
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = handle.post_message(msg);
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded
let r = handle.post_message(msg.into());
assert!(r.is_ok());
let event = handle.get_event().await.unwrap();
assert!(event.is_none());
@ -636,8 +654,9 @@ mod tests {
let mut handle = handle_receiver.recv().unwrap();
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = handle.post_message(msg.clone());
// TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let event = handle.get_event().await.unwrap();
assert!(event.is_none());

View file

@ -113,7 +113,7 @@ impl MainWorker {
metrics::init(),
// Runtime ops
ops::runtime::init(main_module),
ops::worker_host::init(true, options.create_web_worker_cb.clone()),
ops::worker_host::init(options.create_web_worker_cb.clone()),
ops::fs_events::init(),
ops::fs::init(),
ops::http::init(),

View file

@ -1,9 +1,9 @@
use rusty_v8 as v8;
use std::cell::Cell;
use std::fmt;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Mutex;
use super::zero_copy_buf::ZeroCopyBuf;
@ -11,7 +11,7 @@ use super::zero_copy_buf::ZeroCopyBuf;
// allowing us to use a single type for familiarity
pub enum MagicBuffer {
FromV8(ZeroCopyBuf),
ToV8(Cell<Option<Box<[u8]>>>),
ToV8(Mutex<Option<Box<[u8]>>>),
}
impl MagicBuffer {
@ -21,6 +21,10 @@ impl MagicBuffer {
) -> Self {
Self::FromV8(ZeroCopyBuf::new(scope, view))
}
pub fn empty() -> Self {
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
}
}
impl Clone for MagicBuffer {
@ -65,7 +69,7 @@ impl DerefMut for MagicBuffer {
impl From<Box<[u8]>> for MagicBuffer {
fn from(buf: Box<[u8]>) -> Self {
MagicBuffer::ToV8(Cell::new(Some(buf)))
MagicBuffer::ToV8(Mutex::new(Some(buf)))
}
}
@ -88,8 +92,11 @@ impl serde::Serialize for MagicBuffer {
let mut s = serializer.serialize_struct(BUF_NAME, 1)?;
let boxed: Box<[u8]> = match self {
Self::FromV8(_) => unreachable!(),
Self::ToV8(x) => x.take().expect("MagicBuffer was empty"),
Self::FromV8(buf) => {
let value: &[u8] = &buf;
value.into()
}
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
};
let hack: [usize; 2] = unsafe { std::mem::transmute(boxed) };
let f1: u64 = hack[0] as u64;