diff --git a/js/dispatch.ts b/js/dispatch.ts index 5372918777..9dcd2f4209 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -8,31 +8,20 @@ import * as util from "./util"; let nextCmdId = 0; const promiseTable = new Map>(); -let fireTimers: () => void; - -export function setFireTimersCallback(fn: () => void): void { - fireTimers = fn; -} - export function handleAsyncMsgFromRust(ui8: Uint8Array): void { - // If a the buffer is empty, recv() on the native side timed out and we - // did not receive a message. - if (ui8 && ui8.length) { - const bb = new flatbuffers.ByteBuffer(ui8); - const base = msg.Base.getRootAsBase(bb); - const cmdId = base.cmdId(); - const promise = promiseTable.get(cmdId); - util.assert(promise != null, `Expecting promise in table. ${cmdId}`); - promiseTable.delete(cmdId); - const err = errors.maybeError(base); - if (err != null) { - promise!.reject(err); - } else { - promise!.resolve(base); - } + util.assert(ui8 != null && ui8.length > 0); + const bb = new flatbuffers.ByteBuffer(ui8); + const base = msg.Base.getRootAsBase(bb); + const cmdId = base.cmdId(); + const promise = promiseTable.get(cmdId); + util.assert(promise != null, `Expecting promise in table. ${cmdId}`); + promiseTable.delete(cmdId); + const err = errors.maybeError(base); + if (err != null) { + promise!.reject(err); + } else { + promise!.resolve(base); } - // Fire timers that have become runnable. - fireTimers(); } function sendInternal( diff --git a/js/timers.ts b/js/timers.ts index d06056cf2a..4089d5f8b0 100644 --- a/js/timers.ts +++ b/js/timers.ts @@ -2,7 +2,7 @@ import { assert } from "./util"; import * as msg from "gen/msg_generated"; import * as flatbuffers from "./flatbuffers"; -import { sendSync, setFireTimersCallback } from "./dispatch"; +import { sendAsync, sendSync } from "./dispatch"; interface Timer { id: number; @@ -37,28 +37,39 @@ function getTime(): number { return now; } -function setGlobalTimeout(due: number | null, now: number): void { +function clearGlobalTimeout(): void { + const builder = flatbuffers.createBuilder(); + msg.GlobalTimerStop.startGlobalTimerStop(builder); + const inner = msg.GlobalTimerStop.endGlobalTimerStop(builder); + globalTimeoutDue = null; + let res = sendSync(builder, msg.Any.GlobalTimerStop, inner); + assert(res == null); +} + +async function setGlobalTimeout(due: number, now: number): Promise { // Since JS and Rust don't use the same clock, pass the time to rust as a // relative time value. On the Rust side we'll turn that into an absolute // value again. - // Note that a negative time-out value stops the global timer. - let timeout; - if (due === null) { - timeout = -1; - } else { - timeout = due - now; - assert(timeout >= 0); - } + let timeout = due - now; + assert(timeout >= 0); // Send message to the backend. const builder = flatbuffers.createBuilder(); - msg.SetTimeout.startSetTimeout(builder); - msg.SetTimeout.addTimeout(builder, timeout); - const inner = msg.SetTimeout.endSetTimeout(builder); - const res = sendSync(builder, msg.Any.SetTimeout, inner); - assert(res == null); - // Remember when when the global timer will fire. + msg.GlobalTimer.startGlobalTimer(builder); + msg.GlobalTimer.addTimeout(builder, timeout); + const inner = msg.GlobalTimer.endGlobalTimer(builder); globalTimeoutDue = due; + await sendAsync(builder, msg.Any.GlobalTimer, inner); + // eslint-disable-next-line @typescript-eslint/no-use-before-define + fireTimers(); +} + +function setOrClearGlobalTimeout(due: number | null, now: number): void { + if (due == null) { + clearGlobalTimeout(); + } else { + setGlobalTimeout(due, now); + } } function schedule(timer: Timer, now: number): void { @@ -75,7 +86,7 @@ function schedule(timer: Timer, now: number): void { // If the new timer is scheduled to fire before any timer that existed before, // update the global timeout to reflect this. if (globalTimeoutDue === null || globalTimeoutDue > timer.due) { - setGlobalTimeout(timer.due, now); + setOrClearGlobalTimeout(timer.due, now); } } @@ -97,7 +108,7 @@ function unschedule(timer: Timer): void { nextTimerDue = Number(key); break; } - setGlobalTimeout(nextTimerDue, getTime()); + setOrClearGlobalTimeout(nextTimerDue, getTime()); } } else { // Multiple timers that are due at the same point in time. @@ -162,9 +173,10 @@ function fireTimers(): void { Promise.resolve(timer).then(fire); } } + // Update the global alarm to go off when the first-up timer that hasn't fired // yet is due. - setGlobalTimeout(nextTimerDue, now); + setOrClearGlobalTimeout(nextTimerDue, now); } export type Args = unknown[]; @@ -226,7 +238,7 @@ export function setInterval( return setTimer(cb, delay, args, true); } -/** Clears a previously set timer by id. */ +/** Clears a previously set timer by id. AKA clearTimeout and clearInterval. */ export function clearTimer(id: number): void { const timer = idMap.get(id); if (timer === undefined) { @@ -237,7 +249,3 @@ export function clearTimer(id: number): void { unschedule(timer); idMap.delete(timer.id); } - -// Tell the dispatcher which function it should call to fire timers that are -// due. This is done using a callback because circular imports are disallowed. -setFireTimersCallback(fireTimers); diff --git a/src/global_timer.rs b/src/global_timer.rs new file mode 100644 index 0000000000..eef70ddc20 --- /dev/null +++ b/src/global_timer.rs @@ -0,0 +1,49 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +//! This module helps deno implement timers. +//! +//! As an optimization, we want to avoid an expensive calls into rust for every +//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is +//! implemented that calls into Rust for only the smallest timeout. Thus we +//! only need to be able to start and cancel a single timer (or Delay, as Tokio +//! calls it) for an entire Isolate. This is what is implemented here. + +use crate::tokio_util::panic_on_error; +use futures::Future; +use std::time::Instant; +use tokio::sync::oneshot; +use tokio::timer::Delay; + +pub struct GlobalTimer { + tx: Option>, +} + +impl GlobalTimer { + pub fn new() -> Self { + Self { tx: None } + } + + pub fn cancel(&mut self) { + if let Some(tx) = self.tx.take() { + tx.send(()).ok(); + } + } + + pub fn new_timeout( + &mut self, + deadline: Instant, + ) -> impl Future { + if self.tx.is_some() { + self.cancel(); + } + assert!(self.tx.is_none()); + + let (tx, rx) = oneshot::channel(); + self.tx = Some(tx); + + let delay = panic_on_error(Delay::new(deadline)); + let rx = panic_on_error(rx); + + delay.select(rx).then(|_| Ok(())) + } +} diff --git a/src/isolate.rs b/src/isolate.rs index d4f0f25399..8a77777d2b 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -12,6 +12,7 @@ use crate::errors::DenoError; use crate::errors::DenoResult; use crate::errors::RustOrJsError; use crate::flags; +use crate::global_timer::GlobalTimer; use crate::isolate_init::IsolateInit; use crate::js_errors::apply_source_map; use crate::libdeno; @@ -35,8 +36,6 @@ use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::sync::{Once, ONCE_INIT}; -use std::time::Duration; -use std::time::Instant; use tokio; // Buf represents a byte array returned from a "Op". @@ -62,7 +61,6 @@ pub struct Isolate { rx: mpsc::Receiver<(usize, Buf)>, tx: mpsc::Sender<(usize, Buf)>, ntasks: Cell, - timeout_due: Cell>, pub modules: RefCell, pub state: Arc, pub permissions: Arc, @@ -83,6 +81,7 @@ pub struct IsolateState { pub flags: flags::DenoFlags, pub metrics: Metrics, pub worker_channels: Option>, + pub global_timer: Mutex, } impl IsolateState { @@ -100,6 +99,7 @@ impl IsolateState { flags, metrics: Metrics::default(), worker_channels: worker_channels.map(Mutex::new), + global_timer: Mutex::new(GlobalTimer::new()), } } @@ -194,7 +194,6 @@ impl Isolate { rx, tx, ntasks: Cell::new(0), - timeout_due: Cell::new(None), modules: RefCell::new(Modules::new()), state, permissions: Arc::new(permissions), @@ -222,16 +221,6 @@ impl Isolate { &*ptr } - #[inline] - pub fn get_timeout_due(&self) -> Option { - self.timeout_due.clone().into_inner() - } - - #[inline] - pub fn set_timeout_due(&self, inst: Option) { - self.timeout_due.set(inst); - } - #[inline] pub fn check_read(&self, filename: &str) -> DenoResult<()> { self.permissions.check_read(filename) @@ -463,10 +452,9 @@ impl Isolate { pub fn event_loop(&self) -> Result<(), JSError> { // Main thread event loop. while !self.is_idle() { - match recv_deadline(&self.rx, self.get_timeout_due()) { + match self.rx.recv() { Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf), - Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), - Err(e) => panic!("recv_deadline() failed: {:?}", e), + Err(e) => panic!("Isolate.rx.recv() failed: {:?}", e), } self.check_promise_errors(); if let Some(err) = self.last_exception() { @@ -495,7 +483,7 @@ impl Isolate { #[inline] fn is_idle(&self) -> bool { - self.ntasks.get() == 0 && self.get_timeout_due().is_none() + self.ntasks.get() == 0 } } @@ -596,28 +584,6 @@ extern "C" fn pre_dispatch( } } -fn recv_deadline( - rx: &mpsc::Receiver, - maybe_due: Option, -) -> Result { - match maybe_due { - None => rx.recv().map_err(|e| e.into()), - Some(due) => { - // Subtracting two Instants causes a panic if the resulting duration - // would become negative. Avoid this. - let now = Instant::now(); - let timeout = if due > now { - due - now - } else { - Duration::new(0, 0) - }; - // TODO: use recv_deadline() instead of recv_timeout() when this - // feature becomes stable/available. - rx.recv_timeout(timeout) - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/main.rs b/src/main.rs index 48d96b04ac..7bafe5c3ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ pub mod deno_dir; pub mod errors; pub mod flags; mod fs; +mod global_timer; mod http_body; mod http_util; pub mod isolate; diff --git a/src/msg.fbs b/src/msg.fbs index 4d54b185e4..279264a454 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -16,6 +16,9 @@ union Any { FetchRes, FormatError, FormatErrorRes, + GlobalTimer, + GlobalTimerRes, + GlobalTimerStop, IsTTY, IsTTYRes, Listen, @@ -55,7 +58,6 @@ union Any { RunStatusRes, Seek, SetEnv, - SetTimeout, Shutdown, Start, StartRes, @@ -210,10 +212,14 @@ table Chdir { directory: string; } -table SetTimeout { +table GlobalTimer { timeout: int; } +table GlobalTimerRes { } + +table GlobalTimerStop { } + table Exit { code: int; } diff --git a/src/ops.rs b/src/ops.rs index f4ee2434e3..17cb008dbd 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -80,13 +80,7 @@ pub fn dispatch( let inner_type = base.inner_type(); let cmd_id = base.cmd_id(); - let op: Box = if inner_type == msg::Any::SetTimeout { - // SetTimeout is an exceptional op: the global timeout field is part of the - // Isolate state (not the IsolateState state) and it must be updated on the - // main thread. - assert_eq!(is_sync, true); - op_set_timeout(isolate, &base, data) - } else { + let op: Box = { // Handle regular ops. let op_creator: OpCreator = match inner_type { msg::Any::Accept => op_accept, @@ -101,6 +95,8 @@ pub fn dispatch( msg::Any::Fetch => op_fetch, msg::Any::FetchModuleMetaData => op_fetch_module_meta_data, msg::Any::FormatError => op_format_error, + msg::Any::GlobalTimer => op_global_timer, + msg::Any::GlobalTimerStop => op_global_timer_stop, msg::Any::IsTTY => op_is_tty, msg::Any::Listen => op_listen, msg::Any::MakeTempDir => op_make_temp_dir, @@ -440,23 +436,50 @@ fn op_chdir( }())) } -fn op_set_timeout( +fn op_global_timer_stop( isolate: &Isolate, base: &msg::Base<'_>, data: libdeno::deno_buf, ) -> Box { + assert!(base.sync()); assert_eq!(data.len(), 0); - let inner = base.inner_as_set_timeout().unwrap(); - let val = inner.timeout(); - let timeout_due = if val >= 0 { - Some(Instant::now() + Duration::from_millis(val as u64)) - } else { - None - }; - isolate.set_timeout_due(timeout_due); + let mut t = isolate.state.global_timer.lock().unwrap(); + t.cancel(); ok_future(empty_buf()) } +fn op_global_timer( + isolate: &Isolate, + base: &msg::Base<'_>, + data: libdeno::deno_buf, +) -> Box { + assert!(!base.sync()); + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_global_timer().unwrap(); + let val = inner.timeout(); + assert!(val >= 0); + + let mut t = isolate.state.global_timer.lock().unwrap(); + let deadline = Instant::now() + Duration::from_millis(val as u64); + let f = t.new_timeout(deadline); + + Box::new(f.then(move |_| { + let builder = &mut FlatBufferBuilder::new(); + let inner = + msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {}); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::GlobalTimerRes, + ..Default::default() + }, + )) + })) +} + fn op_set_env( isolate: &Isolate, base: &msg::Base<'_>, diff --git a/src/tokio_util.rs b/src/tokio_util.rs index 3dddff9c2f..ef66f46107 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -122,3 +122,11 @@ where f() } } + +pub fn panic_on_error(f: F) -> impl Future +where + F: Future, + E: std::fmt::Debug, +{ + f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) +}