1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 07:14:47 -05:00

refactor: Worker is not a Future (#7895)

This commit rewrites deno::Worker to not implement Future
trait.

Instead there are two separate methods:
- Worker::poll_event_loop() - does single tick of event loop
- Worker::run_event_loop() - runs event loop to completion

Additionally some cleanup to Worker's field visibility was done.
This commit is contained in:
Bartek Iwańczuk 2020-10-09 19:08:10 +02:00 committed by GitHub
parent 9731cbc288
commit f4357f0ff9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 134 additions and 147 deletions

View file

@ -1,7 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::colors;
use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession;
use deno_core::error::AnyError;
use deno_core::serde_json;
@ -14,8 +13,7 @@ pub struct CoverageCollector {
}
impl CoverageCollector {
pub fn new(inspector_ptr: *mut DenoInspector) -> Self {
let session = InspectorSession::new(inspector_ptr);
pub fn new(session: Box<InspectorSession>) -> Self {
Self { session }
}

View file

@ -275,7 +275,7 @@ async fn eval_command(
debug!("main_module {}", &main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -423,7 +423,7 @@ async fn run_repl(flags: Flags) -> Result<(), AnyError> {
ModuleSpecifier::resolve_url_or_path("./$deno$repl.ts").unwrap();
let global_state = GlobalState::new(flags)?;
let mut worker = MainWorker::new(&global_state, main_module.clone());
(&mut *worker).await?;
worker.run_event_loop().await?;
repl::run(&global_state, worker).await
}
@ -454,7 +454,7 @@ async fn run_from_stdin(flags: Flags) -> Result<(), AnyError> {
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -500,7 +500,7 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<(), AnyError> {
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -525,7 +525,7 @@ async fn run_command(flags: Flags, script: String) -> Result<(), AnyError> {
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
@ -578,12 +578,8 @@ async fn test_command(
.save_source_file_in_cache(&main_module, source_file);
let mut maybe_coverage_collector = if flags.coverage {
let inspector = worker
.inspector
.as_mut()
.expect("Inspector is not created.");
let mut coverage_collector = CoverageCollector::new(&mut **inspector);
let session = worker.create_inspector_session();
let mut coverage_collector = CoverageCollector::new(session);
coverage_collector.start_collecting().await?;
Some(coverage_collector)
@ -594,9 +590,9 @@ async fn test_command(
let execute_result = worker.execute_module(&main_module).await;
execute_result?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.run_event_loop().await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
(&mut *worker).await?;
worker.run_event_loop().await?;
if let Some(coverage_collector) = maybe_coverage_collector.as_mut() {
let coverages = coverage_collector.collect().await?;

View file

@ -173,7 +173,8 @@ fn run_worker_thread(
// TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure
// that it actually terminates.
rt.block_on(worker).expect("Panic in event loop");
rt.block_on(worker.run_event_loop())
.expect("Panic in event loop");
debug!("Worker thread shuts down {}", &name);
})?;

View file

@ -47,7 +47,7 @@ async fn post_message_and_poll(
return result
}
_ = &mut *worker => {
_ = worker.run_event_loop() => {
// A zero delay is long enough to yield the thread in order to prevent the loop from
// running hot for messages that are taking longer to resolve like for example an
// evaluation of top level await.
@ -75,7 +75,7 @@ async fn read_line_and_poll(
result = &mut line => {
return result.unwrap();
}
_ = &mut *worker, if poll_worker => {
_ = worker.run_event_loop(), if poll_worker => {
poll_worker = false;
}
_ = &mut timeout => {
@ -92,12 +92,7 @@ pub async fn run(
// Our inspector is unable to default to the default context id so we have to specify it here.
let context_id: u32 = 1;
let inspector = worker
.inspector
.as_mut()
.expect("Inspector is not created.");
let mut session = InspectorSession::new(&mut **inspector);
let mut session = worker.create_inspector_session();
let history_file = global_state.dir.root.join("deno_history.txt");

View file

@ -3,6 +3,7 @@
use crate::fmt_errors::JsError;
use crate::global_state::GlobalState;
use crate::inspector::DenoInspector;
use crate::inspector::InspectorSession;
use crate::js;
use crate::metrics::Metrics;
use crate::ops;
@ -11,6 +12,7 @@ use crate::permissions::Permissions;
use crate::state::CliModuleLoader;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker;
@ -22,10 +24,8 @@ use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
use deno_core::Snapshot;
use std::env;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@ -95,13 +95,15 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
/// - `MainWorker`
/// - `WebWorker`
pub struct Worker {
pub name: String,
pub js_runtime: JsRuntime,
pub inspector: Option<Box<DenoInspector>>,
pub waker: AtomicWaker,
pub(crate) internal_channels: WorkerChannelsInternal,
external_channels: WorkerHandle,
inspector: Option<Box<DenoInspector>>,
// Following fields are pub because they are accessed
// when creating a new WebWorker instance.
pub(crate) internal_channels: WorkerChannelsInternal,
pub(crate) js_runtime: JsRuntime,
pub(crate) name: String,
should_break_on_first_statement: bool,
waker: AtomicWaker,
}
impl Worker {
@ -147,13 +149,13 @@ impl Worker {
let (internal_channels, external_channels) = create_channels();
Self {
name,
js_runtime,
inspector,
waker: AtomicWaker::new(),
internal_channels,
external_channels,
inspector,
internal_channels,
js_runtime,
name,
should_break_on_first_statement,
waker: AtomicWaker::new(),
}
}
@ -221,6 +223,28 @@ impl Worker {
.wait_for_session_and_break_on_next_statement()
}
}
/// Create new inspector session. This function panics if Worker
/// was not configured to create inspector.
pub fn create_inspector_session(&mut self) -> Box<InspectorSession> {
let inspector = self.inspector.as_mut().unwrap();
InspectorSession::new(&mut **inspector)
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
// We always poll the inspector if it exists.
let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
self.waker.register(cx.waker());
self.js_runtime.poll_event_loop(cx)
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
}
}
impl Drop for Worker {
@ -231,32 +255,6 @@ impl Drop for Worker {
}
}
impl Future for Worker {
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
// We always poll the inspector if it exists.
let _ = inner.inspector.as_mut().map(|i| i.poll_unpin(cx));
inner.waker.register(cx.waker());
inner.js_runtime.poll_event_loop(cx)
}
}
impl Deref for Worker {
type Target = JsRuntime;
fn deref(&self) -> &Self::Target {
&self.js_runtime
}
}
impl DerefMut for Worker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.js_runtime
}
}
/// This worker is created and used by Deno executable.
///
/// It provides ops available in the `Deno` namespace.
@ -278,45 +276,46 @@ impl MainWorker {
loader,
true,
);
let js_runtime = &mut worker.js_runtime;
{
// All ops registered in this function depend on these
{
let op_state = worker.op_state();
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(global_state.permissions.clone());
}
ops::runtime::init(&mut worker, main_module);
ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref());
ops::timers::init(&mut worker);
ops::worker_host::init(&mut worker);
ops::random::init(&mut worker, global_state.flags.seed);
ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close);
ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources);
ops::runtime::init(js_runtime, main_module);
ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
ops::timers::init(js_runtime);
ops::worker_host::init(js_runtime);
ops::random::init(js_runtime, global_state.flags.seed);
ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
ops::reg_json_sync(
&mut worker,
js_runtime,
"op_domain_to_ascii",
deno_web::op_domain_to_ascii,
);
ops::errors::init(&mut worker);
ops::fs_events::init(&mut worker);
ops::fs::init(&mut worker);
ops::io::init(&mut worker);
ops::net::init(&mut worker);
ops::os::init(&mut worker);
ops::permissions::init(&mut worker);
ops::plugin::init(&mut worker);
ops::process::init(&mut worker);
ops::runtime_compiler::init(&mut worker);
ops::signal::init(&mut worker);
ops::tls::init(&mut worker);
ops::tty::init(&mut worker);
ops::websocket::init(&mut worker);
ops::errors::init(js_runtime);
ops::fs_events::init(js_runtime);
ops::fs::init(js_runtime);
ops::io::init(js_runtime);
ops::net::init(js_runtime);
ops::os::init(js_runtime);
ops::permissions::init(js_runtime);
ops::plugin::init(js_runtime);
ops::process::init(js_runtime);
ops::runtime_compiler::init(js_runtime);
ops::signal::init(js_runtime);
ops::tls::init(js_runtime);
ops::tty::init(js_runtime);
ops::websocket::init(js_runtime);
}
{
let op_state = worker.op_state();
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = get_stdio();
@ -449,49 +448,45 @@ impl WebWorker {
{
let handle = web_worker.thread_safe_handle();
let sender = web_worker.worker.internal_channels.sender.clone();
let js_runtime = &mut web_worker.js_runtime;
// All ops registered in this function depend on these
{
let op_state = web_worker.op_state();
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
op_state.put::<Metrics>(Default::default());
op_state.put::<Arc<GlobalState>>(global_state.clone());
op_state.put::<Permissions>(permissions);
}
ops::web_worker::init(&mut web_worker, sender, handle);
ops::runtime::init(&mut web_worker, main_module);
ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref());
ops::timers::init(&mut web_worker);
ops::worker_host::init(&mut web_worker);
ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close);
ops::web_worker::init(js_runtime, sender, handle);
ops::runtime::init(js_runtime, main_module);
ops::fetch::init(js_runtime, global_state.flags.ca_file.as_deref());
ops::timers::init(js_runtime);
ops::worker_host::init(js_runtime);
ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close);
ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources);
ops::reg_json_sync(
&mut web_worker,
"op_resources",
deno_core::op_resources,
);
ops::reg_json_sync(
&mut web_worker,
js_runtime,
"op_domain_to_ascii",
deno_web::op_domain_to_ascii,
);
ops::errors::init(&mut web_worker);
ops::io::init(&mut web_worker);
ops::websocket::init(&mut web_worker);
ops::errors::init(js_runtime);
ops::io::init(js_runtime);
ops::websocket::init(js_runtime);
if has_deno_namespace {
ops::fs_events::init(&mut web_worker);
ops::fs::init(&mut web_worker);
ops::net::init(&mut web_worker);
ops::os::init(&mut web_worker);
ops::permissions::init(&mut web_worker);
ops::plugin::init(&mut web_worker);
ops::process::init(&mut web_worker);
ops::random::init(&mut web_worker, global_state.flags.seed);
ops::runtime_compiler::init(&mut web_worker);
ops::signal::init(&mut web_worker);
ops::tls::init(&mut web_worker);
ops::tty::init(&mut web_worker);
ops::fs_events::init(js_runtime);
ops::fs::init(js_runtime);
ops::net::init(js_runtime);
ops::os::init(js_runtime);
ops::permissions::init(js_runtime);
ops::plugin::init(js_runtime);
ops::process::init(js_runtime);
ops::random::init(js_runtime, global_state.flags.seed);
ops::runtime_compiler::init(js_runtime);
ops::signal::init(js_runtime);
ops::tls::init(js_runtime);
ops::tty::init(js_runtime);
}
}
@ -504,38 +499,27 @@ impl WebWorker {
pub fn thread_safe_handle(&self) -> WebWorkerHandle {
self.handle.clone()
}
}
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
&self.worker
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx)).await
}
}
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.worker
}
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
let worker = &mut self.worker;
impl Future for WebWorker {
type Output = Result<(), AnyError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let worker = &mut inner.worker;
let terminated = inner.handle.terminated.load(Ordering::Relaxed);
let terminated = self.handle.terminated.load(Ordering::Relaxed);
if terminated {
return Poll::Ready(Ok(()));
}
if !inner.event_loop_idle {
match worker.poll_unpin(cx) {
if !self.event_loop_idle {
match worker.poll_event_loop(cx) {
Poll::Ready(r) => {
let terminated = inner.handle.terminated.load(Ordering::Relaxed);
let terminated = self.handle.terminated.load(Ordering::Relaxed);
if terminated {
return Poll::Ready(Ok(()));
}
@ -546,13 +530,13 @@ impl Future for WebWorker {
.try_send(WorkerEvent::Error(e))
.expect("Failed to post message to host");
}
inner.event_loop_idle = true;
self.event_loop_idle = true;
}
Poll::Pending => {}
}
}
if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) {
if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
// terminate_rx should never be closed
assert!(r.is_some());
return Poll::Ready(Ok(()));
@ -569,7 +553,7 @@ impl Future for WebWorker {
if let Err(e) = worker.execute(&script) {
// If execution was terminated during message callback then
// just ignore it
if inner.handle.terminated.load(Ordering::Relaxed) {
if self.handle.terminated.load(Ordering::Relaxed) {
return Poll::Ready(Ok(()));
}
@ -581,7 +565,7 @@ impl Future for WebWorker {
}
// Let event loop be polled again
inner.event_loop_idle = false;
self.event_loop_idle = false;
worker.waker.wake();
}
None => unreachable!(),
@ -592,6 +576,19 @@ impl Future for WebWorker {
}
}
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
&self.worker
}
}
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.worker
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -628,7 +625,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
if let Err(e) = (&mut *worker).await {
if let Err(e) = worker.run_event_loop().await {
panic!("Future got unexpected error: {:?}", e);
}
}
@ -646,7 +643,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
if let Err(e) = (&mut *worker).await {
if let Err(e) = worker.run_event_loop().await {
panic!("Future got unexpected error: {:?}", e);
}
}
@ -665,7 +662,7 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
if let Err(e) = (&mut *worker).await {
if let Err(e) = worker.run_event_loop().await {
panic!("Future got unexpected error: {:?}", e);
}
}
@ -733,7 +730,7 @@ mod tests {
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker);
let r = tokio_util::run_basic(worker.run_event_loop());
assert!(r.is_ok())
});
@ -780,7 +777,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
let r = tokio_util::run_basic(worker);
let r = tokio_util::run_basic(worker.run_event_loop());
assert!(r.is_ok())
});