From 27579f6fcb51661524ded70145c7f2dd67000bc2 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Wed, 21 Feb 2024 18:00:57 -0700 Subject: [PATCH] chore(io): Add a cross-platform unidirectional pipe implementation (#22522) Currently useful for `deno test` and internal tests, but could potentially be exposed at a later time as a `Deno` API. --- Cargo.lock | 12 +- Cargo.toml | 4 +- ext/io/Cargo.toml | 4 + ext/io/lib.rs | 9 ++ ext/io/pipe.rs | 288 ++++++++++++++++++++++++++++++++++++++++++++++ ext/io/winpipe.rs | 115 ++++++++++++++++++ 6 files changed, 425 insertions(+), 7 deletions(-) create mode 100644 ext/io/pipe.rs create mode 100644 ext/io/winpipe.rs diff --git a/Cargo.lock b/Cargo.lock index 6a2eb08043..fa6866eaaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1497,6 +1497,8 @@ dependencies = [ "filetime", "fs3", "once_cell", + "os_pipe", + "rand", "tokio", "winapi", ] @@ -4253,12 +4255,12 @@ dependencies = [ [[package]] name = "os_pipe" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177" +checksum = "57119c3b893986491ec9aa85056780d3a0f3cf4da7cc09dd3650dbd6c6738fb9" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -6526,9 +6528,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 7f158a7763..b30f1bcdd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ monch = "=0.5.0" notify = "=5.0.0" num-bigint = { version = "0.4", features = ["rand"] } once_cell = "1.17.1" -os_pipe = "=1.1.4" +os_pipe = { version = "=1.1.5", features = ["io_safety"] } p224 = { version = "0.13.0", features = ["ecdh"] } p256 = { version = "0.13.2", features = ["ecdh"] } p384 = { version = "0.13.0", features = ["ecdh"] } @@ -165,7 +165,7 @@ tar = "=0.4.40" tempfile = "3.4.0" termcolor = "1.1.3" thiserror = "1.0.40" -tokio = { version = "1.28.1", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } tokio-metrics = { version = "0.3.0", features = ["rt"] } tokio-util = "0.7.4" tower-lsp = { version = "=0.20.0", features = ["proposed"] } diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml index d87ba701b1..5e664549aa 100644 --- a/ext/io/Cargo.toml +++ b/ext/io/Cargo.toml @@ -21,5 +21,9 @@ fs3.workspace = true once_cell.workspace = true tokio.workspace = true +[target.'cfg(not(windows))'.dependencies] +os_pipe.workspace = true + [target.'cfg(windows)'.dependencies] winapi = { workspace = true, features = ["winbase", "processenv"] } +rand.workspace = true diff --git a/ext/io/lib.rs b/ext/io/lib.rs index c85696f646..e0d649e0a4 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -50,6 +50,15 @@ use winapi::um::processenv::GetStdHandle; use winapi::um::winbase; pub mod fs; +mod pipe; +#[cfg(windows)] +mod winpipe; + +pub use pipe::pipe; +pub use pipe::AsyncPipeRead; +pub use pipe::AsyncPipeWrite; +pub use pipe::PipeRead; +pub use pipe::PipeWrite; // Store the stdio fd/handles in global statics in order to keep them // alive for the duration of the application since the last handle/fd diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs new file mode 100644 index 0000000000..0cad7b1f66 --- /dev/null +++ b/ext/io/pipe.rs @@ -0,0 +1,288 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use std::io; +use std::pin::Pin; + +// The synchronous read end of a unidirectional pipe. +pub struct PipeRead { + file: std::fs::File, +} + +// The asynchronous read end of a unidirectional pipe. +pub struct AsyncPipeRead { + #[cfg(windows)] + /// We use a `ChildStdout` here as it's a much better fit for a Windows named pipe on Windows. We + /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future + /// if those can be created from raw handles down the road. + read: tokio::process::ChildStdout, + #[cfg(not(windows))] + read: tokio::net::unix::pipe::Receiver, +} + +// The synchronous write end of a unidirectional pipe. +pub struct PipeWrite { + file: std::fs::File, +} + +// The asynchronous write end of a unidirectional pipe. +pub struct AsyncPipeWrite { + #[cfg(windows)] + /// We use a `ChildStdin` here as it's a much better fit for a Windows named pipe on Windows. We + /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future + /// if those can be created from raw handles down the road. + write: tokio::process::ChildStdin, + #[cfg(not(windows))] + write: tokio::net::unix::pipe::Sender, +} + +impl PipeRead { + #[cfg(windows)] + pub fn into_async(self) -> AsyncPipeRead { + let owned: std::os::windows::io::OwnedHandle = self.file.into(); + let stdout = std::process::ChildStdout::from(owned); + AsyncPipeRead { + read: tokio::process::ChildStdout::from_std(stdout).unwrap(), + } + } + #[cfg(not(windows))] + pub fn into_async(self) -> AsyncPipeRead { + AsyncPipeRead { + read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(), + } + } +} + +impl AsyncPipeRead { + #[cfg(windows)] + pub fn into_sync(self) -> PipeRead { + let owned = self.read.into_owned_handle().unwrap(); + PipeRead { file: owned.into() } + } + #[cfg(not(windows))] + pub fn into_sync(self) -> PipeRead { + let file = self.read.into_nonblocking_fd().unwrap().into(); + PipeRead { file } + } +} + +impl std::io::Read for PipeRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.file.read(buf) + } + + fn read_vectored( + &mut self, + bufs: &mut [io::IoSliceMut<'_>], + ) -> io::Result { + self.file.read_vectored(bufs) + } +} + +impl tokio::io::AsyncRead for AsyncPipeRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().read).poll_read(cx, buf) + } +} + +impl PipeWrite { + #[cfg(windows)] + pub fn into_async(self) -> AsyncPipeWrite { + let owned: std::os::windows::io::OwnedHandle = self.file.into(); + let stdin = std::process::ChildStdin::from(owned); + AsyncPipeWrite { + write: tokio::process::ChildStdin::from_std(stdin).unwrap(), + } + } + #[cfg(not(windows))] + pub fn into_async(self) -> AsyncPipeWrite { + AsyncPipeWrite { + write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(), + } + } +} + +impl AsyncPipeWrite { + #[cfg(windows)] + pub fn into_sync(self) -> PipeWrite { + let owned = self.write.into_owned_handle().unwrap(); + PipeWrite { file: owned.into() } + } + #[cfg(not(windows))] + pub fn into_sync(self) -> PipeWrite { + let file = self.write.into_nonblocking_fd().unwrap().into(); + PipeWrite { file } + } +} + +impl std::io::Write for PipeWrite { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.file.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.file.flush() + } + + fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { + self.file.write_vectored(bufs) + } +} + +impl tokio::io::AsyncWrite for AsyncPipeWrite { + #[inline(always)] + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().write).poll_write(cx, buf) + } + + #[inline(always)] + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().write).poll_flush(cx) + } + + #[inline(always)] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().write).poll_shutdown(cx) + } + + #[inline(always)] + fn is_write_vectored(&self) -> bool { + self.write.is_write_vectored() + } + + #[inline(always)] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().write).poll_write_vectored(cx, bufs) + } +} + +/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles, +/// but either side may be promoted to an async-capable reader/writer. +/// +/// On Windows, we use a named pipe because that's the only way to get reliable async I/O +/// support. On Unix platforms, we use the `os_pipe` library, which uses `pipe2` under the hood +/// (or `pipe` on OSX). +pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> { + pipe_impl() +} + +/// Creates a unidirectional pipe on top of a named pipe (which is technically bidirectional). +#[cfg(windows)] +pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> { + // SAFETY: We're careful with handles here + unsafe { + use std::os::windows::io::FromRawHandle; + use std::os::windows::io::OwnedHandle; + let (server, client) = crate::winpipe::create_named_pipe()?; + let read = std::fs::File::from(OwnedHandle::from_raw_handle(client)); + let write = std::fs::File::from(OwnedHandle::from_raw_handle(server)); + Ok((PipeRead { file: read }, PipeWrite { file: write })) + } +} + +/// Creates a unidirectional pipe for unix platforms. +#[cfg(not(windows))] +pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> { + use std::os::unix::io::OwnedFd; + let (read, write) = os_pipe::pipe()?; + let read = std::fs::File::from(Into::::into(read)); + let write = std::fs::File::from(Into::::into(write)); + Ok((PipeRead { file: read }, PipeWrite { file: write })) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::io::Read; + use std::io::Write; + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + #[test] + fn test_pipe() { + let (mut read, mut write) = pipe().unwrap(); + // Write to the server and read from the client + write.write_all(b"hello").unwrap(); + let mut buf: [u8; 5] = Default::default(); + read.read_exact(&mut buf).unwrap(); + assert_eq!(&buf, b"hello"); + } + + #[tokio::test] + async fn test_async_pipe() { + let (read, write) = pipe().unwrap(); + let mut read = read.into_async(); + let mut write = write.into_async(); + + write.write_all(b"hello").await.unwrap(); + let mut buf: [u8; 5] = Default::default(); + read.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"hello"); + } + + /// Test a round-trip through async mode and back. + #[tokio::test] + async fn test_pipe_transmute() { + let (mut read, mut write) = pipe().unwrap(); + + // Sync + write.write_all(b"hello").unwrap(); + let mut buf: [u8; 5] = Default::default(); + read.read_exact(&mut buf).unwrap(); + assert_eq!(&buf, b"hello"); + + let mut read = read.into_async(); + let mut write = write.into_async(); + + // Async + write.write_all(b"hello").await.unwrap(); + let mut buf: [u8; 5] = Default::default(); + read.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"hello"); + + let mut read = read.into_sync(); + let mut write = write.into_sync(); + + // Sync + write.write_all(b"hello").unwrap(); + let mut buf: [u8; 5] = Default::default(); + read.read_exact(&mut buf).unwrap(); + assert_eq!(&buf, b"hello"); + } + + #[tokio::test] + async fn test_async_pipe_is_nonblocking() { + let (read, write) = pipe().unwrap(); + let mut read = read.into_async(); + let mut write = write.into_async(); + + let a = tokio::spawn(async move { + let mut buf: [u8; 5] = Default::default(); + read.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"hello"); + }); + let b = tokio::spawn(async move { + write.write_all(b"hello").await.unwrap(); + }); + + a.await.unwrap(); + b.await.unwrap(); + } +} diff --git a/ext/io/winpipe.rs b/ext/io/winpipe.rs new file mode 100644 index 0000000000..01272300d1 --- /dev/null +++ b/ext/io/winpipe.rs @@ -0,0 +1,115 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use rand::thread_rng; +use rand::RngCore; +use std::io; +use std::os::windows::io::RawHandle; +use winapi::shared::minwindef::DWORD; +use winapi::um::fileapi::CreateFileA; +use winapi::um::fileapi::OPEN_EXISTING; +use winapi::um::handleapi::CloseHandle; +use winapi::um::handleapi::INVALID_HANDLE_VALUE; +use winapi::um::minwinbase::SECURITY_ATTRIBUTES; +use winapi::um::winbase::CreateNamedPipeA; +use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE; +use winapi::um::winbase::FILE_FLAG_OVERLAPPED; +use winapi::um::winbase::PIPE_ACCESS_DUPLEX; +use winapi::um::winbase::PIPE_READMODE_BYTE; +use winapi::um::winbase::PIPE_TYPE_BYTE; +use winapi::um::winnt::FILE_ATTRIBUTE_NORMAL; +use winapi::um::winnt::GENERIC_READ; +use winapi::um::winnt::GENERIC_WRITE; + +/// Create a pair of file descriptors for a named pipe with non-inheritable handles. We cannot use +/// the anonymous pipe from `os_pipe` because that does not support OVERLAPPED (aka async) I/O. +/// +/// This is the same way that Rust and pretty much everyone else does it. +/// +/// For more information, there is an interesting S.O. question that explains the history, as +/// well as offering a complex NTAPI solution if we decide to try to make these pipes truely +/// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe +pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> { + let pipe_name = format!( + r#"\\.\pipe\deno_pipe_{:x}_{:x}\0"#, + std::process::id(), + thread_rng().next_u64() + ); + + // Create security attributes to make the pipe handles non-inheritable + let mut security_attributes = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::() as DWORD, + lpSecurityDescriptor: std::ptr::null_mut(), + bInheritHandle: 0, + }; + + // SAFETY: Create the pipe server with non-inheritable handle + let server_handle = unsafe { + CreateNamedPipeA( + pipe_name.as_ptr() as *const i8, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, + // Read and write bytes, not messages + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + // The maximum number of instances that can be created for this pipe. + 1, + // 4kB buffer sizes + 4096, + 4096, + // "The default time-out value, in milliseconds, if the WaitNamedPipe function specifies NMPWAIT_USE_DEFAULT_WAIT. + // Each instance of a named pipe must specify the same value. A value of zero will result in a default time-out of + // 50 milliseconds." + 0, + &mut security_attributes, + ) + }; + + if server_handle == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error()); + } + + // SAFETY: Create the pipe client with non-inheritable handle + let client_handle = unsafe { + CreateFileA( + pipe_name.as_ptr() as *const i8, + GENERIC_READ | GENERIC_WRITE | FILE_FLAG_OVERLAPPED, + 0, + &mut security_attributes, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, + std::ptr::null_mut(), + ) + }; + + if client_handle == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + // SAFETY: Close the handles if we failed + unsafe { + CloseHandle(server_handle); + } + return Err(err); + } + + Ok((server_handle, client_handle)) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use std::io::Read; + use std::io::Write; + use std::os::windows::io::FromRawHandle; + + #[test] + fn make_named_pipe() { + let (server, client) = create_named_pipe().unwrap(); + // SAFETY: For testing + let mut server = unsafe { File::from_raw_handle(server) }; + // SAFETY: For testing + let mut client = unsafe { File::from_raw_handle(client) }; + + // Write to the server and read from the client + server.write_all(b"hello").unwrap(); + let mut buf: [u8; 5] = Default::default(); + client.read_exact(&mut buf).unwrap(); + assert_eq!(&buf, b"hello"); + } +}