mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 16:19:12 -05:00
fix(cli): allow using file resource synchronously while being used async (#15747)
This commit is contained in:
parent
b0a671df8e
commit
08a6af398f
5 changed files with 347 additions and 64 deletions
|
@ -18,6 +18,11 @@ itest!(stdin_read_all {
|
|||
input: Some("01234567890123456789012345678901234567890123456789"),
|
||||
});
|
||||
|
||||
itest!(stdout_write_sync_async {
|
||||
args: "run --quiet run/stdout_write_sync_async.ts",
|
||||
output: "run/stdout_write_sync_async.out",
|
||||
});
|
||||
|
||||
itest!(_001_hello {
|
||||
args: "run --reload 001_hello.js",
|
||||
output: "001_hello.js.out",
|
||||
|
|
200
cli/tests/testdata/run/stdout_write_sync_async.out
vendored
Normal file
200
cli/tests/testdata/run/stdout_write_sync_async.out
vendored
Normal file
|
@ -0,0 +1,200 @@
|
|||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
||||
Hello
|
14
cli/tests/testdata/run/stdout_write_sync_async.ts
vendored
Normal file
14
cli/tests/testdata/run/stdout_write_sync_async.ts
vendored
Normal file
|
@ -0,0 +1,14 @@
|
|||
const encoder = new TextEncoder();
|
||||
const pending = [];
|
||||
|
||||
for (let i = 0; i < 100; i++) {
|
||||
// some code that will cause stdout to be written
|
||||
// synchronously while the async write might be occurring
|
||||
console.log("Hello");
|
||||
pending.push(Deno.stdout.write(encoder.encode("Hello\n")));
|
||||
if (i % 10) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(pending);
|
|
@ -3,6 +3,7 @@
|
|||
use deno_core::error::resource_unavailable;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::op;
|
||||
use deno_core::parking_lot::Mutex;
|
||||
use deno_core::AsyncMutFuture;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::AsyncResult;
|
||||
|
@ -22,11 +23,13 @@ use std::io::ErrorKind;
|
|||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::process;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::FromRawFd;
|
||||
|
@ -124,27 +127,30 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
|
|||
let t = &mut state.resource_table;
|
||||
t.add(StdFileResource::stdio(
|
||||
match stdio.stdin {
|
||||
StdioPipe::Inherit => {
|
||||
StdFileResourceInner::Stdin(STDIN_HANDLE.try_clone().unwrap())
|
||||
}
|
||||
StdioPipe::Inherit => StdFileResourceInner {
|
||||
kind: StdFileResourceKind::Stdin,
|
||||
file: STDIN_HANDLE.try_clone().unwrap(),
|
||||
},
|
||||
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
|
||||
},
|
||||
"stdin",
|
||||
));
|
||||
t.add(StdFileResource::stdio(
|
||||
match stdio.stdout {
|
||||
StdioPipe::Inherit => {
|
||||
StdFileResourceInner::Stdout(STDOUT_HANDLE.try_clone().unwrap())
|
||||
}
|
||||
StdioPipe::Inherit => StdFileResourceInner {
|
||||
kind: StdFileResourceKind::Stdout,
|
||||
file: STDOUT_HANDLE.try_clone().unwrap(),
|
||||
},
|
||||
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
|
||||
},
|
||||
"stdout",
|
||||
));
|
||||
t.add(StdFileResource::stdio(
|
||||
match stdio.stderr {
|
||||
StdioPipe::Inherit => {
|
||||
StdFileResourceInner::Stderr(STDERR_HANDLE.try_clone().unwrap())
|
||||
}
|
||||
StdioPipe::Inherit => StdFileResourceInner {
|
||||
kind: StdFileResourceKind::Stderr,
|
||||
file: STDERR_HANDLE.try_clone().unwrap(),
|
||||
},
|
||||
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
|
||||
},
|
||||
"stderr",
|
||||
|
@ -308,29 +314,40 @@ impl Resource for ChildStderrResource {
|
|||
}
|
||||
}
|
||||
|
||||
enum StdFileResourceInner {
|
||||
File(StdFile),
|
||||
Stdin(StdFile),
|
||||
#[derive(Clone, Copy)]
|
||||
enum StdFileResourceKind {
|
||||
File,
|
||||
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
|
||||
// because we get some Windows specific functionality for free by using Rust
|
||||
// std's wrappers. So we take a bit of a complexity hit in order to not
|
||||
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
|
||||
Stdout(StdFile),
|
||||
Stderr(StdFile),
|
||||
Stdin,
|
||||
Stdout,
|
||||
Stderr,
|
||||
}
|
||||
|
||||
struct StdFileResourceInner {
|
||||
kind: StdFileResourceKind,
|
||||
file: StdFile,
|
||||
}
|
||||
|
||||
impl StdFileResourceInner {
|
||||
pub fn file(fs_file: StdFile) -> Self {
|
||||
StdFileResourceInner::File(fs_file)
|
||||
StdFileResourceInner {
|
||||
kind: StdFileResourceKind::File,
|
||||
file: fs_file,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_file<R>(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R {
|
||||
match self {
|
||||
Self::File(file)
|
||||
| Self::Stdin(file)
|
||||
| Self::Stdout(file)
|
||||
| Self::Stderr(file) => f(file),
|
||||
}
|
||||
f(&mut self.file)
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
kind: self.kind,
|
||||
file: self.file.try_clone()?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn write_and_maybe_flush(
|
||||
|
@ -343,19 +360,19 @@ impl StdFileResourceInner {
|
|||
// using the raw fds/handles, it will cause encoding issues on Windows
|
||||
// that we get solved for free by using Rust's stdio wrappers (see
|
||||
// std/src/sys/windows/stdio.rs in Rust's source code).
|
||||
match self {
|
||||
Self::File(file) => Ok(file.write(buf)?),
|
||||
Self::Stdin(_) => {
|
||||
match self.kind {
|
||||
StdFileResourceKind::File => Ok(self.file.write(buf)?),
|
||||
StdFileResourceKind::Stdin => {
|
||||
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
|
||||
}
|
||||
Self::Stdout(_) => {
|
||||
StdFileResourceKind::Stdout => {
|
||||
// bypass the file and use std::io::stdout()
|
||||
let mut stdout = std::io::stdout().lock();
|
||||
let nwritten = stdout.write(buf)?;
|
||||
stdout.flush()?;
|
||||
Ok(nwritten)
|
||||
}
|
||||
Self::Stderr(_) => {
|
||||
StdFileResourceKind::Stderr => {
|
||||
// bypass the file and use std::io::stderr()
|
||||
let mut stderr = std::io::stderr().lock();
|
||||
let nwritten = stderr.write(buf)?;
|
||||
|
@ -371,19 +388,19 @@ impl StdFileResourceInner {
|
|||
) -> Result<(), AnyError> {
|
||||
// this method exists instead of using a `Write` implementation
|
||||
// so that we can acquire the locks once and do both actions
|
||||
match self {
|
||||
Self::File(file) => Ok(file.write_all(buf)?),
|
||||
Self::Stdin(_) => {
|
||||
match self.kind {
|
||||
StdFileResourceKind::File => Ok(self.file.write_all(buf)?),
|
||||
StdFileResourceKind::Stdin => {
|
||||
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
|
||||
}
|
||||
Self::Stdout(_) => {
|
||||
StdFileResourceKind::Stdout => {
|
||||
// bypass the file and use std::io::stdout()
|
||||
let mut stdout = std::io::stdout().lock();
|
||||
stdout.write_all(buf)?;
|
||||
stdout.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
Self::Stderr(_) => {
|
||||
StdFileResourceKind::Stderr => {
|
||||
// bypass the file and use std::io::stderr()
|
||||
let mut stderr = std::io::stderr().lock();
|
||||
stderr.write_all(buf)?;
|
||||
|
@ -396,32 +413,61 @@ impl StdFileResourceInner {
|
|||
|
||||
impl Read for StdFileResourceInner {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
match self {
|
||||
Self::File(file) | Self::Stdin(file) => file.read(buf),
|
||||
Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()),
|
||||
match self.kind {
|
||||
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
|
||||
self.file.read(buf)
|
||||
}
|
||||
StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
|
||||
Err(ErrorKind::Unsupported.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct StdFileResourceCellValue {
|
||||
inner: StdFileResourceInner,
|
||||
meta_data: Arc<Mutex<FileMetadata>>,
|
||||
}
|
||||
|
||||
impl StdFileResourceCellValue {
|
||||
pub fn try_clone(&self) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
inner: self.inner.try_clone()?,
|
||||
meta_data: self.meta_data.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StdFileResource {
|
||||
name: String,
|
||||
cell: AsyncRefCell<Option<(StdFileResourceInner, FileMetadata)>>,
|
||||
// We can't use an AsyncRefCell here because we need to allow
|
||||
// access to the resource synchronously at any time and
|
||||
// asynchronously one at a time in order
|
||||
cell: RefCell<Option<StdFileResourceCellValue>>,
|
||||
// Used to keep async actions in order and only allow one
|
||||
// to occurr at a time
|
||||
cell_async_sempahore: Semaphore,
|
||||
}
|
||||
|
||||
impl StdFileResource {
|
||||
fn stdio(inner: StdFileResourceInner, name: &str) -> Self {
|
||||
Self {
|
||||
cell: AsyncRefCell::new(Some((inner, Default::default()))),
|
||||
cell: RefCell::new(Some(StdFileResourceCellValue {
|
||||
inner,
|
||||
meta_data: Default::default(),
|
||||
})),
|
||||
cell_async_sempahore: Semaphore::new(1),
|
||||
name: name.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fs_file(fs_file: StdFile) -> Self {
|
||||
Self {
|
||||
cell: AsyncRefCell::new(Some((
|
||||
StdFileResourceInner::file(fs_file),
|
||||
Default::default(),
|
||||
))),
|
||||
cell: RefCell::new(Some(StdFileResourceCellValue {
|
||||
inner: StdFileResourceInner::file(fs_file),
|
||||
meta_data: Default::default(),
|
||||
})),
|
||||
cell_async_sempahore: Semaphore::new(1),
|
||||
name: "fsFile".to_string(),
|
||||
}
|
||||
}
|
||||
|
@ -430,17 +476,17 @@ impl StdFileResource {
|
|||
self: Rc<Self>,
|
||||
action: impl FnOnce(
|
||||
&mut StdFileResourceInner,
|
||||
&mut FileMetadata,
|
||||
&Arc<Mutex<FileMetadata>>,
|
||||
) -> Result<TResult, AnyError>,
|
||||
) -> Result<TResult, AnyError> {
|
||||
match RcRef::map(&self, |r| &r.cell).try_borrow_mut() {
|
||||
Some(mut cell) => {
|
||||
match self.cell.try_borrow_mut() {
|
||||
Ok(mut cell) => {
|
||||
let mut file = cell.take().unwrap();
|
||||
let result = action(&mut file.0, &mut file.1);
|
||||
let result = action(&mut file.inner, &file.meta_data);
|
||||
cell.replace(file);
|
||||
result
|
||||
}
|
||||
None => Err(resource_unavailable()),
|
||||
Err(_) => Err(resource_unavailable()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -451,17 +497,33 @@ impl StdFileResource {
|
|||
where
|
||||
F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static,
|
||||
{
|
||||
// we want to restrict this to one async action at a time
|
||||
let _permit = self.cell_async_sempahore.acquire().await.unwrap();
|
||||
// we take the value out of the cell, use it on a blocking task,
|
||||
// then put it back into the cell when we're done
|
||||
let mut cell = RcRef::map(&self, |r| &r.cell).borrow_mut().await;
|
||||
let mut file = cell.take().unwrap();
|
||||
let (file, result) = tokio::task::spawn_blocking(move || {
|
||||
let result = action(&mut file.0);
|
||||
(file, result)
|
||||
let mut did_take = false;
|
||||
let mut cell_value = {
|
||||
let mut cell = self.cell.borrow_mut();
|
||||
match cell.as_mut().unwrap().try_clone() {
|
||||
Ok(value) => value,
|
||||
Err(_) => {
|
||||
did_take = true;
|
||||
cell.take().unwrap()
|
||||
}
|
||||
}
|
||||
};
|
||||
let (cell_value, result) = tokio::task::spawn_blocking(move || {
|
||||
let result = action(&mut cell_value.inner);
|
||||
(cell_value, result)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
cell.replace(file);
|
||||
|
||||
if did_take {
|
||||
// put it back
|
||||
self.cell.borrow_mut().replace(cell_value);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
|
@ -515,7 +577,7 @@ impl StdFileResource {
|
|||
f: F,
|
||||
) -> Result<R, AnyError>
|
||||
where
|
||||
F: FnOnce(&mut StdFile, &mut FileMetadata) -> Result<R, AnyError>,
|
||||
F: FnOnce(&mut StdFile, &Arc<Mutex<FileMetadata>>) -> Result<R, AnyError>,
|
||||
{
|
||||
Self::with_resource(state, rid, move |resource| {
|
||||
resource.with_inner_and_metadata(move |inner, metadata| {
|
||||
|
@ -556,9 +618,9 @@ impl StdFileResource {
|
|||
rid: u32,
|
||||
) -> Result<std::process::Stdio, AnyError> {
|
||||
Self::with_resource(state, rid, |resource| {
|
||||
resource.with_inner_and_metadata(|inner, _| match inner {
|
||||
StdFileResourceInner::File(file) => {
|
||||
let file = file.try_clone()?;
|
||||
resource.with_inner_and_metadata(|inner, _| match inner.kind {
|
||||
StdFileResourceKind::File => {
|
||||
let file = inner.file.try_clone()?;
|
||||
Ok(file.into())
|
||||
}
|
||||
_ => Ok(std::process::Stdio::inherit()),
|
||||
|
|
|
@ -123,16 +123,18 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> {
|
|||
rid,
|
||||
move |std_file, meta_data| {
|
||||
let raw_fd = std_file.as_raw_fd();
|
||||
let maybe_tty_mode = &mut meta_data.tty.mode;
|
||||
|
||||
if is_raw {
|
||||
if maybe_tty_mode.is_none() {
|
||||
// Save original mode.
|
||||
let original_mode = termios::tcgetattr(raw_fd)?;
|
||||
maybe_tty_mode.replace(original_mode);
|
||||
}
|
||||
|
||||
let mut raw = maybe_tty_mode.clone().unwrap();
|
||||
let mut raw = {
|
||||
let mut meta_data = meta_data.lock();
|
||||
let maybe_tty_mode = &mut meta_data.tty.mode;
|
||||
if maybe_tty_mode.is_none() {
|
||||
// Save original mode.
|
||||
let original_mode = termios::tcgetattr(raw_fd)?;
|
||||
maybe_tty_mode.replace(original_mode);
|
||||
}
|
||||
maybe_tty_mode.clone().unwrap()
|
||||
};
|
||||
|
||||
raw.input_flags &= !(termios::InputFlags::BRKINT
|
||||
| termios::InputFlags::ICRNL
|
||||
|
@ -155,7 +157,7 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> {
|
|||
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
|
||||
} else {
|
||||
// Try restore saved mode.
|
||||
if let Some(mode) = maybe_tty_mode.take() {
|
||||
if let Some(mode) = meta_data.lock().tty.mode.take() {
|
||||
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue