1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 07:14:47 -05:00

reland JsRuntime/Worker is not a Future (#7924)

This commit is contained in:
Bartek Iwańczuk 2020-10-11 13:20:40 +02:00 committed by GitHub
parent 7af5041a06
commit 527628e186
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 203 additions and 204 deletions

View file

@ -1,7 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::colors; use crate::colors;
use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession; use crate::inspector::InspectorSession;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::serde_json; use deno_core::serde_json;
@ -14,8 +13,7 @@ pub struct CoverageCollector {
} }
impl CoverageCollector { impl CoverageCollector {
pub fn new(inspector_ptr: *mut DenoInspector) -> Self { pub fn new(session: Box<InspectorSession>) -> Self {
let session = InspectorSession::new(inspector_ptr);
Self { session } Self { session }
} }

View file

@ -275,7 +275,7 @@ async fn eval_command(
debug!("main_module {}", &main_module); debug!("main_module {}", &main_module);
worker.execute_module(&main_module).await?; worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?; worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?; worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?; worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(()) Ok(())
} }
@ -423,7 +423,7 @@ async fn run_repl(flags: Flags) -> Result<(), AnyError> {
ModuleSpecifier::resolve_url_or_path("./$deno$repl.ts").unwrap(); ModuleSpecifier::resolve_url_or_path("./$deno$repl.ts").unwrap();
let global_state = GlobalState::new(flags)?; let global_state = GlobalState::new(flags)?;
let mut worker = MainWorker::new(&global_state, main_module.clone()); let mut worker = MainWorker::new(&global_state, main_module.clone());
(&mut *worker).await?; worker.run_event_loop().await?;
repl::run(&global_state, worker).await repl::run(&global_state, worker).await
} }
@ -454,7 +454,7 @@ async fn run_from_stdin(flags: Flags) -> Result<(), AnyError> {
debug!("main_module {}", main_module); debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?; worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?; worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?; worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?; worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(()) Ok(())
} }
@ -500,7 +500,7 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<(), AnyError> {
debug!("main_module {}", main_module); debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?; worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?; worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?; worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?; worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(()) Ok(())
} }
@ -525,7 +525,7 @@ async fn run_command(flags: Flags, script: String) -> Result<(), AnyError> {
debug!("main_module {}", main_module); debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?; worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?; worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?; worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?; worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(()) Ok(())
} }
@ -578,12 +578,8 @@ async fn test_command(
.save_source_file_in_cache(&main_module, source_file); .save_source_file_in_cache(&main_module, source_file);
let mut maybe_coverage_collector = if flags.coverage { let mut maybe_coverage_collector = if flags.coverage {
let inspector = worker let session = worker.create_inspector_session();
.inspector let mut coverage_collector = CoverageCollector::new(session);
.as_mut()
.expect("Inspector is not created.");
let mut coverage_collector = CoverageCollector::new(&mut **inspector);
coverage_collector.start_collecting().await?; coverage_collector.start_collecting().await?;
Some(coverage_collector) Some(coverage_collector)
@ -594,9 +590,9 @@ async fn test_command(
let execute_result = worker.execute_module(&main_module).await; let execute_result = worker.execute_module(&main_module).await;
execute_result?; execute_result?;
worker.execute("window.dispatchEvent(new Event('load'))")?; worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?; worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?; worker.execute("window.dispatchEvent(new Event('unload'))")?;
(&mut *worker).await?; worker.run_event_loop().await?;
if let Some(coverage_collector) = maybe_coverage_collector.as_mut() { if let Some(coverage_collector) = maybe_coverage_collector.as_mut() {
let coverages = coverage_collector.collect().await?; let coverages = coverage_collector.collect().await?;

View file

@ -155,6 +155,7 @@ fn run_worker_thread(
if let Err(e) = result { if let Err(e) = result {
let mut sender = worker.internal_channels.sender.clone(); let mut sender = worker.internal_channels.sender.clone();
sender sender
.try_send(WorkerEvent::TerminalError(e)) .try_send(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host"); .expect("Failed to post message to host");
@ -166,7 +167,8 @@ fn run_worker_thread(
// TODO(bartlomieju): this thread should return result of event loop // TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure // that means that we should store JoinHandle to thread to ensure
// that it actually terminates. // that it actually terminates.
rt.block_on(worker).expect("Panic in event loop"); rt.block_on(worker.run_event_loop())
.expect("Panic in event loop");
debug!("Worker thread shuts down {}", &name); debug!("Worker thread shuts down {}", &name);
})?; })?;

View file

@ -47,7 +47,7 @@ async fn post_message_and_poll(
return result return result
} }
_ = &mut *worker => { _ = worker.run_event_loop() => {
// A zero delay is long enough to yield the thread in order to prevent the loop from // A zero delay is long enough to yield the thread in order to prevent the loop from
// running hot for messages that are taking longer to resolve like for example an // running hot for messages that are taking longer to resolve like for example an
// evaluation of top level await. // evaluation of top level await.
@ -75,7 +75,7 @@ async fn read_line_and_poll(
result = &mut line => { result = &mut line => {
return result.unwrap(); return result.unwrap();
} }
_ = &mut *worker, if poll_worker => { _ = worker.run_event_loop(), if poll_worker => {
poll_worker = false; poll_worker = false;
} }
_ = &mut timeout => { _ = &mut timeout => {
@ -92,12 +92,7 @@ pub async fn run(
// Our inspector is unable to default to the default context id so we have to specify it here. // Our inspector is unable to default to the default context id so we have to specify it here.
let context_id: u32 = 1; let context_id: u32 = 1;
let inspector = worker let mut session = worker.create_inspector_session();
.inspector
.as_mut()
.expect("Inspector is not created.");
let mut session = InspectorSession::new(&mut **inspector);
let history_file = global_state.dir.root.join("deno_history.txt"); let history_file = global_state.dir.root.join("deno_history.txt");

View file

@ -3,6 +3,7 @@
use crate::fmt_errors::JsError; use crate::fmt_errors::JsError;
use crate::global_state::GlobalState; use crate::global_state::GlobalState;
use crate::inspector::DenoInspector; use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession;
use crate::js; use crate::js;
use crate::metrics::Metrics; use crate::metrics::Metrics;
use crate::ops; use crate::ops;
@ -11,6 +12,7 @@ use crate::permissions::Permissions;
use crate::state::CliModuleLoader; use crate::state::CliModuleLoader;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt; use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt; use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker; use deno_core::futures::task::AtomicWaker;
@ -22,10 +24,8 @@ use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions; use deno_core::RuntimeOptions;
use deno_core::Snapshot; use deno_core::Snapshot;
use std::env; use std::env;
use std::future::Future;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -95,13 +95,15 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
/// - `MainWorker` /// - `MainWorker`
/// - `WebWorker` /// - `WebWorker`
pub struct Worker { pub struct Worker {
pub name: String,
pub js_runtime: JsRuntime,
pub inspector: Option<Box<DenoInspector>>,
pub waker: AtomicWaker,
pub(crate) internal_channels: WorkerChannelsInternal,
external_channels: WorkerHandle, external_channels: WorkerHandle,
inspector: Option<Box<DenoInspector>>,
// Following fields are pub because they are accessed
// when creating a new WebWorker instance.
pub(crate) internal_channels: WorkerChannelsInternal,
pub(crate) js_runtime: JsRuntime,
pub(crate) name: String,
should_break_on_first_statement: bool, should_break_on_first_statement: bool,
waker: AtomicWaker,
} }
impl Worker { impl Worker {
@ -147,13 +149,13 @@ impl Worker {
let (internal_channels, external_channels) = create_channels(); let (internal_channels, external_channels) = create_channels();
Self { Self {
name,
js_runtime,
inspector,
waker: AtomicWaker::new(),
internal_channels,
external_channels, external_channels,
inspector,
internal_channels,
js_runtime,
name,
should_break_on_first_statement, should_break_on_first_statement,
waker: AtomicWaker::new(),
} }
} }
@ -221,6 +223,28 @@ impl Worker {
.wait_for_session_and_break_on_next_statement() .wait_for_session_and_break_on_next_statement()
} }
} }
/// Create new inspector session. This function panics if Worker
/// was not configured to create inspector.
pub fn create_inspector_session(&mut self) -> Box<InspectorSession> {
let inspector = self.inspector.as_mut().unwrap();
InspectorSession::new(&mut **inspector)
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
// We always poll the inspector if it exists.
let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
self.waker.register(cx.waker());
self.js_runtime.poll_event_loop(cx)
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
}
} }
impl Drop for Worker { impl Drop for Worker {
@ -231,32 +255,6 @@ impl Drop for Worker {
} }
} }
impl Future for Worker {
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
// We always poll the inspector if it exists.
let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx));
inner.waker.register(cx.waker());
inner.js_runtime.poll_unpin(cx)
}
}
impl Deref for Worker {
type Target = JsRuntime;
fn deref(&self) -> &Self::Target {
&self.js_runtime
}
}
impl DerefMut for Worker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.js_runtime
}
}
/// This worker is created and used by Deno executable. /// This worker is created and used by Deno executable.
/// ///
/// It provides ops available in the `Deno` namespace. /// It provides ops available in the `Deno` namespace.
@ -278,45 +276,46 @@ impl MainWorker {
loader, loader,
true, true,
); );
let js_runtime = &mut worker.js_runtime;
{ {
// All ops registered in this function depend on these // All ops registered in this function depend on these
{ {
let op_state = worker.op_state(); let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut(); let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default()); op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone()); op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(global_state.permissions.clone()); op_state.put::<Permissions>(global_state.permissions.clone());
} }
ops::runtime::init(&mut worker, main_module); ops::runtime::init(js_runtime, main_module);
ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref()); ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
ops::timers::init(&mut worker); ops::timers::init(js_runtime);
ops::worker_host::init(&mut worker); ops::worker_host::init(js_runtime);
ops::random::init(&mut worker, global_state.flags.seed); ops::random::init(js_runtime, global_state.flags.seed);
ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
ops::reg_json_sync( ops::reg_json_sync(
&mut worker, js_runtime,
"op_domain_to_ascii", "op_domain_to_ascii",
deno_web::op_domain_to_ascii, deno_web::op_domain_to_ascii,
); );
ops::errors::init(&mut worker); ops::errors::init(js_runtime);
ops::fs_events::init(&mut worker); ops::fs_events::init(js_runtime);
ops::fs::init(&mut worker); ops::fs::init(js_runtime);
ops::io::init(&mut worker); ops::io::init(js_runtime);
ops::net::init(&mut worker); ops::net::init(js_runtime);
ops::os::init(&mut worker); ops::os::init(js_runtime);
ops::permissions::init(&mut worker); ops::permissions::init(js_runtime);
ops::plugin::init(&mut worker); ops::plugin::init(js_runtime);
ops::process::init(&mut worker); ops::process::init(js_runtime);
ops::runtime_compiler::init(&mut worker); ops::runtime_compiler::init(js_runtime);
ops::signal::init(&mut worker); ops::signal::init(js_runtime);
ops::tls::init(&mut worker); ops::tls::init(js_runtime);
ops::tty::init(&mut worker); ops::tty::init(js_runtime);
ops::websocket::init(&mut worker); ops::websocket::init(js_runtime);
} }
{ {
let op_state = worker.op_state(); let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut(); let mut op_state = op_state.borrow_mut();
let t = &mut op_state.resource_table; let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = get_stdio(); let (stdin, stdout, stderr) = get_stdio();
@ -449,49 +448,45 @@ impl WebWorker {
{ {
let handle = web_worker.thread_safe_handle(); let handle = web_worker.thread_safe_handle();
let sender = web_worker.worker.internal_channels.sender.clone(); let sender = web_worker.worker.internal_channels.sender.clone();
let js_runtime = &mut web_worker.js_runtime;
// All ops registered in this function depend on these // All ops registered in this function depend on these
{ {
let op_state = web_worker.op_state(); let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut(); let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default()); op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone()); op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(permissions); op_state.put::<Permissions>(permissions);
} }
ops::web_worker::init(&mut web_worker, sender, handle); ops::web_worker::init(js_runtime, sender, handle);
ops::runtime::init(&mut web_worker, main_module); ops::runtime::init(js_runtime, main_module);
ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref()); ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
ops::timers::init(&mut web_worker); ops::timers::init(js_runtime);
ops::worker_host::init(&mut web_worker); ops::worker_host::init(js_runtime);
ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close); ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
ops::reg_json_sync( ops::reg_json_sync(
&mut web_worker, js_runtime,
"op_resources",
deno_core::op_resources,
);
ops::reg_json_sync(
&mut web_worker,
"op_domain_to_ascii", "op_domain_to_ascii",
deno_web::op_domain_to_ascii, deno_web::op_domain_to_ascii,
); );
ops::errors::init(&mut web_worker); ops::errors::init(js_runtime);
ops::io::init(&mut web_worker); ops::io::init(js_runtime);
ops::websocket::init(&mut web_worker); ops::websocket::init(js_runtime);
if has_deno_namespace { if has_deno_namespace {
ops::fs_events::init(&mut web_worker); ops::fs_events::init(js_runtime);
ops::fs::init(&mut web_worker); ops::fs::init(js_runtime);
ops::net::init(&mut web_worker); ops::net::init(js_runtime);
ops::os::init(&mut web_worker); ops::os::init(js_runtime);
ops::permissions::init(&mut web_worker); ops::permissions::init(js_runtime);
ops::plugin::init(&mut web_worker); ops::plugin::init(js_runtime);
ops::process::init(&mut web_worker); ops::process::init(js_runtime);
ops::random::init(&mut web_worker, global_state.flags.seed); ops::random::init(js_runtime, global_state.flags.seed);
ops::runtime_compiler::init(&mut web_worker); ops::runtime_compiler::init(js_runtime);
ops::signal::init(&mut web_worker); ops::signal::init(js_runtime);
ops::tls::init(&mut web_worker); ops::tls::init(js_runtime);
ops::tty::init(&mut web_worker); ops::tty::init(js_runtime);
} }
} }
@ -504,38 +499,27 @@ impl WebWorker {
pub fn thread_safe_handle(&self) -> WebWorkerHandle { pub fn thread_safe_handle(&self) -> WebWorkerHandle {
self.handle.clone() self.handle.clone()
} }
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
} }
impl Deref for WebWorker { pub fn poll_event_loop(
type Target = Worker; &mut self,
fn deref(&self) -> &Self::Target { cx: &mut Context,
&self.worker ) -> Poll<Result<(), AnyError>> {
} let worker = &mut self.worker;
}
impl DerefMut for WebWorker { let terminated = self.handle.terminated.load(Ordering::Relaxed);
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.worker
}
}
impl Future for WebWorker {
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let worker = &mut inner.worker;
let terminated = inner.handle.terminated.load(Ordering::Relaxed);
if terminated { if terminated {
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
if !inner.event_loop_idle { if !self.event_loop_idle {
match worker.poll_unpin(cx) { match worker.poll_event_loop(cx) {
Poll::Ready(r) => { Poll::Ready(r) => {
let terminated = inner.handle.terminated.load(Ordering::Relaxed); let terminated = self.handle.terminated.load(Ordering::Relaxed);
if terminated { if terminated {
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
@ -546,13 +530,13 @@ impl Future for WebWorker {
.try_send(WorkerEvent::Error(e)) .try_send(WorkerEvent::Error(e))
.expect("Failed to post message to host"); .expect("Failed to post message to host");
} }
inner.event_loop_idle = true; self.event_loop_idle = true;
} }
Poll::Pending => {} Poll::Pending => {}
} }
} }
if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
// terminate_rx should never be closed // terminate_rx should never be closed
assert!(r.is_some()); assert!(r.is_some());
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
@ -569,7 +553,7 @@ impl Future for WebWorker {
if let Err(e) = worker.execute(&script) { if let Err(e) = worker.execute(&script) {
// If execution was terminated during message callback then // If execution was terminated during message callback then
// just ignore it // just ignore it
if inner.handle.terminated.load(Ordering::Relaxed) { if self.handle.terminated.load(Ordering::Relaxed) {
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
@ -581,7 +565,7 @@ impl Future for WebWorker {
} }
// Let event loop be polled again // Let event loop be polled again
inner.event_loop_idle = false; self.event_loop_idle = false;
worker.waker.wake(); worker.waker.wake();
} }
None => unreachable!(), None => unreachable!(),
@ -592,6 +576,19 @@ impl Future for WebWorker {
} }
} }
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
&self.worker
}
}
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.worker
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -628,7 +625,7 @@ mod tests {
if let Err(err) = result { if let Err(err) = result {
eprintln!("execute_mod err {:?}", err); eprintln!("execute_mod err {:?}", err);
} }
if let Err(e) = (&mut *worker).await { if let Err(e) = worker.run_event_loop().await {
panic!("Future got unexpected error: {:?}", e); panic!("Future got unexpected error: {:?}", e);
} }
} }
@ -646,7 +643,7 @@ mod tests {
if let Err(err) = result { if let Err(err) = result {
eprintln!("execute_mod err {:?}", err); eprintln!("execute_mod err {:?}", err);
} }
if let Err(e) = (&mut *worker).await { if let Err(e) = worker.run_event_loop().await {
panic!("Future got unexpected error: {:?}", e); panic!("Future got unexpected error: {:?}", e);
} }
} }
@ -665,7 +662,7 @@ mod tests {
if let Err(err) = result { if let Err(err) = result {
eprintln!("execute_mod err {:?}", err); eprintln!("execute_mod err {:?}", err);
} }
if let Err(e) = (&mut *worker).await { if let Err(e) = worker.run_event_loop().await {
panic!("Future got unexpected error: {:?}", e); panic!("Future got unexpected error: {:?}", e);
} }
} }
@ -733,7 +730,7 @@ mod tests {
worker.execute(source).unwrap(); worker.execute(source).unwrap();
let handle = worker.thread_safe_handle(); let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap(); handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker); let r = tokio_util::run_basic(worker.run_event_loop());
assert!(r.is_ok()) assert!(r.is_ok())
}); });
@ -780,7 +777,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap(); worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle(); let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap(); handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker); let r = tokio_util::run_basic(worker.run_event_loop());
assert!(r.is_ok()) assert!(r.is_ok())
}); });

View file

@ -9,9 +9,12 @@ bindings.
This Rust crate contains the essential V8 bindings for Deno's command-line This Rust crate contains the essential V8 bindings for Deno's command-line
interface (Deno CLI). The main abstraction here is the JsRuntime which provides interface (Deno CLI). The main abstraction here is the JsRuntime which provides
a way to execute JavaScript. The JsRuntime is modeled as a a way to execute JavaScript.
`Future<Item=(), Error=JsError>` which completes once all of its ops have
completed. The JsRuntime implements an event loop abstraction for the executed code that
keeps track of all pending tasks (async ops, dynamic module loads). It is user's
responsibility to drive that loop by using `JsRuntime::run_event_loop` method -
it must be executed in the context of Rust's future executor (eg. tokio, smol).
In order to bind Rust functions into JavaScript, use the `Deno.core.dispatch()` In order to bind Rust functions into JavaScript, use the `Deno.core.dispatch()`
function to trigger the "dispatch" callback in Rust. The user is responsible for function to trigger the "dispatch" callback in Rust. The user is responsible for

View file

@ -260,7 +260,7 @@ fn main() {
include_str!("http_bench_bin_ops.js"), include_str!("http_bench_bin_ops.js"),
) )
.unwrap(); .unwrap();
js_runtime.await js_runtime.run_event_loop().await
}; };
runtime.block_on(future).unwrap(); runtime.block_on(future).unwrap();
} }

View file

@ -193,7 +193,7 @@ fn main() {
include_str!("http_bench_json_ops.js"), include_str!("http_bench_json_ops.js"),
) )
.unwrap(); .unwrap();
js_runtime.await js_runtime.run_event_loop().await
}; };
runtime.block_on(future).unwrap(); runtime.block_on(future).unwrap();
} }

View file

@ -23,6 +23,7 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE; use crate::shared_queue::RECOMMENDED_SIZE;
use crate::BufVec; use crate::BufVec;
use crate::OpState; use crate::OpState;
use futures::future::poll_fn;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::stream::StreamFuture; use futures::stream::StreamFuture;
@ -442,28 +443,24 @@ impl JsRuntime {
.remove_near_heap_limit_callback(cb, heap_limit); .remove_near_heap_limit_callback(cb, heap_limit);
} }
} }
/// Runs event loop to completion
///
/// This future resolves when:
/// - there are no more pending dynamic imports
/// - there are no more pending ops
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
} }
extern "C" fn near_heap_limit_callback<F>( /// Runs a single tick of event loop
data: *mut c_void, pub fn poll_event_loop(
current_heap_limit: usize, &mut self,
initial_heap_limit: usize, cx: &mut Context,
) -> usize ) -> Poll<Result<(), AnyError>> {
where self.shared_init();
F: FnMut(usize, usize) -> usize,
{
let callback = unsafe { &mut *(data as *mut F) };
callback(current_heap_limit, initial_heap_limit)
}
impl Future for JsRuntime { let state_rc = Self::state(self.v8_isolate());
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let runtime = self.get_mut();
runtime.shared_init();
let state_rc = Self::state(runtime.v8_isolate());
{ {
let state = state_rc.borrow(); let state = state_rc.borrow();
state.waker.register(cx.waker()); state.waker.register(cx.waker());
@ -471,21 +468,21 @@ impl Future for JsRuntime {
// Dynamic module loading - ie. modules loaded using "import()" // Dynamic module loading - ie. modules loaded using "import()"
{ {
let poll_imports = runtime.prepare_dyn_imports(cx)?; let poll_imports = self.prepare_dyn_imports(cx)?;
assert!(poll_imports.is_ready()); assert!(poll_imports.is_ready());
let poll_imports = runtime.poll_dyn_imports(cx)?; let poll_imports = self.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready()); assert!(poll_imports.is_ready());
runtime.check_promise_exceptions()?; self.check_promise_exceptions()?;
} }
// Ops // Ops
{ {
let overflow_response = runtime.poll_pending_ops(cx); let overflow_response = self.poll_pending_ops(cx);
runtime.async_op_response(overflow_response)?; self.async_op_response(overflow_response)?;
runtime.drain_macrotasks()?; self.drain_macrotasks()?;
runtime.check_promise_exceptions()?; self.check_promise_exceptions()?;
} }
let state = state_rc.borrow(); let state = state_rc.borrow();
@ -509,6 +506,18 @@ impl Future for JsRuntime {
} }
} }
extern "C" fn near_heap_limit_callback<F>(
data: *mut c_void,
current_heap_limit: usize,
initial_heap_limit: usize,
) -> usize
where
F: FnMut(usize, usize) -> usize,
{
let callback = unsafe { &mut *(data as *mut F) };
callback(current_heap_limit, initial_heap_limit)
}
impl JsRuntimeState { impl JsRuntimeState {
// Called by V8 during `Isolate::mod_instantiate`. // Called by V8 during `Isolate::mod_instantiate`.
pub fn module_resolve_cb( pub fn module_resolve_cb(
@ -1215,13 +1224,13 @@ pub mod tests {
futures::executor::block_on(lazy(move |cx| f(cx))); futures::executor::block_on(lazy(move |cx| f(cx)));
} }
fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output fn poll_until_ready(
where runtime: &mut JsRuntime,
F: Future + Unpin, max_poll_count: usize,
{ ) -> Result<(), AnyError> {
let mut cx = Context::from_waker(futures::task::noop_waker_ref()); let mut cx = Context::from_waker(futures::task::noop_waker_ref());
for _ in 0..max_poll_count { for _ in 0..max_poll_count {
match future.poll_unpin(&mut cx) { match runtime.poll_event_loop(&mut cx) {
Poll::Pending => continue, Poll::Pending => continue,
Poll::Ready(val) => return val, Poll::Ready(val) => return val,
} }
@ -1437,7 +1446,7 @@ pub mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
runtime runtime
.execute( .execute(
@ -1450,11 +1459,11 @@ pub mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
runtime.execute("check3.js", "assert(nrecv == 2)").unwrap(); runtime.execute("check3.js", "assert(nrecv == 2)").unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
// We are idle, so the next poll should be the last. // We are idle, so the next poll should be the last.
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
}); });
} }
@ -1478,7 +1487,7 @@ pub mod tests {
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
// The above op never finish, but runtime can finish // The above op never finish, but runtime can finish
// because the op is an unreffed async op. // because the op is an unreffed async op.
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
}) })
} }
@ -1608,7 +1617,7 @@ pub mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
runtime runtime
.execute("check.js", "assert(asyncRecv == 1);") .execute("check.js", "assert(asyncRecv == 1);")
.unwrap(); .unwrap();
@ -1700,7 +1709,7 @@ pub mod tests {
"#, "#,
) )
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -1713,7 +1722,7 @@ pub mod tests {
runtime runtime
.execute("core_test.js", include_str!("core_test.js")) .execute("core_test.js", include_str!("core_test.js"))
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -1739,7 +1748,7 @@ pub mod tests {
include_str!("encode_decode_test.js"), include_str!("encode_decode_test.js"),
) )
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = runtime.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -2047,7 +2056,7 @@ pub mod tests {
assert_eq!(count.load(Ordering::Relaxed), 0); assert_eq!(count.load(Ordering::Relaxed), 0);
// We should get an error here. // We should get an error here.
let result = runtime.poll_unpin(cx); let result = runtime.poll_event_loop(cx);
if let Poll::Ready(Ok(_)) = result { if let Poll::Ready(Ok(_)) = result {
unreachable!(); unreachable!();
} }
@ -2140,14 +2149,14 @@ pub mod tests {
.unwrap(); .unwrap();
// First poll runs `prepare_load` hook. // First poll runs `prepare_load` hook.
assert!(matches!(runtime.poll_unpin(cx), Poll::Pending)); assert!(matches!(runtime.poll_event_loop(cx), Poll::Pending));
assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1); assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1);
// Second poll actually loads modules into the isolate. // Second poll actually loads modules into the isolate.
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert_eq!(resolve_count.load(Ordering::Relaxed), 4); assert_eq!(resolve_count.load(Ordering::Relaxed), 4);
assert_eq!(load_count.load(Ordering::Relaxed), 2); assert_eq!(load_count.load(Ordering::Relaxed), 2);
assert!(matches!(runtime.poll_unpin(cx), Poll::Ready(Ok(_)))); assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
assert_eq!(resolve_count.load(Ordering::Relaxed), 4); assert_eq!(resolve_count.load(Ordering::Relaxed), 4);
assert_eq!(load_count.load(Ordering::Relaxed), 2); assert_eq!(load_count.load(Ordering::Relaxed), 2);
}) })
@ -2179,10 +2188,10 @@ pub mod tests {
) )
.unwrap(); .unwrap();
// First poll runs `prepare_load` hook. // First poll runs `prepare_load` hook.
let _ = runtime.poll_unpin(cx); let _ = runtime.poll_event_loop(cx);
assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1); assert_eq!(prepare_load_count.load(Ordering::Relaxed), 1);
// Second poll triggers error // Second poll triggers error
let _ = runtime.poll_unpin(cx); let _ = runtime.poll_event_loop(cx);
}) })
} }
@ -2313,7 +2322,7 @@ main();
at async error_async_stack.js:10:5 at async error_async_stack.js:10:5
"#; "#;
match runtime.poll_unpin(cx) { match runtime.poll_event_loop(cx) {
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
assert_eq!(e.to_string(), expected_error); assert_eq!(e.to_string(), expected_error);
} }

View file

@ -75,7 +75,6 @@ pub fn get_declaration() -> PathBuf {
mod tests { mod tests {
use deno_core::JsRuntime; use deno_core::JsRuntime;
use futures::future::lazy; use futures::future::lazy;
use futures::future::FutureExt;
use futures::task::Context; use futures::task::Context;
use futures::task::Poll; use futures::task::Poll;
@ -102,7 +101,7 @@ mod tests {
include_str!("abort_controller_test.js"), include_str!("abort_controller_test.js"),
) )
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -115,7 +114,7 @@ mod tests {
isolate isolate
.execute("event_test.js", include_str!("event_test.js")) .execute("event_test.js", include_str!("event_test.js"))
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -134,7 +133,7 @@ mod tests {
} else { } else {
unreachable!(); unreachable!();
} }
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -147,7 +146,7 @@ mod tests {
isolate isolate
.execute("event_target_test.js", include_str!("event_target_test.js")) .execute("event_target_test.js", include_str!("event_target_test.js"))
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });
@ -163,7 +162,7 @@ mod tests {
include_str!("text_encoding_test.js"), include_str!("text_encoding_test.js"),
) )
.unwrap(); .unwrap();
if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { if let Poll::Ready(Err(_)) = isolate.poll_event_loop(&mut cx) {
unreachable!(); unreachable!();
} }
}); });