mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
chore: fix flaky stdout_write_all test (#15772)
This commit is contained in:
parent
1cdd2504b1
commit
f6636d4145
2 changed files with 155 additions and 6 deletions
|
@ -29,7 +29,6 @@ use tokio::io::AsyncReadExt;
|
|||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::process;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::FromRawFd;
|
||||
|
@ -163,6 +162,8 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
|
|||
#[cfg(unix)]
|
||||
use nix::sys::termios;
|
||||
|
||||
use super::utils::TaskQueue;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TtyMetadata {
|
||||
#[cfg(unix)]
|
||||
|
@ -445,8 +446,8 @@ pub struct StdFileResource {
|
|||
// asynchronously one at a time in order
|
||||
cell: RefCell<Option<StdFileResourceCellValue>>,
|
||||
// Used to keep async actions in order and only allow one
|
||||
// to occurr at a time
|
||||
cell_async_sempahore: Semaphore,
|
||||
// to occur at a time
|
||||
cell_async_task_queue: TaskQueue,
|
||||
}
|
||||
|
||||
impl StdFileResource {
|
||||
|
@ -456,7 +457,7 @@ impl StdFileResource {
|
|||
inner,
|
||||
meta_data: Default::default(),
|
||||
})),
|
||||
cell_async_sempahore: Semaphore::new(1),
|
||||
cell_async_task_queue: Default::default(),
|
||||
name: name.to_string(),
|
||||
}
|
||||
}
|
||||
|
@ -467,7 +468,7 @@ impl StdFileResource {
|
|||
inner: StdFileResourceInner::file(fs_file),
|
||||
meta_data: Default::default(),
|
||||
})),
|
||||
cell_async_sempahore: Semaphore::new(1),
|
||||
cell_async_task_queue: Default::default(),
|
||||
name: "fsFile".to_string(),
|
||||
}
|
||||
}
|
||||
|
@ -498,7 +499,7 @@ impl StdFileResource {
|
|||
F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static,
|
||||
{
|
||||
// we want to restrict this to one async action at a time
|
||||
let _permit = self.cell_async_sempahore.acquire().await.unwrap();
|
||||
let _permit = self.cell_async_task_queue.acquire().await;
|
||||
// we take the value out of the cell, use it on a blocking task,
|
||||
// then put it back into the cell when we're done
|
||||
let mut did_take = false;
|
||||
|
|
|
@ -1,6 +1,14 @@
|
|||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use deno_core::error::custom_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::task::AtomicWaker;
|
||||
use deno_core::futures::Future;
|
||||
use deno_core::parking_lot::Mutex;
|
||||
use std::collections::LinkedList;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// A utility function to map OsStrings to Strings
|
||||
pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
|
||||
|
@ -9,3 +17,143 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
|
|||
custom_error("InvalidData", message)
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TaskQueueTaskWaker {
|
||||
is_ready: AtomicBool,
|
||||
waker: AtomicWaker,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TaskQueueTasks {
|
||||
is_running: bool,
|
||||
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
|
||||
}
|
||||
|
||||
/// A queue that executes tasks sequentially one after the other
|
||||
/// ensuring order and that no task runs at the same time as another.
|
||||
///
|
||||
/// Note that tokio's semaphore doesn't seem to maintain order
|
||||
/// and so we can't use that in the code that uses this or use
|
||||
/// that here.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct TaskQueue {
|
||||
tasks: Arc<Mutex<TaskQueueTasks>>,
|
||||
}
|
||||
|
||||
impl TaskQueue {
|
||||
/// Alternate API that acquires a permit internally
|
||||
/// for the duration of the future.
|
||||
#[cfg(test)]
|
||||
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
|
||||
let _permit = self.acquire().await;
|
||||
future.await
|
||||
}
|
||||
|
||||
/// Acquires a permit where the tasks are executed one at a time
|
||||
/// and in the order that they were acquired.
|
||||
pub async fn acquire(&self) -> TaskQueuePermit {
|
||||
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
|
||||
acquire.await;
|
||||
TaskQueuePermit {
|
||||
tasks: self.tasks.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A permit that when dropped will allow another task to proceed.
|
||||
pub struct TaskQueuePermit {
|
||||
tasks: Arc<Mutex<TaskQueueTasks>>,
|
||||
}
|
||||
|
||||
impl Drop for TaskQueuePermit {
|
||||
fn drop(&mut self) {
|
||||
let next_item = {
|
||||
let mut tasks = self.tasks.lock();
|
||||
let next_item = tasks.wakers.pop_front();
|
||||
tasks.is_running = next_item.is_some();
|
||||
next_item
|
||||
};
|
||||
if let Some(next_item) = next_item {
|
||||
next_item.is_ready.store(true, Ordering::SeqCst);
|
||||
next_item.waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct TaskQueuePermitAcquire {
|
||||
tasks: Arc<Mutex<TaskQueueTasks>>,
|
||||
initialized: AtomicBool,
|
||||
waker: Arc<TaskQueueTaskWaker>,
|
||||
}
|
||||
|
||||
impl TaskQueuePermitAcquire {
|
||||
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
|
||||
Self {
|
||||
tasks,
|
||||
initialized: Default::default(),
|
||||
waker: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for TaskQueuePermitAcquire {
|
||||
type Output = ();
|
||||
|
||||
fn poll(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
// update with the latest waker
|
||||
self.waker.waker.register(cx.waker());
|
||||
|
||||
// ensure this is initialized
|
||||
if !self.initialized.swap(true, Ordering::SeqCst) {
|
||||
let mut tasks = self.tasks.lock();
|
||||
if !tasks.is_running {
|
||||
tasks.is_running = true;
|
||||
return std::task::Poll::Ready(());
|
||||
}
|
||||
tasks.wakers.push_back(self.waker.clone());
|
||||
return std::task::Poll::Pending;
|
||||
}
|
||||
|
||||
// check if we're ready to run
|
||||
if self.waker.is_ready.load(Ordering::SeqCst) {
|
||||
std::task::Poll::Ready(())
|
||||
} else {
|
||||
std::task::Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use deno_core::futures;
|
||||
use deno_core::parking_lot::Mutex;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::TaskQueue;
|
||||
|
||||
#[tokio::test]
|
||||
async fn task_queue_runs_one_after_other() {
|
||||
let task_queue = TaskQueue::default();
|
||||
let mut tasks = Vec::new();
|
||||
let data = Arc::new(Mutex::new(0));
|
||||
for i in 0..100 {
|
||||
let data = data.clone();
|
||||
tasks.push(task_queue.queue(async move {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let mut data = data.lock();
|
||||
if *data != i {
|
||||
panic!("Value was not equal.");
|
||||
}
|
||||
*data = i + 1;
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}));
|
||||
}
|
||||
futures::future::join_all(tasks).await;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue