diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index 20661f27fd..118623760c 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -18,6 +18,11 @@ itest!(stdin_read_all { input: Some("01234567890123456789012345678901234567890123456789"), }); +itest!(stdout_write_sync_async { + args: "run --quiet run/stdout_write_sync_async.ts", + output: "run/stdout_write_sync_async.out", +}); + itest!(_001_hello { args: "run --reload 001_hello.js", output: "001_hello.js.out", diff --git a/cli/tests/testdata/run/stdout_write_sync_async.out b/cli/tests/testdata/run/stdout_write_sync_async.out new file mode 100644 index 0000000000..91ebda1cae --- /dev/null +++ b/cli/tests/testdata/run/stdout_write_sync_async.out @@ -0,0 +1,200 @@ +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello +Hello diff --git a/cli/tests/testdata/run/stdout_write_sync_async.ts b/cli/tests/testdata/run/stdout_write_sync_async.ts new file mode 100644 index 0000000000..648999d8a2 --- /dev/null +++ b/cli/tests/testdata/run/stdout_write_sync_async.ts @@ -0,0 +1,14 @@ +const encoder = new TextEncoder(); +const pending = []; + +for (let i = 0; i < 100; i++) { + // some code that will cause stdout to be written + // synchronously while the async write might be occurring + console.log("Hello"); + pending.push(Deno.stdout.write(encoder.encode("Hello\n"))); + if (i % 10) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } +} + +await Promise.all(pending); diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 900be10346..0eeab4c244 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -3,6 +3,7 @@ use deno_core::error::resource_unavailable; use deno_core::error::AnyError; use deno_core::op; +use deno_core::parking_lot::Mutex; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -22,11 +23,13 @@ use std::io::ErrorKind; use std::io::Read; use std::io::Write; use std::rc::Rc; +use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::process; +use tokio::sync::Semaphore; #[cfg(unix)] use std::os::unix::io::FromRawFd; @@ -124,27 +127,30 @@ pub fn init_stdio(stdio: Stdio) -> Extension { let t = &mut state.resource_table; t.add(StdFileResource::stdio( match stdio.stdin { - StdioPipe::Inherit => { - StdFileResourceInner::Stdin(STDIN_HANDLE.try_clone().unwrap()) - } + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stdin, + file: STDIN_HANDLE.try_clone().unwrap(), + }, StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stdin", )); t.add(StdFileResource::stdio( match stdio.stdout { - StdioPipe::Inherit => { - StdFileResourceInner::Stdout(STDOUT_HANDLE.try_clone().unwrap()) - } + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stdout, + file: STDOUT_HANDLE.try_clone().unwrap(), + }, StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stdout", )); t.add(StdFileResource::stdio( match stdio.stderr { - StdioPipe::Inherit => { - StdFileResourceInner::Stderr(STDERR_HANDLE.try_clone().unwrap()) - } + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stderr, + file: STDERR_HANDLE.try_clone().unwrap(), + }, StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stderr", @@ -308,29 +314,40 @@ impl Resource for ChildStderrResource { } } -enum StdFileResourceInner { - File(StdFile), - Stdin(StdFile), +#[derive(Clone, Copy)] +enum StdFileResourceKind { + File, // For stdout and stderr, we sometimes instead use std::io::stdout() directly, // because we get some Windows specific functionality for free by using Rust // std's wrappers. So we take a bit of a complexity hit in order to not // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs - Stdout(StdFile), - Stderr(StdFile), + Stdin, + Stdout, + Stderr, +} + +struct StdFileResourceInner { + kind: StdFileResourceKind, + file: StdFile, } impl StdFileResourceInner { pub fn file(fs_file: StdFile) -> Self { - StdFileResourceInner::File(fs_file) + StdFileResourceInner { + kind: StdFileResourceKind::File, + file: fs_file, + } } pub fn with_file(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R { - match self { - Self::File(file) - | Self::Stdin(file) - | Self::Stdout(file) - | Self::Stderr(file) => f(file), - } + f(&mut self.file) + } + + pub fn try_clone(&self) -> Result { + Ok(Self { + kind: self.kind, + file: self.file.try_clone()?, + }) } pub fn write_and_maybe_flush( @@ -343,19 +360,19 @@ impl StdFileResourceInner { // using the raw fds/handles, it will cause encoding issues on Windows // that we get solved for free by using Rust's stdio wrappers (see // std/src/sys/windows/stdio.rs in Rust's source code). - match self { - Self::File(file) => Ok(file.write(buf)?), - Self::Stdin(_) => { + match self.kind { + StdFileResourceKind::File => Ok(self.file.write(buf)?), + StdFileResourceKind::Stdin => { Err(Into::::into(ErrorKind::Unsupported).into()) } - Self::Stdout(_) => { + StdFileResourceKind::Stdout => { // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); let nwritten = stdout.write(buf)?; stdout.flush()?; Ok(nwritten) } - Self::Stderr(_) => { + StdFileResourceKind::Stderr => { // bypass the file and use std::io::stderr() let mut stderr = std::io::stderr().lock(); let nwritten = stderr.write(buf)?; @@ -371,19 +388,19 @@ impl StdFileResourceInner { ) -> Result<(), AnyError> { // this method exists instead of using a `Write` implementation // so that we can acquire the locks once and do both actions - match self { - Self::File(file) => Ok(file.write_all(buf)?), - Self::Stdin(_) => { + match self.kind { + StdFileResourceKind::File => Ok(self.file.write_all(buf)?), + StdFileResourceKind::Stdin => { Err(Into::::into(ErrorKind::Unsupported).into()) } - Self::Stdout(_) => { + StdFileResourceKind::Stdout => { // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); stdout.write_all(buf)?; stdout.flush()?; Ok(()) } - Self::Stderr(_) => { + StdFileResourceKind::Stderr => { // bypass the file and use std::io::stderr() let mut stderr = std::io::stderr().lock(); stderr.write_all(buf)?; @@ -396,32 +413,61 @@ impl StdFileResourceInner { impl Read for StdFileResourceInner { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self { - Self::File(file) | Self::Stdin(file) => file.read(buf), - Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()), + match self.kind { + StdFileResourceKind::File | StdFileResourceKind::Stdin => { + self.file.read(buf) + } + StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => { + Err(ErrorKind::Unsupported.into()) + } } } } +struct StdFileResourceCellValue { + inner: StdFileResourceInner, + meta_data: Arc>, +} + +impl StdFileResourceCellValue { + pub fn try_clone(&self) -> Result { + Ok(Self { + inner: self.inner.try_clone()?, + meta_data: self.meta_data.clone(), + }) + } +} + pub struct StdFileResource { name: String, - cell: AsyncRefCell>, + // We can't use an AsyncRefCell here because we need to allow + // access to the resource synchronously at any time and + // asynchronously one at a time in order + cell: RefCell>, + // Used to keep async actions in order and only allow one + // to occurr at a time + cell_async_sempahore: Semaphore, } impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { - cell: AsyncRefCell::new(Some((inner, Default::default()))), + cell: RefCell::new(Some(StdFileResourceCellValue { + inner, + meta_data: Default::default(), + })), + cell_async_sempahore: Semaphore::new(1), name: name.to_string(), } } pub fn fs_file(fs_file: StdFile) -> Self { Self { - cell: AsyncRefCell::new(Some(( - StdFileResourceInner::file(fs_file), - Default::default(), - ))), + cell: RefCell::new(Some(StdFileResourceCellValue { + inner: StdFileResourceInner::file(fs_file), + meta_data: Default::default(), + })), + cell_async_sempahore: Semaphore::new(1), name: "fsFile".to_string(), } } @@ -430,17 +476,17 @@ impl StdFileResource { self: Rc, action: impl FnOnce( &mut StdFileResourceInner, - &mut FileMetadata, + &Arc>, ) -> Result, ) -> Result { - match RcRef::map(&self, |r| &r.cell).try_borrow_mut() { - Some(mut cell) => { + match self.cell.try_borrow_mut() { + Ok(mut cell) => { let mut file = cell.take().unwrap(); - let result = action(&mut file.0, &mut file.1); + let result = action(&mut file.inner, &file.meta_data); cell.replace(file); result } - None => Err(resource_unavailable()), + Err(_) => Err(resource_unavailable()), } } @@ -451,17 +497,33 @@ impl StdFileResource { where F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, { + // we want to restrict this to one async action at a time + let _permit = self.cell_async_sempahore.acquire().await.unwrap(); // we take the value out of the cell, use it on a blocking task, // then put it back into the cell when we're done - let mut cell = RcRef::map(&self, |r| &r.cell).borrow_mut().await; - let mut file = cell.take().unwrap(); - let (file, result) = tokio::task::spawn_blocking(move || { - let result = action(&mut file.0); - (file, result) + let mut did_take = false; + let mut cell_value = { + let mut cell = self.cell.borrow_mut(); + match cell.as_mut().unwrap().try_clone() { + Ok(value) => value, + Err(_) => { + did_take = true; + cell.take().unwrap() + } + } + }; + let (cell_value, result) = tokio::task::spawn_blocking(move || { + let result = action(&mut cell_value.inner); + (cell_value, result) }) .await .unwrap(); - cell.replace(file); + + if did_take { + // put it back + self.cell.borrow_mut().replace(cell_value); + } + result } @@ -515,7 +577,7 @@ impl StdFileResource { f: F, ) -> Result where - F: FnOnce(&mut StdFile, &mut FileMetadata) -> Result, + F: FnOnce(&mut StdFile, &Arc>) -> Result, { Self::with_resource(state, rid, move |resource| { resource.with_inner_and_metadata(move |inner, metadata| { @@ -556,9 +618,9 @@ impl StdFileResource { rid: u32, ) -> Result { Self::with_resource(state, rid, |resource| { - resource.with_inner_and_metadata(|inner, _| match inner { - StdFileResourceInner::File(file) => { - let file = file.try_clone()?; + resource.with_inner_and_metadata(|inner, _| match inner.kind { + StdFileResourceKind::File => { + let file = inner.file.try_clone()?; Ok(file.into()) } _ => Ok(std::process::Stdio::inherit()), diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index 2018f954db..2e6d1ea7c7 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -123,16 +123,18 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> { rid, move |std_file, meta_data| { let raw_fd = std_file.as_raw_fd(); - let maybe_tty_mode = &mut meta_data.tty.mode; if is_raw { - if maybe_tty_mode.is_none() { - // Save original mode. - let original_mode = termios::tcgetattr(raw_fd)?; - maybe_tty_mode.replace(original_mode); - } - - let mut raw = maybe_tty_mode.clone().unwrap(); + let mut raw = { + let mut meta_data = meta_data.lock(); + let maybe_tty_mode = &mut meta_data.tty.mode; + if maybe_tty_mode.is_none() { + // Save original mode. + let original_mode = termios::tcgetattr(raw_fd)?; + maybe_tty_mode.replace(original_mode); + } + maybe_tty_mode.clone().unwrap() + }; raw.input_flags &= !(termios::InputFlags::BRKINT | termios::InputFlags::ICRNL @@ -155,7 +157,7 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> { termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; } else { // Try restore saved mode. - if let Some(mode) = maybe_tty_mode.take() { + if let Some(mode) = meta_data.lock().tty.mode.take() { termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; } }