1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

Make timers act like normal ops

This is in preperation for core integration.
This commit is contained in:
Ryan Dahl 2019-03-10 15:37:05 -04:00
parent 9691d7b53b
commit 58cc69f672
8 changed files with 155 additions and 105 deletions

View file

@ -8,31 +8,20 @@ import * as util from "./util";
let nextCmdId = 0; let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<msg.Base>>(); const promiseTable = new Map<number, util.Resolvable<msg.Base>>();
let fireTimers: () => void;
export function setFireTimersCallback(fn: () => void): void {
fireTimers = fn;
}
export function handleAsyncMsgFromRust(ui8: Uint8Array): void { export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
// If a the buffer is empty, recv() on the native side timed out and we util.assert(ui8 != null && ui8.length > 0);
// did not receive a message. const bb = new flatbuffers.ByteBuffer(ui8);
if (ui8 && ui8.length) { const base = msg.Base.getRootAsBase(bb);
const bb = new flatbuffers.ByteBuffer(ui8); const cmdId = base.cmdId();
const base = msg.Base.getRootAsBase(bb); const promise = promiseTable.get(cmdId);
const cmdId = base.cmdId(); util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
const promise = promiseTable.get(cmdId); promiseTable.delete(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`); const err = errors.maybeError(base);
promiseTable.delete(cmdId); if (err != null) {
const err = errors.maybeError(base); promise!.reject(err);
if (err != null) { } else {
promise!.reject(err); promise!.resolve(base);
} else {
promise!.resolve(base);
}
} }
// Fire timers that have become runnable.
fireTimers();
} }
function sendInternal( function sendInternal(

View file

@ -2,7 +2,7 @@
import { assert } from "./util"; import { assert } from "./util";
import * as msg from "gen/msg_generated"; import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers"; import * as flatbuffers from "./flatbuffers";
import { sendSync, setFireTimersCallback } from "./dispatch"; import { sendAsync, sendSync } from "./dispatch";
interface Timer { interface Timer {
id: number; id: number;
@ -37,28 +37,39 @@ function getTime(): number {
return now; return now;
} }
function setGlobalTimeout(due: number | null, now: number): void { function clearGlobalTimeout(): void {
const builder = flatbuffers.createBuilder();
msg.GlobalTimerStop.startGlobalTimerStop(builder);
const inner = msg.GlobalTimerStop.endGlobalTimerStop(builder);
globalTimeoutDue = null;
let res = sendSync(builder, msg.Any.GlobalTimerStop, inner);
assert(res == null);
}
async function setGlobalTimeout(due: number, now: number): Promise<void> {
// Since JS and Rust don't use the same clock, pass the time to rust as a // Since JS and Rust don't use the same clock, pass the time to rust as a
// relative time value. On the Rust side we'll turn that into an absolute // relative time value. On the Rust side we'll turn that into an absolute
// value again. // value again.
// Note that a negative time-out value stops the global timer. let timeout = due - now;
let timeout; assert(timeout >= 0);
if (due === null) {
timeout = -1;
} else {
timeout = due - now;
assert(timeout >= 0);
}
// Send message to the backend. // Send message to the backend.
const builder = flatbuffers.createBuilder(); const builder = flatbuffers.createBuilder();
msg.SetTimeout.startSetTimeout(builder); msg.GlobalTimer.startGlobalTimer(builder);
msg.SetTimeout.addTimeout(builder, timeout); msg.GlobalTimer.addTimeout(builder, timeout);
const inner = msg.SetTimeout.endSetTimeout(builder); const inner = msg.GlobalTimer.endGlobalTimer(builder);
const res = sendSync(builder, msg.Any.SetTimeout, inner);
assert(res == null);
// Remember when when the global timer will fire.
globalTimeoutDue = due; globalTimeoutDue = due;
await sendAsync(builder, msg.Any.GlobalTimer, inner);
// eslint-disable-next-line @typescript-eslint/no-use-before-define
fireTimers();
}
function setOrClearGlobalTimeout(due: number | null, now: number): void {
if (due == null) {
clearGlobalTimeout();
} else {
setGlobalTimeout(due, now);
}
} }
function schedule(timer: Timer, now: number): void { function schedule(timer: Timer, now: number): void {
@ -75,7 +86,7 @@ function schedule(timer: Timer, now: number): void {
// If the new timer is scheduled to fire before any timer that existed before, // If the new timer is scheduled to fire before any timer that existed before,
// update the global timeout to reflect this. // update the global timeout to reflect this.
if (globalTimeoutDue === null || globalTimeoutDue > timer.due) { if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
setGlobalTimeout(timer.due, now); setOrClearGlobalTimeout(timer.due, now);
} }
} }
@ -97,7 +108,7 @@ function unschedule(timer: Timer): void {
nextTimerDue = Number(key); nextTimerDue = Number(key);
break; break;
} }
setGlobalTimeout(nextTimerDue, getTime()); setOrClearGlobalTimeout(nextTimerDue, getTime());
} }
} else { } else {
// Multiple timers that are due at the same point in time. // Multiple timers that are due at the same point in time.
@ -162,9 +173,10 @@ function fireTimers(): void {
Promise.resolve(timer).then(fire); Promise.resolve(timer).then(fire);
} }
} }
// Update the global alarm to go off when the first-up timer that hasn't fired // Update the global alarm to go off when the first-up timer that hasn't fired
// yet is due. // yet is due.
setGlobalTimeout(nextTimerDue, now); setOrClearGlobalTimeout(nextTimerDue, now);
} }
export type Args = unknown[]; export type Args = unknown[];
@ -226,7 +238,7 @@ export function setInterval(
return setTimer(cb, delay, args, true); return setTimer(cb, delay, args, true);
} }
/** Clears a previously set timer by id. */ /** Clears a previously set timer by id. AKA clearTimeout and clearInterval. */
export function clearTimer(id: number): void { export function clearTimer(id: number): void {
const timer = idMap.get(id); const timer = idMap.get(id);
if (timer === undefined) { if (timer === undefined) {
@ -237,7 +249,3 @@ export function clearTimer(id: number): void {
unschedule(timer); unschedule(timer);
idMap.delete(timer.id); idMap.delete(timer.id);
} }
// Tell the dispatcher which function it should call to fire timers that are
// due. This is done using a callback because circular imports are disallowed.
setFireTimersCallback(fireTimers);

49
src/global_timer.rs Normal file
View file

@ -0,0 +1,49 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
//! This module helps deno implement timers.
//!
//! As an optimization, we want to avoid an expensive calls into rust for every
//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is
//! implemented that calls into Rust for only the smallest timeout. Thus we
//! only need to be able to start and cancel a single timer (or Delay, as Tokio
//! calls it) for an entire Isolate. This is what is implemented here.
use crate::tokio_util::panic_on_error;
use futures::Future;
use std::time::Instant;
use tokio::sync::oneshot;
use tokio::timer::Delay;
pub struct GlobalTimer {
tx: Option<oneshot::Sender<()>>,
}
impl GlobalTimer {
pub fn new() -> Self {
Self { tx: None }
}
pub fn cancel(&mut self) {
if let Some(tx) = self.tx.take() {
tx.send(()).ok();
}
}
pub fn new_timeout(
&mut self,
deadline: Instant,
) -> impl Future<Item = (), Error = ()> {
if self.tx.is_some() {
self.cancel();
}
assert!(self.tx.is_none());
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);
let delay = panic_on_error(Delay::new(deadline));
let rx = panic_on_error(rx);
delay.select(rx).then(|_| Ok(()))
}
}

View file

@ -12,6 +12,7 @@ use crate::errors::DenoError;
use crate::errors::DenoResult; use crate::errors::DenoResult;
use crate::errors::RustOrJsError; use crate::errors::RustOrJsError;
use crate::flags; use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::isolate_init::IsolateInit; use crate::isolate_init::IsolateInit;
use crate::js_errors::apply_source_map; use crate::js_errors::apply_source_map;
use crate::libdeno; use crate::libdeno;
@ -35,8 +36,6 @@ use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::{Once, ONCE_INIT}; use std::sync::{Once, ONCE_INIT};
use std::time::Duration;
use std::time::Instant;
use tokio; use tokio;
// Buf represents a byte array returned from a "Op". // Buf represents a byte array returned from a "Op".
@ -62,7 +61,6 @@ pub struct Isolate {
rx: mpsc::Receiver<(usize, Buf)>, rx: mpsc::Receiver<(usize, Buf)>,
tx: mpsc::Sender<(usize, Buf)>, tx: mpsc::Sender<(usize, Buf)>,
ntasks: Cell<i32>, ntasks: Cell<i32>,
timeout_due: Cell<Option<Instant>>,
pub modules: RefCell<Modules>, pub modules: RefCell<Modules>,
pub state: Arc<IsolateState>, pub state: Arc<IsolateState>,
pub permissions: Arc<DenoPermissions>, pub permissions: Arc<DenoPermissions>,
@ -83,6 +81,7 @@ pub struct IsolateState {
pub flags: flags::DenoFlags, pub flags: flags::DenoFlags,
pub metrics: Metrics, pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>, pub worker_channels: Option<Mutex<WorkerChannels>>,
pub global_timer: Mutex<GlobalTimer>,
} }
impl IsolateState { impl IsolateState {
@ -100,6 +99,7 @@ impl IsolateState {
flags, flags,
metrics: Metrics::default(), metrics: Metrics::default(),
worker_channels: worker_channels.map(Mutex::new), worker_channels: worker_channels.map(Mutex::new),
global_timer: Mutex::new(GlobalTimer::new()),
} }
} }
@ -194,7 +194,6 @@ impl Isolate {
rx, rx,
tx, tx,
ntasks: Cell::new(0), ntasks: Cell::new(0),
timeout_due: Cell::new(None),
modules: RefCell::new(Modules::new()), modules: RefCell::new(Modules::new()),
state, state,
permissions: Arc::new(permissions), permissions: Arc::new(permissions),
@ -222,16 +221,6 @@ impl Isolate {
&*ptr &*ptr
} }
#[inline]
pub fn get_timeout_due(&self) -> Option<Instant> {
self.timeout_due.clone().into_inner()
}
#[inline]
pub fn set_timeout_due(&self, inst: Option<Instant>) {
self.timeout_due.set(inst);
}
#[inline] #[inline]
pub fn check_read(&self, filename: &str) -> DenoResult<()> { pub fn check_read(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_read(filename) self.permissions.check_read(filename)
@ -463,10 +452,9 @@ impl Isolate {
pub fn event_loop(&self) -> Result<(), JSError> { pub fn event_loop(&self) -> Result<(), JSError> {
// Main thread event loop. // Main thread event loop.
while !self.is_idle() { while !self.is_idle() {
match recv_deadline(&self.rx, self.get_timeout_due()) { match self.rx.recv() {
Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf), Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), Err(e) => panic!("Isolate.rx.recv() failed: {:?}", e),
Err(e) => panic!("recv_deadline() failed: {:?}", e),
} }
self.check_promise_errors(); self.check_promise_errors();
if let Some(err) = self.last_exception() { if let Some(err) = self.last_exception() {
@ -495,7 +483,7 @@ impl Isolate {
#[inline] #[inline]
fn is_idle(&self) -> bool { fn is_idle(&self) -> bool {
self.ntasks.get() == 0 && self.get_timeout_due().is_none() self.ntasks.get() == 0
} }
} }
@ -596,28 +584,6 @@ extern "C" fn pre_dispatch(
} }
} }
fn recv_deadline<T>(
rx: &mpsc::Receiver<T>,
maybe_due: Option<Instant>,
) -> Result<T, mpsc::RecvTimeoutError> {
match maybe_due {
None => rx.recv().map_err(|e| e.into()),
Some(due) => {
// Subtracting two Instants causes a panic if the resulting duration
// would become negative. Avoid this.
let now = Instant::now();
let timeout = if due > now {
due - now
} else {
Duration::new(0, 0)
};
// TODO: use recv_deadline() instead of recv_timeout() when this
// feature becomes stable/available.
rx.recv_timeout(timeout)
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -14,6 +14,7 @@ pub mod deno_dir;
pub mod errors; pub mod errors;
pub mod flags; pub mod flags;
mod fs; mod fs;
mod global_timer;
mod http_body; mod http_body;
mod http_util; mod http_util;
pub mod isolate; pub mod isolate;

View file

@ -16,6 +16,9 @@ union Any {
FetchRes, FetchRes,
FormatError, FormatError,
FormatErrorRes, FormatErrorRes,
GlobalTimer,
GlobalTimerRes,
GlobalTimerStop,
IsTTY, IsTTY,
IsTTYRes, IsTTYRes,
Listen, Listen,
@ -55,7 +58,6 @@ union Any {
RunStatusRes, RunStatusRes,
Seek, Seek,
SetEnv, SetEnv,
SetTimeout,
Shutdown, Shutdown,
Start, Start,
StartRes, StartRes,
@ -210,10 +212,14 @@ table Chdir {
directory: string; directory: string;
} }
table SetTimeout { table GlobalTimer {
timeout: int; timeout: int;
} }
table GlobalTimerRes { }
table GlobalTimerStop { }
table Exit { table Exit {
code: int; code: int;
} }

View file

@ -80,13 +80,7 @@ pub fn dispatch(
let inner_type = base.inner_type(); let inner_type = base.inner_type();
let cmd_id = base.cmd_id(); let cmd_id = base.cmd_id();
let op: Box<Op> = if inner_type == msg::Any::SetTimeout { let op: Box<Op> = {
// SetTimeout is an exceptional op: the global timeout field is part of the
// Isolate state (not the IsolateState state) and it must be updated on the
// main thread.
assert_eq!(is_sync, true);
op_set_timeout(isolate, &base, data)
} else {
// Handle regular ops. // Handle regular ops.
let op_creator: OpCreator = match inner_type { let op_creator: OpCreator = match inner_type {
msg::Any::Accept => op_accept, msg::Any::Accept => op_accept,
@ -101,6 +95,8 @@ pub fn dispatch(
msg::Any::Fetch => op_fetch, msg::Any::Fetch => op_fetch,
msg::Any::FetchModuleMetaData => op_fetch_module_meta_data, msg::Any::FetchModuleMetaData => op_fetch_module_meta_data,
msg::Any::FormatError => op_format_error, msg::Any::FormatError => op_format_error,
msg::Any::GlobalTimer => op_global_timer,
msg::Any::GlobalTimerStop => op_global_timer_stop,
msg::Any::IsTTY => op_is_tty, msg::Any::IsTTY => op_is_tty,
msg::Any::Listen => op_listen, msg::Any::Listen => op_listen,
msg::Any::MakeTempDir => op_make_temp_dir, msg::Any::MakeTempDir => op_make_temp_dir,
@ -440,23 +436,50 @@ fn op_chdir(
}())) }()))
} }
fn op_set_timeout( fn op_global_timer_stop(
isolate: &Isolate, isolate: &Isolate,
base: &msg::Base<'_>, base: &msg::Base<'_>,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
assert!(base.sync());
assert_eq!(data.len(), 0); assert_eq!(data.len(), 0);
let inner = base.inner_as_set_timeout().unwrap(); let mut t = isolate.state.global_timer.lock().unwrap();
let val = inner.timeout(); t.cancel();
let timeout_due = if val >= 0 {
Some(Instant::now() + Duration::from_millis(val as u64))
} else {
None
};
isolate.set_timeout_due(timeout_due);
ok_future(empty_buf()) ok_future(empty_buf())
} }
fn op_global_timer(
isolate: &Isolate,
base: &msg::Base<'_>,
data: libdeno::deno_buf,
) -> Box<Op> {
assert!(!base.sync());
assert_eq!(data.len(), 0);
let cmd_id = base.cmd_id();
let inner = base.inner_as_global_timer().unwrap();
let val = inner.timeout();
assert!(val >= 0);
let mut t = isolate.state.global_timer.lock().unwrap();
let deadline = Instant::now() + Duration::from_millis(val as u64);
let f = t.new_timeout(deadline);
Box::new(f.then(move |_| {
let builder = &mut FlatBufferBuilder::new();
let inner =
msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {});
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::GlobalTimerRes,
..Default::default()
},
))
}))
}
fn op_set_env( fn op_set_env(
isolate: &Isolate, isolate: &Isolate,
base: &msg::Base<'_>, base: &msg::Base<'_>,

View file

@ -122,3 +122,11 @@ where
f() f()
} }
} }
pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Item = I, Error = ()>
where
F: Future<Item = I, Error = E>,
E: std::fmt::Debug,
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}