diff --git a/core/examples/schedule_task.rs b/core/examples/schedule_task.rs new file mode 100644 index 0000000000..2f4909b4fa --- /dev/null +++ b/core/examples/schedule_task.rs @@ -0,0 +1,66 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use deno_core::anyhow::Error; +use deno_core::Extension; +use deno_core::JsRuntime; +use deno_core::OpState; +use deno_core::RuntimeOptions; +use futures::channel::mpsc; +use futures::stream::StreamExt; +use std::task::Poll; + +type Task = Box; + +fn main() { + let my_ext = Extension::builder() + .ops(vec![( + "op_schedule_task", + deno_core::op_sync(op_schedule_task), + )]) + .event_loop_middleware(|state, cx| { + let recv = state.borrow_mut::>(); + let mut ref_loop = false; + while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) { + call(); + ref_loop = true; // `call` can callback into runtime and schedule new callbacks :-) + } + ref_loop + }) + .state(move |state| { + let (tx, rx) = mpsc::unbounded::(); + state.put(tx); + state.put(rx); + + Ok(()) + }) + .build(); + + // Initialize a runtime instance + let mut js_runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![my_ext], + ..Default::default() + }); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let future = async move { + // Schedule 10 tasks. + js_runtime + .execute_script( + "", + r#"for (let i = 1; i <= 10; i++) Deno.core.opSync("op_schedule_task", i);"# + ) + .unwrap(); + js_runtime.run_event_loop(false).await + }; + runtime.block_on(future).unwrap(); +} + +fn op_schedule_task(state: &mut OpState, i: u8, _: ()) -> Result<(), Error> { + let tx = state.borrow_mut::>(); + tx.unbounded_send(Box::new(move || println!("Hello, world! x{}", i))) + .expect("unbounded_send failed"); + Ok(()) +} diff --git a/core/extensions.rs b/core/extensions.rs index 587a2d0614..031cb073aa 100644 --- a/core/extensions.rs +++ b/core/extensions.rs @@ -1,12 +1,14 @@ use crate::OpFn; use crate::OpState; use anyhow::Error; +use std::task::Context; pub type SourcePair = (&'static str, Box); pub type SourceLoadFn = dyn Fn() -> Result; pub type OpPair = (&'static str, Box); pub type OpMiddlewareFn = dyn Fn(&'static str, Box) -> Box; pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>; +pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool; #[derive(Default)] pub struct Extension { @@ -14,6 +16,7 @@ pub struct Extension { ops: Option>, opstate_fn: Option>, middleware_fn: Option>, + event_loop_middleware: Option>, initialized: bool, } @@ -56,6 +59,22 @@ impl Extension { pub fn init_middleware(&mut self) -> Option> { self.middleware_fn.take() } + + pub fn init_event_loop_middleware(&mut self) -> Option> { + self.event_loop_middleware.take() + } + + pub fn run_event_loop_middleware( + &self, + op_state: &mut OpState, + cx: &mut Context, + ) -> bool { + self + .event_loop_middleware + .as_ref() + .map(|f| f(op_state, cx)) + .unwrap_or(false) + } } // Provides a convenient builder pattern to declare Extensions @@ -65,6 +84,7 @@ pub struct ExtensionBuilder { ops: Vec, state: Option>, middleware: Option>, + event_loop_middleware: Option>, } impl ExtensionBuilder { @@ -94,6 +114,14 @@ impl ExtensionBuilder { self } + pub fn event_loop_middleware(&mut self, middleware_fn: F) -> &mut Self + where + F: Fn(&mut OpState, &mut Context) -> bool + 'static, + { + self.event_loop_middleware = Some(Box::new(middleware_fn)); + self + } + pub fn build(&mut self) -> Extension { let js_files = Some(std::mem::take(&mut self.js)); let ops = Some(std::mem::take(&mut self.ops)); @@ -102,6 +130,7 @@ impl ExtensionBuilder { ops, opstate_fn: self.state.take(), middleware_fn: self.middleware.take(), + event_loop_middleware: self.event_loop_middleware.take(), initialized: false, } } diff --git a/core/runtime.rs b/core/runtime.rs index b0ab92f467..42b66e13fa 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -5,6 +5,7 @@ use crate::error::attach_handle_to_error; use crate::error::generic_error; use crate::error::ErrWithV8Handle; use crate::error::JsError; +use crate::extensions::OpEventLoopFn; use crate::inspector::JsRuntimeInspector; use crate::module_specifier::ModuleSpecifier; use crate::modules::ModuleId; @@ -80,6 +81,7 @@ pub struct JsRuntime { has_snapshotted: bool, allocations: IsolateAllocations, extensions: Vec, + event_loop_middlewares: Vec>, } struct DynImportModEvaluate { @@ -381,6 +383,7 @@ impl JsRuntime { snapshot_creator: maybe_snapshot_creator, has_snapshotted: false, allocations: IsolateAllocations::default(), + event_loop_middlewares: Vec::with_capacity(options.extensions.len()), extensions: options.extensions, }; @@ -481,6 +484,10 @@ impl JsRuntime { for (name, opfn) in ops { self.register_op(name, macroware(name, opfn)); } + + if let Some(middleware) = e.init_event_loop_middleware() { + self.event_loop_middlewares.push(middleware); + } } // Restore extensions self.extensions = extensions; @@ -788,6 +795,18 @@ impl JsRuntime { self.check_promise_exceptions()?; } + // Event loop middlewares + let mut maybe_scheduling = false; + { + let state = state_rc.borrow(); + let op_state = state.op_state.clone(); + for f in &self.event_loop_middlewares { + if f(&mut op_state.borrow_mut(), cx) { + maybe_scheduling = true; + } + } + } + // Top level module self.evaluate_pending_module(); @@ -815,6 +834,7 @@ impl JsRuntime { && !has_pending_module_evaluation && !has_pending_background_tasks && !has_tick_scheduled + && !maybe_scheduling { if wait_for_inspector && inspector_has_active_sessions { return Poll::Pending; @@ -833,6 +853,7 @@ impl JsRuntime { if state.have_unpolled_ops || has_pending_background_tasks || has_tick_scheduled + || maybe_scheduling { state.waker.wake(); } @@ -843,6 +864,7 @@ impl JsRuntime { || has_pending_dyn_module_evaluation || has_pending_background_tasks || has_tick_scheduled + || maybe_scheduling { // pass, will be polled again } else {