mirror of
https://github.com/denoland/deno.git
synced 2024-12-01 16:51:13 -05:00
refactor: move TaskQueue from deno_runtime to deno_core (#18016)
This utility is useful in several contexts so it seems reasonable to have it in `deno_core`.
This commit is contained in:
parent
073dcf2ea5
commit
3494c78679
4 changed files with 151 additions and 149 deletions
|
@ -20,6 +20,7 @@ mod resources;
|
||||||
mod runtime;
|
mod runtime;
|
||||||
pub mod snapshot_util;
|
pub mod snapshot_util;
|
||||||
mod source_map;
|
mod source_map;
|
||||||
|
mod task_queue;
|
||||||
|
|
||||||
// Re-exports
|
// Re-exports
|
||||||
pub use anyhow;
|
pub use anyhow;
|
||||||
|
@ -116,6 +117,7 @@ pub use crate::runtime::Snapshot;
|
||||||
pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
|
pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
|
||||||
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
|
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
|
||||||
pub use crate::source_map::SourceMapGetter;
|
pub use crate::source_map::SourceMapGetter;
|
||||||
|
pub use crate::task_queue::TaskQueue;
|
||||||
|
|
||||||
pub fn v8_version() -> &'static str {
|
pub fn v8_version() -> &'static str {
|
||||||
v8::V8::get_version()
|
v8::V8::get_version()
|
||||||
|
|
148
core/task_queue.rs
Normal file
148
core/task_queue.rs
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use futures::task::AtomicWaker;
|
||||||
|
use futures::Future;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::collections::LinkedList;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TaskQueueTaskWaker {
|
||||||
|
is_ready: AtomicBool,
|
||||||
|
waker: AtomicWaker,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct TaskQueueTasks {
|
||||||
|
is_running: bool,
|
||||||
|
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A queue that executes tasks sequentially one after the other
|
||||||
|
/// ensuring order and that no task runs at the same time as another.
|
||||||
|
///
|
||||||
|
/// Note that tokio's semaphore doesn't seem to maintain order
|
||||||
|
/// and so we can't use that in the code that uses this or use
|
||||||
|
/// that here.
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct TaskQueue {
|
||||||
|
tasks: Arc<Mutex<TaskQueueTasks>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskQueue {
|
||||||
|
/// Alternate API that acquires a permit internally
|
||||||
|
/// for the duration of the future.
|
||||||
|
#[cfg(test)]
|
||||||
|
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
|
||||||
|
let _permit = self.acquire().await;
|
||||||
|
future.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Acquires a permit where the tasks are executed one at a time
|
||||||
|
/// and in the order that they were acquired.
|
||||||
|
pub async fn acquire(&self) -> TaskQueuePermit {
|
||||||
|
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
|
||||||
|
acquire.await;
|
||||||
|
TaskQueuePermit {
|
||||||
|
tasks: self.tasks.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A permit that when dropped will allow another task to proceed.
|
||||||
|
pub struct TaskQueuePermit {
|
||||||
|
tasks: Arc<Mutex<TaskQueueTasks>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TaskQueuePermit {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let next_item = {
|
||||||
|
let mut tasks = self.tasks.lock();
|
||||||
|
let next_item = tasks.wakers.pop_front();
|
||||||
|
tasks.is_running = next_item.is_some();
|
||||||
|
next_item
|
||||||
|
};
|
||||||
|
if let Some(next_item) = next_item {
|
||||||
|
next_item.is_ready.store(true, Ordering::SeqCst);
|
||||||
|
next_item.waker.wake();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TaskQueuePermitAcquire {
|
||||||
|
tasks: Arc<Mutex<TaskQueueTasks>>,
|
||||||
|
initialized: AtomicBool,
|
||||||
|
waker: Arc<TaskQueueTaskWaker>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskQueuePermitAcquire {
|
||||||
|
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
|
||||||
|
Self {
|
||||||
|
tasks,
|
||||||
|
initialized: Default::default(),
|
||||||
|
waker: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for TaskQueuePermitAcquire {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Self::Output> {
|
||||||
|
// update with the latest waker
|
||||||
|
self.waker.waker.register(cx.waker());
|
||||||
|
|
||||||
|
// ensure this is initialized
|
||||||
|
if !self.initialized.swap(true, Ordering::SeqCst) {
|
||||||
|
let mut tasks = self.tasks.lock();
|
||||||
|
if !tasks.is_running {
|
||||||
|
tasks.is_running = true;
|
||||||
|
return std::task::Poll::Ready(());
|
||||||
|
}
|
||||||
|
tasks.wakers.push_back(self.waker.clone());
|
||||||
|
return std::task::Poll::Pending;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if we're ready to run
|
||||||
|
if self.waker.is_ready.load(Ordering::SeqCst) {
|
||||||
|
std::task::Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
std::task::Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use super::TaskQueue;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn task_queue_runs_one_after_other() {
|
||||||
|
let task_queue = TaskQueue::default();
|
||||||
|
let mut tasks = Vec::new();
|
||||||
|
let data = Arc::new(Mutex::new(0));
|
||||||
|
for i in 0..100 {
|
||||||
|
let data = data.clone();
|
||||||
|
tasks.push(task_queue.queue(async move {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let mut data = data.lock();
|
||||||
|
if *data != i {
|
||||||
|
panic!("Value was not equal.");
|
||||||
|
}
|
||||||
|
*data = i + 1;
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
futures::future::join_all(tasks).await;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,6 +16,7 @@ use deno_core::OpState;
|
||||||
use deno_core::RcRef;
|
use deno_core::RcRef;
|
||||||
use deno_core::Resource;
|
use deno_core::Resource;
|
||||||
use deno_core::ResourceId;
|
use deno_core::ResourceId;
|
||||||
|
use deno_core::TaskQueue;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
@ -170,8 +171,6 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use nix::sys::termios;
|
use nix::sys::termios;
|
||||||
|
|
||||||
use super::utils::TaskQueue;
|
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct TtyMetadata {
|
pub struct TtyMetadata {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
|
|
@ -2,13 +2,6 @@
|
||||||
|
|
||||||
use deno_core::error::custom_error;
|
use deno_core::error::custom_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::futures::task::AtomicWaker;
|
|
||||||
use deno_core::futures::Future;
|
|
||||||
use deno_core::parking_lot::Mutex;
|
|
||||||
use std::collections::LinkedList;
|
|
||||||
use std::sync::atomic::AtomicBool;
|
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
/// A utility function to map OsStrings to Strings
|
/// A utility function to map OsStrings to Strings
|
||||||
pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
|
pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
|
||||||
|
@ -17,143 +10,3 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
|
||||||
custom_error("InvalidData", message)
|
custom_error("InvalidData", message)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct TaskQueueTaskWaker {
|
|
||||||
is_ready: AtomicBool,
|
|
||||||
waker: AtomicWaker,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct TaskQueueTasks {
|
|
||||||
is_running: bool,
|
|
||||||
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A queue that executes tasks sequentially one after the other
|
|
||||||
/// ensuring order and that no task runs at the same time as another.
|
|
||||||
///
|
|
||||||
/// Note that tokio's semaphore doesn't seem to maintain order
|
|
||||||
/// and so we can't use that in the code that uses this or use
|
|
||||||
/// that here.
|
|
||||||
#[derive(Clone, Default)]
|
|
||||||
pub struct TaskQueue {
|
|
||||||
tasks: Arc<Mutex<TaskQueueTasks>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TaskQueue {
|
|
||||||
/// Alternate API that acquires a permit internally
|
|
||||||
/// for the duration of the future.
|
|
||||||
#[cfg(test)]
|
|
||||||
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
|
|
||||||
let _permit = self.acquire().await;
|
|
||||||
future.await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Acquires a permit where the tasks are executed one at a time
|
|
||||||
/// and in the order that they were acquired.
|
|
||||||
pub async fn acquire(&self) -> TaskQueuePermit {
|
|
||||||
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
|
|
||||||
acquire.await;
|
|
||||||
TaskQueuePermit {
|
|
||||||
tasks: self.tasks.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A permit that when dropped will allow another task to proceed.
|
|
||||||
pub struct TaskQueuePermit {
|
|
||||||
tasks: Arc<Mutex<TaskQueueTasks>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for TaskQueuePermit {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let next_item = {
|
|
||||||
let mut tasks = self.tasks.lock();
|
|
||||||
let next_item = tasks.wakers.pop_front();
|
|
||||||
tasks.is_running = next_item.is_some();
|
|
||||||
next_item
|
|
||||||
};
|
|
||||||
if let Some(next_item) = next_item {
|
|
||||||
next_item.is_ready.store(true, Ordering::SeqCst);
|
|
||||||
next_item.waker.wake();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TaskQueuePermitAcquire {
|
|
||||||
tasks: Arc<Mutex<TaskQueueTasks>>,
|
|
||||||
initialized: AtomicBool,
|
|
||||||
waker: Arc<TaskQueueTaskWaker>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TaskQueuePermitAcquire {
|
|
||||||
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
|
|
||||||
Self {
|
|
||||||
tasks,
|
|
||||||
initialized: Default::default(),
|
|
||||||
waker: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for TaskQueuePermitAcquire {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> std::task::Poll<Self::Output> {
|
|
||||||
// update with the latest waker
|
|
||||||
self.waker.waker.register(cx.waker());
|
|
||||||
|
|
||||||
// ensure this is initialized
|
|
||||||
if !self.initialized.swap(true, Ordering::SeqCst) {
|
|
||||||
let mut tasks = self.tasks.lock();
|
|
||||||
if !tasks.is_running {
|
|
||||||
tasks.is_running = true;
|
|
||||||
return std::task::Poll::Ready(());
|
|
||||||
}
|
|
||||||
tasks.wakers.push_back(self.waker.clone());
|
|
||||||
return std::task::Poll::Pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if we're ready to run
|
|
||||||
if self.waker.is_ready.load(Ordering::SeqCst) {
|
|
||||||
std::task::Poll::Ready(())
|
|
||||||
} else {
|
|
||||||
std::task::Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use deno_core::futures;
|
|
||||||
use deno_core::parking_lot::Mutex;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use super::TaskQueue;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn task_queue_runs_one_after_other() {
|
|
||||||
let task_queue = TaskQueue::default();
|
|
||||||
let mut tasks = Vec::new();
|
|
||||||
let data = Arc::new(Mutex::new(0));
|
|
||||||
for i in 0..100 {
|
|
||||||
let data = data.clone();
|
|
||||||
tasks.push(task_queue.queue(async move {
|
|
||||||
tokio::task::spawn_blocking(move || {
|
|
||||||
let mut data = data.lock();
|
|
||||||
if *data != i {
|
|
||||||
panic!("Value was not equal.");
|
|
||||||
}
|
|
||||||
*data = i + 1;
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
futures::future::join_all(tasks).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue