From be71885628c3820cc4e62d229326de16a6830fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 10 Apr 2020 00:15:17 +0200 Subject: [PATCH] implement Worker.terminate() and self.close() (#4684) --- cli/compilers/ts.rs | 11 +- cli/compilers/wasm.rs | 11 +- cli/js/web/workers.ts | 8 ++ cli/ops/web_worker.rs | 44 ++++++- cli/ops/worker_host.rs | 67 ++++++++--- cli/state.rs | 4 +- cli/tests/subdir/busy_worker.js | 8 ++ cli/tests/subdir/racy_worker.js | 21 ++++ cli/tests/workers_test.out | 14 ++- cli/tests/workers_test.ts | 77 +++++++++++-- cli/web_worker.rs | 195 +++++++++++++++++++++++++------- cli/worker.rs | 14 +-- 12 files changed, 371 insertions(+), 103 deletions(-) create mode 100644 cli/tests/subdir/busy_worker.js create mode 100644 cli/tests/subdir/racy_worker.js diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 641f16bca3..c78d7ef575 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -16,8 +16,8 @@ use crate::startup_data; use crate::state::*; use crate::tokio_util; use crate::version; +use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; -use crate::worker::WorkerHandle; use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; @@ -609,7 +609,7 @@ async fn execute_in_thread( req: Buf, ) -> Result { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::>(1); + std::sync::mpsc::sync_channel::>(1); let builder = std::thread::Builder::new().name("deno-ts-compiler".to_string()); let join_handle = builder.spawn(move || { @@ -618,15 +618,16 @@ async fn execute_in_thread( drop(handle_sender); tokio_util::run_basic(worker).expect("Panic in event loop"); })?; - let mut handle = handle_receiver.recv().unwrap()?; - handle.post_message(req).await?; + let handle = handle_receiver.recv().unwrap()?; + handle.post_message(req)?; let event = handle.get_event().await.expect("Compiler didn't respond"); let buf = match event { WorkerEvent::Message(buf) => Ok(buf), WorkerEvent::Error(error) => Err(error), + WorkerEvent::TerminalError(error) => Err(error), }?; // Shutdown worker and wait for thread to finish - handle.sender.close_channel(); + handle.terminate(); join_handle.join().unwrap(); Ok(buf) } diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index d81138482b..4e8ba78f32 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -6,8 +6,8 @@ use crate::global_state::GlobalState; use crate::startup_data; use crate::state::*; use crate::tokio_util; +use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; -use crate::worker::WorkerHandle; use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; @@ -118,7 +118,7 @@ async fn execute_in_thread( req: Buf, ) -> Result { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::>(1); + std::sync::mpsc::sync_channel::>(1); let builder = std::thread::Builder::new().name("deno-wasm-compiler".to_string()); let join_handle = builder.spawn(move || { @@ -127,15 +127,16 @@ async fn execute_in_thread( drop(handle_sender); tokio_util::run_basic(worker).expect("Panic in event loop"); })?; - let mut handle = handle_receiver.recv().unwrap()?; - handle.post_message(req).await?; + let handle = handle_receiver.recv().unwrap()?; + handle.post_message(req)?; let event = handle.get_event().await.expect("Compiler didn't respond"); let buf = match event { WorkerEvent::Message(buf) => Ok(buf), WorkerEvent::Error(error) => Err(error), + WorkerEvent::TerminalError(error) => Err(error), }?; // Shutdown worker and wait for thread to finish - handle.sender.close_channel(); + handle.terminate(); join_handle.join().unwrap(); Ok(buf) } diff --git a/cli/js/web/workers.ts b/cli/js/web/workers.ts index a62ca0b778..7a0abbbdb9 100644 --- a/cli/js/web/workers.ts +++ b/cli/js/web/workers.ts @@ -127,6 +127,14 @@ export class WorkerImpl extends EventTarget implements Worker { const type = event.type; + if (type === "terminalError") { + this.#terminated = true; + if (!this.#handleError(event.error)) { + throw Error(event.error.message); + } + continue; + } + if (type === "msg") { if (this.onmessage) { const message = decodeMessage(new Uint8Array(event.data)); diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index dd7b6e34a1..8cade7d402 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -3,10 +3,10 @@ use super::dispatch_json::{JsonOp, Value}; use crate::op_error::OpError; use crate::ops::json_op; use crate::state::State; +use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; use deno_core::*; use futures::channel::mpsc; -use futures::sink::SinkExt; use std::convert::From; pub fn web_worker_op( @@ -25,7 +25,32 @@ where -> Result { dispatcher(&sender, args, zero_copy) } } -pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender) { +pub fn web_worker_op2( + handle: WebWorkerHandle, + sender: mpsc::Sender, + dispatcher: D, +) -> impl Fn(Value, Option) -> Result +where + D: Fn( + WebWorkerHandle, + &mpsc::Sender, + Value, + Option, + ) -> Result, +{ + move |args: Value, + zero_copy: Option| + -> Result { + dispatcher(handle.clone(), &sender, args, zero_copy) + } +} + +pub fn init( + i: &mut Isolate, + s: &State, + sender: &mpsc::Sender, + handle: WebWorkerHandle, +) { i.register_op( "op_worker_post_message", s.core_op(json_op(web_worker_op( @@ -35,7 +60,11 @@ pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender) { ); i.register_op( "op_worker_close", - s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))), + s.core_op(json_op(web_worker_op2( + handle, + sender.clone(), + op_worker_close, + ))), ); } @@ -47,18 +76,23 @@ fn op_worker_post_message( ) -> Result { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let mut sender = sender.clone(); - let fut = sender.send(WorkerEvent::Message(d)); - futures::executor::block_on(fut).expect("Failed to post message to host"); + sender + .try_send(WorkerEvent::Message(d)) + .expect("Failed to post message to host"); Ok(JsonOp::Sync(json!({}))) } /// Notify host that guest worker closes fn op_worker_close( + handle: WebWorkerHandle, sender: &mpsc::Sender, _args: Value, _data: Option, ) -> Result { let mut sender = sender.clone(); + // Notify parent that we're finished sender.close_channel(); + // Terminate execution of current worker + handle.terminate(); Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index ee448eb834..a042500995 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -1,7 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::fmt_errors::JSError; -use crate::futures::SinkExt; use crate::global_state::GlobalState; use crate::op_error::OpError; use crate::permissions::DenoPermissions; @@ -9,11 +8,10 @@ use crate::startup_data; use crate::state::State; use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; +use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; -use crate::worker::WorkerHandle; use deno_core::*; use futures::future::FutureExt; -use futures::future::TryFutureExt; use std::convert::From; use std::thread::JoinHandle; @@ -58,9 +56,9 @@ fn run_worker_thread( specifier: ModuleSpecifier, has_source_code: bool, source_code: String, -) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> { +) -> Result<(JoinHandle<()>, WebWorkerHandle), ErrBox> { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::>(1); + std::sync::mpsc::sync_channel::>(1); let builder = std::thread::Builder::new().name(format!("deno-worker-{}", name)); @@ -78,6 +76,7 @@ fn run_worker_thread( } let mut worker = result.unwrap(); + let name = worker.name.to_string(); // Send thread safe handle to newly created worker to host thread handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); @@ -109,7 +108,8 @@ fn run_worker_thread( if let Err(e) = result { let mut sender = worker.internal_channels.sender.clone(); - futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + sender + .try_send(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. @@ -120,6 +120,7 @@ fn run_worker_thread( // that means that we should store JoinHandle to thread to ensure // that it actually terminates. rt.block_on(worker).expect("Panic in event loop"); + debug!("Worker thread shuts down {}", &name); })?; let worker_handle = handle_receiver.recv().unwrap()?; @@ -205,6 +206,28 @@ fn op_host_terminate_worker( fn serialize_worker_event(event: WorkerEvent) -> Value { match event { WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), + WorkerEvent::TerminalError(error) => { + let mut serialized_error = json!({ + "type": "terminalError", + "error": { + "message": error.to_string(), + } + }); + + if let Ok(js_error) = error.downcast::() { + serialized_error = json!({ + "type": "terminalError", + "error": { + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + } + }); + } + + serialized_error + } WorkerEvent::Error(error) => { let mut serialized_error = json!({ "type": "error", @@ -247,13 +270,30 @@ fn op_host_get_message( let state_ = state.clone(); let op = async move { let response = match worker_handle.get_event().await { - Some(event) => serialize_worker_event(event), + Some(event) => { + // Terminal error means that worker should be removed from worker table. + if let WorkerEvent::TerminalError(_) = &event { + let mut state_ = state_.borrow_mut(); + if let Some((join_handle, mut worker_handle)) = + state_.workers.remove(&id) + { + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); + } + } + serialize_worker_event(event) + } None => { + // Worker shuts down let mut state_ = state_.borrow_mut(); - let (join_handle, mut worker_handle) = - state_.workers.remove(&id).expect("No worker handle found"); - worker_handle.sender.close_channel(); - join_handle.join().expect("Worker thread panicked"); + // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called + // already meaning that we won't find worker in table - in that case ignore. + if let Some((join_handle, mut worker_handle)) = + state_.workers.remove(&id) + { + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); + } json!({ "type": "close" }) } }; @@ -276,9 +316,8 @@ fn op_host_post_message( let state = state.borrow(); let (_, worker_handle) = state.workers.get(&id).expect("No worker handle found"); - let fut = worker_handle + worker_handle .post_message(msg) - .map_err(|e| OpError::other(e.to_string())); - futures::executor::block_on(fut)?; + .map_err(|e| OpError::other(e.to_string()))?; Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/state.rs b/cli/state.rs index d55e4e5ebb..228ef1200c 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -8,7 +8,7 @@ use crate::op_error::OpError; use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; -use crate::worker::WorkerHandle; +use crate::web_worker::WebWorkerHandle; use deno_core::Buf; use deno_core::CoreOp; use deno_core::ErrBox; @@ -62,7 +62,7 @@ pub struct StateInner { pub import_map: Option, pub metrics: Metrics, pub global_timer: GlobalTimer, - pub workers: HashMap, WorkerHandle)>, + pub workers: HashMap, WebWorkerHandle)>, pub next_worker_id: u32, pub start_time: Instant, pub seeded_rng: Option, diff --git a/cli/tests/subdir/busy_worker.js b/cli/tests/subdir/busy_worker.js new file mode 100644 index 0000000000..7deba03219 --- /dev/null +++ b/cli/tests/subdir/busy_worker.js @@ -0,0 +1,8 @@ +self.onmessage = function (_evt) { + // infinite loop + for (let i = 0; true; i++) { + if (i % 1000 == 0) { + postMessage(i); + } + } +}; diff --git a/cli/tests/subdir/racy_worker.js b/cli/tests/subdir/racy_worker.js new file mode 100644 index 0000000000..83756b791d --- /dev/null +++ b/cli/tests/subdir/racy_worker.js @@ -0,0 +1,21 @@ +// See issue for details +// https://github.com/denoland/deno/issues/4080 +// +// After first call to `postMessage() this worker schedules +// [close(), postMessage()] ops on the same turn of microtask queue +// (because message is rather big). +// Only single `postMessage()` call should make it +// to host, ie. after calling `close()` no more code should be run. + +setTimeout(() => { + close(); +}, 50); + +while (true) { + await new Promise((done) => { + setTimeout(() => { + postMessage({ buf: new Array(999999) }); + done(); + }); + }); +} diff --git a/cli/tests/workers_test.out b/cli/tests/workers_test.out index efc0ce57c9..ca75009174 100644 --- a/cli/tests/workers_test.out +++ b/cli/tests/workers_test.out @@ -1,7 +1,9 @@ -running 4 tests -test workersBasic ... ok [WILDCARD] -test nestedWorker ... ok [WILDCARD] -test workerThrowsWhenExecuting ... ok [WILDCARD] -test workerCanUseFetch ... ok [WILDCARD] +running 6 tests +test worker terminate ... ok [WILDCARD] +test worker nested ... ok [WILDCARD] +test worker throws when executing ... ok [WILDCARD] +test worker fetch API ... ok [WILDCARD] +test worker terminate busy loop ... ok [WILDCARD] +test worker race condition ... ok [WILDCARD] -test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out [WILDCARD] +test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out [WILDCARD] diff --git a/cli/tests/workers_test.ts b/cli/tests/workers_test.ts index 42bd96d969..2c56a491cc 100644 --- a/cli/tests/workers_test.ts +++ b/cli/tests/workers_test.ts @@ -28,11 +28,10 @@ export function createResolvable(): Resolvable { } Deno.test({ - name: "workersBasic", - // FIXME(bartlomieju): - disableOpSanitizer: true, + name: "worker terminate", fn: async function (): Promise { const promise = createResolvable(); + const jsWorker = new Worker("../tests/subdir/test_worker.js", { type: "module", name: "jsWorker", @@ -59,13 +58,13 @@ Deno.test({ jsWorker.postMessage("Hello World"); await promise; + tsWorker.terminate(); + jsWorker.terminate(); }, }); Deno.test({ - name: "nestedWorker", - // FIXME(bartlomieju): - disableOpSanitizer: true, + name: "worker nested", fn: async function (): Promise { const promise = createResolvable(); @@ -81,13 +80,12 @@ Deno.test({ nestedWorker.postMessage("Hello World"); await promise; + nestedWorker.terminate(); }, }); Deno.test({ - name: "workerThrowsWhenExecuting", - // FIXME(bartlomieju): - disableOpSanitizer: true, + name: "worker throws when executing", fn: async function (): Promise { const promise = createResolvable(); const throwingWorker = new Worker("../tests/subdir/throwing_worker.js", { @@ -102,13 +100,12 @@ Deno.test({ }; await promise; + throwingWorker.terminate(); }, }); Deno.test({ - name: "workerCanUseFetch", - // FIXME(bartlomieju): - disableOpSanitizer: true, + name: "worker fetch API", fn: async function (): Promise { const promise = createResolvable(); @@ -128,6 +125,62 @@ Deno.test({ promise.resolve(); }; + await promise; + fetchingWorker.terminate(); + }, +}); + +Deno.test({ + name: "worker terminate busy loop", + fn: async function (): Promise { + const promise = createResolvable(); + + const busyWorker = new Worker("../tests/subdir/busy_worker.js", { + type: "module", + }); + + let testResult = 0; + + busyWorker.onmessage = (e): void => { + testResult = e.data; + if (testResult >= 10000) { + busyWorker.terminate(); + busyWorker.onmessage = (_e): void => { + throw new Error("unreachable"); + }; + setTimeout(() => { + assertEquals(testResult, 10000); + promise.resolve(); + }, 100); + } + }; + + busyWorker.postMessage("ping"); + await promise; + }, +}); + +Deno.test({ + name: "worker race condition", + fn: async function (): Promise { + // See issue for details + // https://github.com/denoland/deno/issues/4080 + const promise = createResolvable(); + + const racyWorker = new Worker("../tests/subdir/racy_worker.js", { + type: "module", + }); + + racyWorker.onmessage = (e): void => { + assertEquals(e.data.buf.length, 999999); + racyWorker.onmessage = (_e): void => { + throw new Error("unreachable"); + }; + setTimeout(() => { + promise.resolve(); + }, 100); + }; + await promise; }, }); diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 683ba9ef49..795409175d 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -3,17 +3,68 @@ use crate::ops; use crate::state::State; use crate::worker::Worker; use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; +use deno_core::v8; use deno_core::ErrBox; use deno_core::StartupData; +use futures::channel::mpsc; use futures::future::FutureExt; use futures::stream::StreamExt; -use futures::SinkExt; use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::task::Context; use std::task::Poll; + +/// Wrapper for `WorkerHandle` that adds functionality +/// for terminating workers. +/// +/// This struct is used by host as well as worker itself. +/// +/// Host uses it to communicate with worker and terminate it, +/// while worker uses it only to finish execution on `self.close()`. +#[derive(Clone)] +pub struct WebWorkerHandle { + worker_handle: WorkerHandle, + terminate_tx: mpsc::Sender<()>, + terminated: Arc, + isolate_handle: v8::IsolateHandle, +} + +impl Deref for WebWorkerHandle { + type Target = WorkerHandle; + fn deref(&self) -> &Self::Target { + &self.worker_handle + } +} + +impl DerefMut for WebWorkerHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker_handle + } +} + +impl WebWorkerHandle { + pub fn terminate(&self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + + if !already_terminated { + self.isolate_handle.terminate_execution(); + let mut sender = self.terminate_tx.clone(); + // This call should be infallible hence the `expect`. + // This might change in the future. + sender.try_send(()).expect("Failed to terminate"); + } + } +} + /// This worker is implementation of `Worker` Web API /// /// At the moment this type of worker supports only @@ -23,17 +74,50 @@ use std::task::Poll; /// `WebWorker`. pub struct WebWorker { worker: Worker, - is_ready: bool, + event_loop_idle: bool, + terminate_rx: mpsc::Receiver<()>, + handle: WebWorkerHandle, } impl WebWorker { pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let state_ = state.clone(); let mut worker = Worker::new(name, startup_data, state_); + + let terminated = Arc::new(AtomicBool::new(false)); + let isolate_handle = worker + .isolate + .v8_isolate + .as_mut() + .unwrap() + .thread_safe_handle(); + let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); + + let handle = WebWorkerHandle { + worker_handle: worker.thread_safe_handle(), + terminated, + isolate_handle, + terminate_tx, + }; + + let mut web_worker = Self { + worker, + event_loop_idle: false, + terminate_rx, + handle, + }; + + let handle = web_worker.thread_safe_handle(); + { - let isolate = &mut worker.isolate; + let isolate = &mut web_worker.worker.isolate; ops::runtime::init(isolate, &state); - ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); + ops::web_worker::init( + isolate, + &state, + &web_worker.worker.internal_channels.sender, + handle, + ); ops::worker_host::init(isolate, &state); ops::io::init(isolate, &state); ops::resources::init(isolate, &state); @@ -42,10 +126,14 @@ impl WebWorker { ops::fetch::init(isolate, &state); } - Self { - worker, - is_ready: false, - } + web_worker + } +} + +impl WebWorker { + /// Returns a way to communicate with the Worker from other threads. + pub fn thread_safe_handle(&self) -> WebWorkerHandle { + self.handle.clone() } } @@ -69,47 +157,67 @@ impl Future for WebWorker { let inner = self.get_mut(); let worker = &mut inner.worker; - if !inner.is_ready { + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + + if terminated { + return Poll::Ready(Ok(())); + } + + if !inner.event_loop_idle { match worker.poll_unpin(cx) { Poll::Ready(r) => { + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + if terminated { + return Poll::Ready(Ok(())); + } + if let Err(e) = r { let mut sender = worker.internal_channels.sender.clone(); - futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + sender + .try_send(WorkerEvent::Error(e)) .expect("Failed to post message to host"); } - inner.is_ready = true; + inner.event_loop_idle = true; } Poll::Pending => {} } } - let maybe_msg = { - match worker.internal_channels.receiver.poll_next_unpin(cx) { - Poll::Ready(r) => match r { - Some(msg) => { - let msg_str = String::from_utf8(msg.to_vec()).unwrap(); - debug!("received message from host: {}", msg_str); - Some(msg_str) - } - None => { - debug!("channel closed by host, worker event loop shuts down"); - return Poll::Ready(Ok(())); - } - }, - Poll::Pending => None, - } - }; + if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { + // terminate_rx should never be closed + assert!(r.is_some()); + return Poll::Ready(Ok(())); + } - if let Some(msg) = maybe_msg { - // TODO: just add second value and then bind using rusty_v8 - // to get structured clone/transfer working - let script = format!("workerMessageRecvCallback({})", msg); - worker - .execute(&script) - .expect("Failed to execute message cb"); - // Let worker be polled again - inner.is_ready = false; - worker.waker.wake(); + if let Poll::Ready(r) = + worker.internal_channels.receiver.poll_next_unpin(cx) + { + match r { + Some(msg) => { + let msg = String::from_utf8(msg.to_vec()).unwrap(); + debug!("received message from host: {}", msg); + let script = format!("workerMessageRecvCallback({})", msg); + + if let Err(e) = worker.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if inner.handle.terminated.load(Ordering::Relaxed) { + return Poll::Ready(Ok(())); + } + + // Otherwise forward error to host + let mut sender = worker.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + + // Let event loop be polled again + inner.event_loop_idle = false; + worker.waker.wake(); + } + None => unreachable!(), + } } Poll::Pending @@ -123,7 +231,6 @@ mod tests { use crate::state::State; use crate::tokio_util; use crate::worker::WorkerEvent; - use crate::worker::WorkerHandle; fn create_test_worker() -> WebWorker { let state = State::mock("./hello.js"); @@ -138,7 +245,7 @@ mod tests { #[test] fn test_worker_messages() { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); + std::sync::mpsc::sync_channel::(1); let join_handle = std::thread::spawn(move || { let mut worker = create_test_worker(); @@ -165,13 +272,13 @@ mod tests { tokio_util::run_basic(async move { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; + let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await; assert!(maybe_msg.is_some()); - let r = handle.post_message(msg.clone()).await; + let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await; @@ -187,7 +294,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = handle.post_message(msg).await; + let r = handle.post_message(msg); assert!(r.is_ok()); let event = handle.get_event().await; assert!(event.is_none()); @@ -199,7 +306,7 @@ mod tests { #[test] fn removed_from_resource_table_on_close() { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); + std::sync::mpsc::sync_channel::(1); let join_handle = std::thread::spawn(move || { let mut worker = create_test_worker(); @@ -214,7 +321,7 @@ mod tests { tokio_util::run_basic(async move { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; + let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let event = handle.get_event().await; assert!(event.is_none()); diff --git a/cli/worker.rs b/cli/worker.rs index 9e1a646d51..c2a1ecc576 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -11,8 +11,6 @@ use deno_core::ModuleSpecifier; use deno_core::StartupData; use futures::channel::mpsc; use futures::future::FutureExt; -use futures::future::TryFutureExt; -use futures::sink::SinkExt; use futures::stream::StreamExt; use futures::task::AtomicWaker; use std::env; @@ -32,6 +30,7 @@ use url::Url; pub enum WorkerEvent { Message(Buf), Error(ErrBox), + TerminalError(ErrBox), } pub struct WorkerChannelsInternal { @@ -43,18 +42,13 @@ pub struct WorkerChannelsInternal { pub struct WorkerHandle { pub sender: mpsc::Sender, pub receiver: Arc>>, - // terminate_channel } impl WorkerHandle { - pub fn terminate(&self) { - todo!() - } - /// Post message to worker as a host. - pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { + pub fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { let mut sender = self.sender.clone(); - sender.send(buf).map_err(ErrBox::from).await + sender.try_send(buf).map_err(ErrBox::from) } // TODO: should use `try_lock` and return error if @@ -205,6 +199,7 @@ impl Future for Worker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let inner = self.get_mut(); + if let Some(deno_inspector) = inner.inspector.as_mut() { // We always poll the inspector if it exists. let _ = deno_inspector.poll_unpin(cx); @@ -249,7 +244,6 @@ impl MainWorker { ops::timers::init(isolate, &state); ops::tty::init(isolate, &state); ops::worker_host::init(isolate, &state); - ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); } Self(worker) }