1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-30 16:40:57 -05:00
denoland-deno/ext/io/winpipe.rs
Matt Mastracci d722de886b
fix(io): create_named_pipe parallelism (#22597)
Investigating https://github.com/denoland/deno/issues/22574

Unable to reproduce with a unit test, but assuming that it's a name
collision or create pipe/open pipe race, and adding some additional
diagnostics.
2024-02-26 21:21:14 -07:00

194 lines
6.7 KiB
Rust

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use rand::thread_rng;
use rand::RngCore;
use std::io;
use std::os::windows::io::RawHandle;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::time::Duration;
use winapi::shared::minwindef::DWORD;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::fileapi::CreateFileA;
use winapi::um::fileapi::OPEN_EXISTING;
use winapi::um::handleapi::CloseHandle;
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::minwinbase::SECURITY_ATTRIBUTES;
use winapi::um::winbase::CreateNamedPipeA;
use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
use winapi::um::winbase::PIPE_READMODE_BYTE;
use winapi::um::winbase::PIPE_TYPE_BYTE;
use winapi::um::winnt::GENERIC_READ;
use winapi::um::winnt::GENERIC_WRITE;
/// Create a pair of file descriptors for a named pipe with non-inheritable handles. We cannot use
/// the anonymous pipe from `os_pipe` because that does not support OVERLAPPED (aka async) I/O.
///
/// This is the same way that Rust and pretty much everyone else does it.
///
/// For more information, there is an interesting S.O. question that explains the history, as
/// well as offering a complex NTAPI solution if we decide to try to make these pipes truely
/// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe
pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
// Silently retry up to 10 times.
for _ in 0..10 {
if let Ok(res) = create_named_pipe_inner() {
return Ok(res);
}
}
create_named_pipe_inner()
}
fn create_named_pipe_inner() -> io::Result<(RawHandle, RawHandle)> {
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
// Create an extremely-likely-unique pipe name from randomness, identity and a serial counter.
let pipe_name = format!(
r#"\\.\pipe\deno_pipe_{:x}.{:x}.{:x}\0"#,
thread_rng().next_u64(),
std::process::id(),
NEXT_ID.fetch_add(1, Ordering::SeqCst),
);
// Create security attributes to make the pipe handles non-inheritable
let mut security_attributes = SECURITY_ATTRIBUTES {
nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as DWORD,
lpSecurityDescriptor: std::ptr::null_mut(),
bInheritHandle: 0,
};
// SAFETY: Create the pipe server with non-inheritable handle
let server_handle = unsafe {
CreateNamedPipeA(
pipe_name.as_ptr() as *const i8,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
// Read and write bytes, not messages
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
// The maximum number of instances that can be created for this pipe.
1,
// 4kB buffer sizes
4096,
4096,
// "The default time-out value, in milliseconds, if the WaitNamedPipe function specifies NMPWAIT_USE_DEFAULT_WAIT.
// Each instance of a named pipe must specify the same value. A value of zero will result in a default time-out of
// 50 milliseconds."
0,
&mut security_attributes,
)
};
if server_handle == INVALID_HANDLE_VALUE {
// This should not happen, so we would like to get some better diagnostics here.
// SAFETY: Printing last error for diagnostics
unsafe {
eprintln!("*** Unexpected server pipe failure: {:x}", GetLastError());
}
return Err(io::Error::last_os_error());
}
// The pipe might not be ready yet in rare cases, so we loop for a bit
for i in 0..10 {
// SAFETY: Create the pipe client with non-inheritable handle
let client_handle = unsafe {
CreateFileA(
pipe_name.as_ptr() as *const i8,
GENERIC_READ | GENERIC_WRITE,
0,
&mut security_attributes,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
std::ptr::null_mut(),
)
};
// There is a very rare case where the pipe is not ready to open. If we get `ERROR_PATH_NOT_FOUND`,
// we spin and try again in 1-10ms.
if client_handle == INVALID_HANDLE_VALUE {
// SAFETY: Getting last error for diagnostics
let error = unsafe { GetLastError() };
if error == winapi::shared::winerror::ERROR_FILE_NOT_FOUND
|| error == winapi::shared::winerror::ERROR_PATH_NOT_FOUND
{
// Exponential backoff, but don't sleep longer than 10ms
eprintln!("*** Unexpected client pipe not found failure: {:x}", error);
std::thread::sleep(Duration::from_millis(10.min(2_u64.pow(i) + 1)));
continue;
}
// This should not happen, so we would like to get some better diagnostics here.
eprintln!("*** Unexpected client pipe failure: {:x}", error);
let err = io::Error::last_os_error();
// SAFETY: Close the handles if we failed
unsafe {
CloseHandle(server_handle);
}
return Err(err);
}
return Ok((server_handle, client_handle));
}
// We failed to open the pipe despite sleeping
Err(std::io::ErrorKind::NotFound.into())
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use std::io::Read;
use std::io::Write;
use std::os::windows::io::FromRawHandle;
use std::sync::Arc;
use std::sync::Barrier;
#[test]
fn make_named_pipe() {
let (server, client) = create_named_pipe().unwrap();
// SAFETY: For testing
let mut server = unsafe { File::from_raw_handle(server) };
// SAFETY: For testing
let mut client = unsafe { File::from_raw_handle(client) };
// Write to the server and read from the client
server.write_all(b"hello").unwrap();
let mut buf: [u8; 5] = Default::default();
client.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
}
#[test]
fn make_many_named_pipes_serial() {
let mut handles = vec![];
for _ in 0..100 {
let (server, client) = create_named_pipe().unwrap();
// SAFETY: For testing
let server = unsafe { File::from_raw_handle(server) };
// SAFETY: For testing
let client = unsafe { File::from_raw_handle(client) };
handles.push((server, client))
}
}
#[test]
fn make_many_named_pipes_parallel() {
let mut handles = vec![];
let barrier = Arc::new(Barrier::new(50));
for _ in 0..50 {
let barrier = barrier.clone();
handles.push(std::thread::spawn(move || {
barrier.wait();
let (server, client) = create_named_pipe().unwrap();
// SAFETY: For testing
let server = unsafe { File::from_raw_handle(server) };
// SAFETY: For testing
let client = unsafe { File::from_raw_handle(client) };
std::thread::sleep(std::time::Duration::from_millis(100));
drop((server, client));
}));
}
for handle in handles.drain(..) {
handle.join().unwrap();
}
}
}