1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-01 16:51:13 -05:00

chore: fix flaky stdout_write_all test (#15772)

This commit is contained in:
David Sherret 2022-09-05 09:05:48 -04:00 committed by Yoshiya Hinosawa
parent 5f51c8fcbf
commit 916c2e96ac
No known key found for this signature in database
GPG key ID: 0E8BFAA8A5B4E92B
2 changed files with 155 additions and 6 deletions

View file

@ -29,7 +29,6 @@ use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::process; use tokio::process;
use tokio::sync::Semaphore;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
@ -163,6 +162,8 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
#[cfg(unix)] #[cfg(unix)]
use nix::sys::termios; use nix::sys::termios;
use super::utils::TaskQueue;
#[derive(Default)] #[derive(Default)]
pub struct TtyMetadata { pub struct TtyMetadata {
#[cfg(unix)] #[cfg(unix)]
@ -445,8 +446,8 @@ pub struct StdFileResource {
// asynchronously one at a time in order // asynchronously one at a time in order
cell: RefCell<Option<StdFileResourceCellValue>>, cell: RefCell<Option<StdFileResourceCellValue>>,
// Used to keep async actions in order and only allow one // Used to keep async actions in order and only allow one
// to occurr at a time // to occur at a time
cell_async_sempahore: Semaphore, cell_async_task_queue: TaskQueue,
} }
impl StdFileResource { impl StdFileResource {
@ -456,7 +457,7 @@ impl StdFileResource {
inner, inner,
meta_data: Default::default(), meta_data: Default::default(),
})), })),
cell_async_sempahore: Semaphore::new(1), cell_async_task_queue: Default::default(),
name: name.to_string(), name: name.to_string(),
} }
} }
@ -467,7 +468,7 @@ impl StdFileResource {
inner: StdFileResourceInner::file(fs_file), inner: StdFileResourceInner::file(fs_file),
meta_data: Default::default(), meta_data: Default::default(),
})), })),
cell_async_sempahore: Semaphore::new(1), cell_async_task_queue: Default::default(),
name: "fsFile".to_string(), name: "fsFile".to_string(),
} }
} }
@ -498,7 +499,7 @@ impl StdFileResource {
F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static,
{ {
// we want to restrict this to one async action at a time // we want to restrict this to one async action at a time
let _permit = self.cell_async_sempahore.acquire().await.unwrap(); let _permit = self.cell_async_task_queue.acquire().await;
// we take the value out of the cell, use it on a blocking task, // we take the value out of the cell, use it on a blocking task,
// then put it back into the cell when we're done // then put it back into the cell when we're done
let mut did_take = false; let mut did_take = false;

View file

@ -1,6 +1,14 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use deno_core::error::custom_error; use deno_core::error::custom_error;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::futures::task::AtomicWaker;
use deno_core::futures::Future;
use deno_core::parking_lot::Mutex;
use std::collections::LinkedList;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// A utility function to map OsStrings to Strings /// A utility function to map OsStrings to Strings
pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
@ -9,3 +17,143 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
custom_error("InvalidData", message) custom_error("InvalidData", message)
}) })
} }
#[derive(Default)]
struct TaskQueueTaskWaker {
is_ready: AtomicBool,
waker: AtomicWaker,
}
#[derive(Default)]
struct TaskQueueTasks {
is_running: bool,
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
}
/// A queue that executes tasks sequentially one after the other
/// ensuring order and that no task runs at the same time as another.
///
/// Note that tokio's semaphore doesn't seem to maintain order
/// and so we can't use that in the code that uses this or use
/// that here.
#[derive(Clone, Default)]
pub struct TaskQueue {
tasks: Arc<Mutex<TaskQueueTasks>>,
}
impl TaskQueue {
/// Alternate API that acquires a permit internally
/// for the duration of the future.
#[cfg(test)]
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
let _permit = self.acquire().await;
future.await
}
/// Acquires a permit where the tasks are executed one at a time
/// and in the order that they were acquired.
pub async fn acquire(&self) -> TaskQueuePermit {
let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
acquire.await;
TaskQueuePermit {
tasks: self.tasks.clone(),
}
}
}
/// A permit that when dropped will allow another task to proceed.
pub struct TaskQueuePermit {
tasks: Arc<Mutex<TaskQueueTasks>>,
}
impl Drop for TaskQueuePermit {
fn drop(&mut self) {
let next_item = {
let mut tasks = self.tasks.lock();
let next_item = tasks.wakers.pop_front();
tasks.is_running = next_item.is_some();
next_item
};
if let Some(next_item) = next_item {
next_item.is_ready.store(true, Ordering::SeqCst);
next_item.waker.wake();
}
}
}
struct TaskQueuePermitAcquire {
tasks: Arc<Mutex<TaskQueueTasks>>,
initialized: AtomicBool,
waker: Arc<TaskQueueTaskWaker>,
}
impl TaskQueuePermitAcquire {
pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
Self {
tasks,
initialized: Default::default(),
waker: Default::default(),
}
}
}
impl Future for TaskQueuePermitAcquire {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
// update with the latest waker
self.waker.waker.register(cx.waker());
// ensure this is initialized
if !self.initialized.swap(true, Ordering::SeqCst) {
let mut tasks = self.tasks.lock();
if !tasks.is_running {
tasks.is_running = true;
return std::task::Poll::Ready(());
}
tasks.wakers.push_back(self.waker.clone());
return std::task::Poll::Pending;
}
// check if we're ready to run
if self.waker.is_ready.load(Ordering::SeqCst) {
std::task::Poll::Ready(())
} else {
std::task::Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use deno_core::futures;
use deno_core::parking_lot::Mutex;
use std::sync::Arc;
use super::TaskQueue;
#[tokio::test]
async fn task_queue_runs_one_after_other() {
let task_queue = TaskQueue::default();
let mut tasks = Vec::new();
let data = Arc::new(Mutex::new(0));
for i in 0..100 {
let data = data.clone();
tasks.push(task_queue.queue(async move {
tokio::task::spawn_blocking(move || {
let mut data = data.lock();
if *data != i {
panic!("Value was not equal.");
}
*data = i + 1;
})
.await
.unwrap();
}));
}
futures::future::join_all(tasks).await;
}
}