1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-05 13:59:01 -05:00

fix(cli): allow using file resource synchronously while being used async (#15747)

This commit is contained in:
David Sherret 2022-09-04 22:33:06 -04:00 committed by Yoshiya Hinosawa
parent 52cccbf83d
commit 9d127a71b0
No known key found for this signature in database
GPG key ID: 0E8BFAA8A5B4E92B
5 changed files with 347 additions and 64 deletions

View file

@ -18,6 +18,11 @@ itest!(stdin_read_all {
input: Some("01234567890123456789012345678901234567890123456789"),
});
itest!(stdout_write_sync_async {
args: "run --quiet run/stdout_write_sync_async.ts",
output: "run/stdout_write_sync_async.out",
});
itest!(_001_hello {
args: "run --reload 001_hello.js",
output: "001_hello.js.out",

View file

@ -0,0 +1,200 @@
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello

View file

@ -0,0 +1,14 @@
const encoder = new TextEncoder();
const pending = [];
for (let i = 0; i < 100; i++) {
// some code that will cause stdout to be written
// synchronously while the async write might be occurring
console.log("Hello");
pending.push(Deno.stdout.write(encoder.encode("Hello\n")));
if (i % 10) {
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
await Promise.all(pending);

View file

@ -3,6 +3,7 @@
use deno_core::error::resource_unavailable;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::parking_lot::Mutex;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
@ -22,11 +23,13 @@ use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::process;
use tokio::sync::Semaphore;
#[cfg(unix)]
use std::os::unix::io::FromRawFd;
@ -124,27 +127,30 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
let t = &mut state.resource_table;
t.add(StdFileResource::stdio(
match stdio.stdin {
StdioPipe::Inherit => {
StdFileResourceInner::Stdin(STDIN_HANDLE.try_clone().unwrap())
}
StdioPipe::Inherit => StdFileResourceInner {
kind: StdFileResourceKind::Stdin,
file: STDIN_HANDLE.try_clone().unwrap(),
},
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stdin",
));
t.add(StdFileResource::stdio(
match stdio.stdout {
StdioPipe::Inherit => {
StdFileResourceInner::Stdout(STDOUT_HANDLE.try_clone().unwrap())
}
StdioPipe::Inherit => StdFileResourceInner {
kind: StdFileResourceKind::Stdout,
file: STDOUT_HANDLE.try_clone().unwrap(),
},
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stdout",
));
t.add(StdFileResource::stdio(
match stdio.stderr {
StdioPipe::Inherit => {
StdFileResourceInner::Stderr(STDERR_HANDLE.try_clone().unwrap())
}
StdioPipe::Inherit => StdFileResourceInner {
kind: StdFileResourceKind::Stderr,
file: STDERR_HANDLE.try_clone().unwrap(),
},
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stderr",
@ -308,29 +314,40 @@ impl Resource for ChildStderrResource {
}
}
enum StdFileResourceInner {
File(StdFile),
Stdin(StdFile),
#[derive(Clone, Copy)]
enum StdFileResourceKind {
File,
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
// because we get some Windows specific functionality for free by using Rust
// std's wrappers. So we take a bit of a complexity hit in order to not
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
Stdout(StdFile),
Stderr(StdFile),
Stdin,
Stdout,
Stderr,
}
struct StdFileResourceInner {
kind: StdFileResourceKind,
file: StdFile,
}
impl StdFileResourceInner {
pub fn file(fs_file: StdFile) -> Self {
StdFileResourceInner::File(fs_file)
StdFileResourceInner {
kind: StdFileResourceKind::File,
file: fs_file,
}
}
pub fn with_file<R>(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R {
match self {
Self::File(file)
| Self::Stdin(file)
| Self::Stdout(file)
| Self::Stderr(file) => f(file),
}
f(&mut self.file)
}
pub fn try_clone(&self) -> Result<Self, std::io::Error> {
Ok(Self {
kind: self.kind,
file: self.file.try_clone()?,
})
}
pub fn write_and_maybe_flush(
@ -343,19 +360,19 @@ impl StdFileResourceInner {
// using the raw fds/handles, it will cause encoding issues on Windows
// that we get solved for free by using Rust's stdio wrappers (see
// std/src/sys/windows/stdio.rs in Rust's source code).
match self {
Self::File(file) => Ok(file.write(buf)?),
Self::Stdin(_) => {
match self.kind {
StdFileResourceKind::File => Ok(self.file.write(buf)?),
StdFileResourceKind::Stdin => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
Self::Stdout(_) => {
StdFileResourceKind::Stdout => {
// bypass the file and use std::io::stdout()
let mut stdout = std::io::stdout().lock();
let nwritten = stdout.write(buf)?;
stdout.flush()?;
Ok(nwritten)
}
Self::Stderr(_) => {
StdFileResourceKind::Stderr => {
// bypass the file and use std::io::stderr()
let mut stderr = std::io::stderr().lock();
let nwritten = stderr.write(buf)?;
@ -371,19 +388,19 @@ impl StdFileResourceInner {
) -> Result<(), AnyError> {
// this method exists instead of using a `Write` implementation
// so that we can acquire the locks once and do both actions
match self {
Self::File(file) => Ok(file.write_all(buf)?),
Self::Stdin(_) => {
match self.kind {
StdFileResourceKind::File => Ok(self.file.write_all(buf)?),
StdFileResourceKind::Stdin => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
Self::Stdout(_) => {
StdFileResourceKind::Stdout => {
// bypass the file and use std::io::stdout()
let mut stdout = std::io::stdout().lock();
stdout.write_all(buf)?;
stdout.flush()?;
Ok(())
}
Self::Stderr(_) => {
StdFileResourceKind::Stderr => {
// bypass the file and use std::io::stderr()
let mut stderr = std::io::stderr().lock();
stderr.write_all(buf)?;
@ -396,32 +413,61 @@ impl StdFileResourceInner {
impl Read for StdFileResourceInner {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
Self::File(file) | Self::Stdin(file) => file.read(buf),
Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()),
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
self.file.read(buf)
}
StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
Err(ErrorKind::Unsupported.into())
}
}
}
}
struct StdFileResourceCellValue {
inner: StdFileResourceInner,
meta_data: Arc<Mutex<FileMetadata>>,
}
impl StdFileResourceCellValue {
pub fn try_clone(&self) -> Result<Self, std::io::Error> {
Ok(Self {
inner: self.inner.try_clone()?,
meta_data: self.meta_data.clone(),
})
}
}
pub struct StdFileResource {
name: String,
cell: AsyncRefCell<Option<(StdFileResourceInner, FileMetadata)>>,
// We can't use an AsyncRefCell here because we need to allow
// access to the resource synchronously at any time and
// asynchronously one at a time in order
cell: RefCell<Option<StdFileResourceCellValue>>,
// Used to keep async actions in order and only allow one
// to occurr at a time
cell_async_sempahore: Semaphore,
}
impl StdFileResource {
fn stdio(inner: StdFileResourceInner, name: &str) -> Self {
Self {
cell: AsyncRefCell::new(Some((inner, Default::default()))),
cell: RefCell::new(Some(StdFileResourceCellValue {
inner,
meta_data: Default::default(),
})),
cell_async_sempahore: Semaphore::new(1),
name: name.to_string(),
}
}
pub fn fs_file(fs_file: StdFile) -> Self {
Self {
cell: AsyncRefCell::new(Some((
StdFileResourceInner::file(fs_file),
Default::default(),
))),
cell: RefCell::new(Some(StdFileResourceCellValue {
inner: StdFileResourceInner::file(fs_file),
meta_data: Default::default(),
})),
cell_async_sempahore: Semaphore::new(1),
name: "fsFile".to_string(),
}
}
@ -430,17 +476,17 @@ impl StdFileResource {
self: Rc<Self>,
action: impl FnOnce(
&mut StdFileResourceInner,
&mut FileMetadata,
&Arc<Mutex<FileMetadata>>,
) -> Result<TResult, AnyError>,
) -> Result<TResult, AnyError> {
match RcRef::map(&self, |r| &r.cell).try_borrow_mut() {
Some(mut cell) => {
match self.cell.try_borrow_mut() {
Ok(mut cell) => {
let mut file = cell.take().unwrap();
let result = action(&mut file.0, &mut file.1);
let result = action(&mut file.inner, &file.meta_data);
cell.replace(file);
result
}
None => Err(resource_unavailable()),
Err(_) => Err(resource_unavailable()),
}
}
@ -451,17 +497,33 @@ impl StdFileResource {
where
F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static,
{
// we want to restrict this to one async action at a time
let _permit = self.cell_async_sempahore.acquire().await.unwrap();
// we take the value out of the cell, use it on a blocking task,
// then put it back into the cell when we're done
let mut cell = RcRef::map(&self, |r| &r.cell).borrow_mut().await;
let mut file = cell.take().unwrap();
let (file, result) = tokio::task::spawn_blocking(move || {
let result = action(&mut file.0);
(file, result)
let mut did_take = false;
let mut cell_value = {
let mut cell = self.cell.borrow_mut();
match cell.as_mut().unwrap().try_clone() {
Ok(value) => value,
Err(_) => {
did_take = true;
cell.take().unwrap()
}
}
};
let (cell_value, result) = tokio::task::spawn_blocking(move || {
let result = action(&mut cell_value.inner);
(cell_value, result)
})
.await
.unwrap();
cell.replace(file);
if did_take {
// put it back
self.cell.borrow_mut().replace(cell_value);
}
result
}
@ -515,7 +577,7 @@ impl StdFileResource {
f: F,
) -> Result<R, AnyError>
where
F: FnOnce(&mut StdFile, &mut FileMetadata) -> Result<R, AnyError>,
F: FnOnce(&mut StdFile, &Arc<Mutex<FileMetadata>>) -> Result<R, AnyError>,
{
Self::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(move |inner, metadata| {
@ -556,9 +618,9 @@ impl StdFileResource {
rid: u32,
) -> Result<std::process::Stdio, AnyError> {
Self::with_resource(state, rid, |resource| {
resource.with_inner_and_metadata(|inner, _| match inner {
StdFileResourceInner::File(file) => {
let file = file.try_clone()?;
resource.with_inner_and_metadata(|inner, _| match inner.kind {
StdFileResourceKind::File => {
let file = inner.file.try_clone()?;
Ok(file.into())
}
_ => Ok(std::process::Stdio::inherit()),

View file

@ -123,16 +123,18 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> {
rid,
move |std_file, meta_data| {
let raw_fd = std_file.as_raw_fd();
let maybe_tty_mode = &mut meta_data.tty.mode;
if is_raw {
if maybe_tty_mode.is_none() {
// Save original mode.
let original_mode = termios::tcgetattr(raw_fd)?;
maybe_tty_mode.replace(original_mode);
}
let mut raw = maybe_tty_mode.clone().unwrap();
let mut raw = {
let mut meta_data = meta_data.lock();
let maybe_tty_mode = &mut meta_data.tty.mode;
if maybe_tty_mode.is_none() {
// Save original mode.
let original_mode = termios::tcgetattr(raw_fd)?;
maybe_tty_mode.replace(original_mode);
}
maybe_tty_mode.clone().unwrap()
};
raw.input_flags &= !(termios::InputFlags::BRKINT
| termios::InputFlags::ICRNL
@ -155,7 +157,7 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> {
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
} else {
// Try restore saved mode.
if let Some(mode) = maybe_tty_mode.take() {
if let Some(mode) = meta_data.lock().tty.mode.take() {
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
}
}