From d722de886b85093eeef08d1e9fd6f3193405762d Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 26 Feb 2024 21:21:14 -0700 Subject: [PATCH] fix(io): create_named_pipe parallelism (#22597) Investigating https://github.com/denoland/deno/issues/22574 Unable to reproduce with a unit test, but assuming that it's a name collision or create pipe/open pipe race, and adding some additional diagnostics. --- ext/io/Cargo.toml | 2 +- ext/io/winpipe.rs | 123 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 102 insertions(+), 23 deletions(-) diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml index 1b7f5126f7..2fa4d0d87d 100644 --- a/ext/io/Cargo.toml +++ b/ext/io/Cargo.toml @@ -25,5 +25,5 @@ tokio.workspace = true os_pipe.workspace = true [target.'cfg(windows)'.dependencies] -winapi = { workspace = true, features = ["winbase", "processenv"] } +winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] } rand.workspace = true diff --git a/ext/io/winpipe.rs b/ext/io/winpipe.rs index 01272300d1..1495cbed15 100644 --- a/ext/io/winpipe.rs +++ b/ext/io/winpipe.rs @@ -3,7 +3,11 @@ use rand::thread_rng; use rand::RngCore; use std::io; use std::os::windows::io::RawHandle; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; +use std::time::Duration; use winapi::shared::minwindef::DWORD; +use winapi::um::errhandlingapi::GetLastError; use winapi::um::fileapi::CreateFileA; use winapi::um::fileapi::OPEN_EXISTING; use winapi::um::handleapi::CloseHandle; @@ -15,7 +19,6 @@ use winapi::um::winbase::FILE_FLAG_OVERLAPPED; use winapi::um::winbase::PIPE_ACCESS_DUPLEX; use winapi::um::winbase::PIPE_READMODE_BYTE; use winapi::um::winbase::PIPE_TYPE_BYTE; -use winapi::um::winnt::FILE_ATTRIBUTE_NORMAL; use winapi::um::winnt::GENERIC_READ; use winapi::um::winnt::GENERIC_WRITE; @@ -28,10 +31,23 @@ use winapi::um::winnt::GENERIC_WRITE; /// well as offering a complex NTAPI solution if we decide to try to make these pipes truely /// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> { + // Silently retry up to 10 times. + for _ in 0..10 { + if let Ok(res) = create_named_pipe_inner() { + return Ok(res); + } + } + create_named_pipe_inner() +} + +fn create_named_pipe_inner() -> io::Result<(RawHandle, RawHandle)> { + static NEXT_ID: AtomicU32 = AtomicU32::new(0); + // Create an extremely-likely-unique pipe name from randomness, identity and a serial counter. let pipe_name = format!( - r#"\\.\pipe\deno_pipe_{:x}_{:x}\0"#, + r#"\\.\pipe\deno_pipe_{:x}.{:x}.{:x}\0"#, + thread_rng().next_u64(), std::process::id(), - thread_rng().next_u64() + NEXT_ID.fetch_add(1, Ordering::SeqCst), ); // Create security attributes to make the pipe handles non-inheritable @@ -62,32 +78,58 @@ pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> { }; if server_handle == INVALID_HANDLE_VALUE { + // This should not happen, so we would like to get some better diagnostics here. + // SAFETY: Printing last error for diagnostics + unsafe { + eprintln!("*** Unexpected server pipe failure: {:x}", GetLastError()); + } return Err(io::Error::last_os_error()); } - // SAFETY: Create the pipe client with non-inheritable handle - let client_handle = unsafe { - CreateFileA( - pipe_name.as_ptr() as *const i8, - GENERIC_READ | GENERIC_WRITE | FILE_FLAG_OVERLAPPED, - 0, - &mut security_attributes, - OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, - std::ptr::null_mut(), - ) - }; + // The pipe might not be ready yet in rare cases, so we loop for a bit + for i in 0..10 { + // SAFETY: Create the pipe client with non-inheritable handle + let client_handle = unsafe { + CreateFileA( + pipe_name.as_ptr() as *const i8, + GENERIC_READ | GENERIC_WRITE, + 0, + &mut security_attributes, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + std::ptr::null_mut(), + ) + }; - if client_handle == INVALID_HANDLE_VALUE { - let err = io::Error::last_os_error(); - // SAFETY: Close the handles if we failed - unsafe { - CloseHandle(server_handle); + // There is a very rare case where the pipe is not ready to open. If we get `ERROR_PATH_NOT_FOUND`, + // we spin and try again in 1-10ms. + if client_handle == INVALID_HANDLE_VALUE { + // SAFETY: Getting last error for diagnostics + let error = unsafe { GetLastError() }; + if error == winapi::shared::winerror::ERROR_FILE_NOT_FOUND + || error == winapi::shared::winerror::ERROR_PATH_NOT_FOUND + { + // Exponential backoff, but don't sleep longer than 10ms + eprintln!("*** Unexpected client pipe not found failure: {:x}", error); + std::thread::sleep(Duration::from_millis(10.min(2_u64.pow(i) + 1))); + continue; + } + + // This should not happen, so we would like to get some better diagnostics here. + eprintln!("*** Unexpected client pipe failure: {:x}", error); + let err = io::Error::last_os_error(); + // SAFETY: Close the handles if we failed + unsafe { + CloseHandle(server_handle); + } + return Err(err); } - return Err(err); + + return Ok((server_handle, client_handle)); } - Ok((server_handle, client_handle)) + // We failed to open the pipe despite sleeping + Err(std::io::ErrorKind::NotFound.into()) } #[cfg(test)] @@ -97,6 +139,8 @@ mod tests { use std::io::Read; use std::io::Write; use std::os::windows::io::FromRawHandle; + use std::sync::Arc; + use std::sync::Barrier; #[test] fn make_named_pipe() { @@ -112,4 +156,39 @@ mod tests { client.read_exact(&mut buf).unwrap(); assert_eq!(&buf, b"hello"); } + + #[test] + fn make_many_named_pipes_serial() { + let mut handles = vec![]; + for _ in 0..100 { + let (server, client) = create_named_pipe().unwrap(); + // SAFETY: For testing + let server = unsafe { File::from_raw_handle(server) }; + // SAFETY: For testing + let client = unsafe { File::from_raw_handle(client) }; + handles.push((server, client)) + } + } + + #[test] + fn make_many_named_pipes_parallel() { + let mut handles = vec![]; + let barrier = Arc::new(Barrier::new(50)); + for _ in 0..50 { + let barrier = barrier.clone(); + handles.push(std::thread::spawn(move || { + barrier.wait(); + let (server, client) = create_named_pipe().unwrap(); + // SAFETY: For testing + let server = unsafe { File::from_raw_handle(server) }; + // SAFETY: For testing + let client = unsafe { File::from_raw_handle(client) }; + std::thread::sleep(std::time::Duration::from_millis(100)); + drop((server, client)); + })); + } + for handle in handles.drain(..) { + handle.join().unwrap(); + } + } }