1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

fix(node): child_process IPC on Windows (#21597)

This PR implements the child_process IPC pipe between parent and child.
The implementation uses Windows named pipes created by parent and passes
the inheritable file handle to the child.

I've also replace parts of the initial implementation which passed the
raw parent fd to JS with resource ids instead. This way no file handle
is exposed to the JS land (both parent and child).

`IpcJsonStreamResource` can stream upto 800MB/s of JSON data on Win 11
AMD Ryzen 7 16GB (without `memchr` vectorization)
This commit is contained in:
Divy Srivastava 2023-12-19 18:07:22 +05:30 committed by GitHub
parent aefa205f63
commit 55fac9f5ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 258 additions and 93 deletions

2
Cargo.lock generated
View file

@ -1543,6 +1543,7 @@ dependencies = [
"typenum",
"url",
"winapi",
"windows-sys 0.48.0",
"x25519-dalek",
"x509-parser",
]
@ -1635,6 +1636,7 @@ dependencies = [
"uuid",
"which",
"winapi",
"windows-sys 0.48.0",
"winres",
]

View file

@ -917,12 +917,12 @@ impl CliOptions {
.map(Some)
}
pub fn node_ipc_fd(&self) -> Option<i32> {
pub fn node_ipc_fd(&self) -> Option<i64> {
let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok();
if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variable.
std::env::remove_var("DENO_CHANNEL_FD");
node_channel_fd.parse::<i32>().ok()
node_channel_fd.parse::<i64>().ok()
} else {
None
}

View file

@ -124,7 +124,7 @@ struct SharedWorkerState {
maybe_inspector_server: Option<Arc<InspectorServer>>,
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
node_ipc: Option<i32>,
node_ipc: Option<i64>,
}
impl SharedWorkerState {
@ -404,7 +404,7 @@ impl CliMainWorkerFactory {
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
options: CliMainWorkerOptions,
node_ipc: Option<i32>,
node_ipc: Option<i64>,
) -> Self {
Self {
shared: Arc::new(SharedWorkerState {

View file

@ -74,3 +74,7 @@ url.workspace = true
winapi.workspace = true
x25519-dalek = "2.0.0"
x509-parser = "0.15.0"
[target.'cfg(windows)'.dependencies]
windows-sys.workspace = true
winapi = { workspace = true, features = ["consoleapi"] }

View file

@ -31,6 +31,7 @@ mod polyfill;
mod resolution;
pub use ops::ipc::ChildPipeFd;
pub use ops::ipc::IpcJsonStreamResource;
pub use ops::v8::VM_CONTEXT_INDEX;
pub use package_json::PackageJson;
pub use path::PathClean;
@ -307,7 +308,6 @@ deno_core::extension!(deno_node,
ops::require::op_require_break_on_next_statement,
ops::util::op_node_guess_handle_type,
ops::crypto::op_node_create_private_key,
ops::ipc::op_node_ipc_pipe,
ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write,
ops::ipc::op_node_ipc_read,

View file

@ -1,20 +1,17 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
#[cfg(unix)]
pub use unix::*;
pub use impl_::*;
#[cfg(windows)]
pub use windows::*;
pub struct ChildPipeFd(pub i64);
pub struct ChildPipeFd(pub i32);
#[cfg(unix)]
mod unix {
mod impl_ {
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::fd::FromRawFd;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
@ -35,18 +32,16 @@ mod unix {
use tokio::io::AsyncBufRead;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
#[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
#[cfg(unix)]
use tokio::net::unix::OwnedWriteHalf;
#[cfg(unix)]
use tokio::net::UnixStream;
#[op2(fast)]
#[smi]
pub fn op_node_ipc_pipe(
state: &mut OpState,
#[smi] fd: i32,
) -> Result<ResourceId, AnyError> {
Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?))
}
#[cfg(windows)]
type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
// Open IPC pipe from bootstrap options.
#[op2]
@ -97,9 +92,12 @@ mod unix {
Ok(msgs)
}
struct IpcJsonStreamResource {
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
#[cfg(unix)]
write_half: AsyncRefCell<OwnedWriteHalf>,
#[cfg(windows)]
write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>,
}
@ -109,14 +107,35 @@ mod unix {
}
}
#[cfg(unix)]
fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
// Safety: The fd is part of a pair of connected sockets create by child process
// implementation.
let unix_stream = UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
Ok(unix_stream.into_split())
}
#[cfg(windows)]
fn pipe(
handle: i64,
) -> Result<
(
tokio::io::ReadHalf<NamedPipeClient>,
tokio::io::WriteHalf<NamedPipeClient>,
),
io::Error,
> {
// Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
// fd handle map will be the same.
let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
Ok(tokio::io::split(pipe))
}
impl IpcJsonStreamResource {
fn new(stream: RawFd) -> Result<Self, std::io::Error> {
// Safety: The fd is part of a pair of connected sockets create by child process
// implementation.
let unix_stream = UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
let (read_half, write_half) = unix_stream.into_split();
pub fn new(stream: i64) -> Result<Self, std::io::Error> {
let (read_half, write_half) = pipe(stream as _)?;
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@ -124,8 +143,9 @@ mod unix {
})
}
#[cfg(unix)]
#[cfg(test)]
fn from_unix_stream(stream: UnixStream) -> Self {
fn from_stream(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
@ -134,6 +154,17 @@ mod unix {
}
}
#[cfg(windows)]
#[cfg(test)]
fn from_stream(pipe: NamedPipeClient) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
}
}
async fn write_msg(
self: Rc<Self>,
msg: serde_json::Value,
@ -172,11 +203,15 @@ mod unix {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
#[cfg(unix)]
pipe: BufReader<OwnedReadHalf>,
#[cfg(windows)]
pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
buffer: Vec<u8>,
}
impl IpcJsonStream {
#[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
@ -184,6 +219,14 @@ mod unix {
}
}
#[cfg(windows)]
fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
buffer: Vec::with_capacity(INITIAL_CAPACITY),
}
}
async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
let mut json = None;
let nread =
@ -252,7 +295,6 @@ mod unix {
std::task::Poll::Ready(t) => t?,
std::task::Poll::Pending => return std::task::Poll::Pending,
};
if let Some(i) = memchr(b'\n', available) {
if *read == 0 {
// Fast path: parse and put into the json slot directly.
@ -366,6 +408,35 @@ mod unix {
use deno_core::RcRef;
use std::rc::Rc;
#[cfg(unix)]
pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) {
let (a, b) = tokio::net::UnixStream::pair().unwrap();
/* Similar to how ops would use the resource */
let a = Rc::new(IpcJsonStreamResource::from_stream(a));
(a, b)
}
#[cfg(windows)]
pub async fn pair() -> (
Rc<IpcJsonStreamResource>,
tokio::net::windows::named_pipe::NamedPipeServer,
) {
use tokio::net::windows::named_pipe::ClientOptions;
use tokio::net::windows::named_pipe::ServerOptions;
let name =
format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>());
let server = ServerOptions::new().create(name.clone()).unwrap();
let client = ClientOptions::new().open(name).unwrap();
server.connect().await.unwrap();
/* Similar to how ops would use the resource */
let client = Rc::new(IpcJsonStreamResource::from_stream(client));
(client, server)
}
#[tokio::test]
async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> {
// A simple round trip benchmark for quick dev feedback.
@ -375,7 +446,7 @@ mod unix {
return Ok(());
}
let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
@ -389,8 +460,6 @@ mod unix {
Ok::<_, std::io::Error>(())
});
let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
let start = std::time::Instant::now();
let mut bytes = 0;
@ -416,21 +485,20 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> {
let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut buf = [0u8; 1024];
let n = fd2.read(&mut buf).await?;
assert_eq!(&buf[..n], b"\"hello\"\n");
const EXPECTED: &[u8] = b"\"hello\"\n";
let mut buf = [0u8; EXPECTED.len()];
let n = fd2.read_exact(&mut buf).await?;
assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"world\"\n").await?;
Ok::<_, std::io::Error>(())
});
/* Similar to how ops would use the resource */
let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
ipc.clone().write_msg(json!("hello")).await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
@ -444,19 +512,19 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut buf = [0u8; 1024];
let n = fd2.read(&mut buf).await?;
assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n");
const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n";
let mut buf = [0u8; EXPECTED.len()];
let n = fd2.read_exact(&mut buf).await?;
assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"foo\"\n\"bar\"\n").await?;
Ok::<_, std::io::Error>(())
});
let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
ipc.clone().write_msg(json!("hello")).await?;
ipc.clone().write_msg(json!("world")).await?;
@ -471,13 +539,12 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> {
let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?;
Ok::<_, std::io::Error>(())
});
let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let _err = ipc.read_msg().await.unwrap_err();
@ -499,30 +566,3 @@ mod unix {
}
}
}
#[cfg(windows)]
mod windows {
use deno_core::error::AnyError;
use deno_core::op2;
#[op2(fast)]
pub fn op_node_ipc_pipe() -> Result<(), AnyError> {
Err(deno_core::error::not_supported())
}
#[op2(fast)]
#[smi]
pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> {
Ok(-1)
}
#[op2(async)]
pub async fn op_node_ipc_write() -> Result<(), AnyError> {
Err(deno_core::error::not_supported())
}
#[op2(async)]
pub async fn op_node_ipc_read() -> Result<(), AnyError> {
Err(deno_core::error::not_supported())
}
}

View file

@ -45,7 +45,6 @@ import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
const core = globalThis.__bootstrap.core;
const ops = core.ops;
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
@ -1069,9 +1068,7 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs;
}
export function setupChannel(target, channel) {
const ipc = ops.op_node_ipc_pipe(channel);
export function setupChannel(target, ipc) {
async function readLoop() {
try {
while (true) {

View file

@ -122,6 +122,7 @@ which = "4.2.5"
fwdansi.workspace = true
winapi = { workspace = true, features = ["commapi", "knownfolders", "mswsock", "objbase", "psapi", "shlobj", "tlhelp32", "winbase", "winerror", "winuser", "winsock2"] }
ntapi = "0.4.0"
windows-sys.workspace = true
[target.'cfg(unix)'.dependencies]
nix.workspace = true

View file

@ -141,7 +141,6 @@ pub struct SpawnArgs {
uid: Option<u32>,
#[cfg(windows)]
windows_raw_arguments: bool,
#[cfg(unix)]
ipc: Option<i32>,
#[serde(flatten)]
@ -207,12 +206,7 @@ pub struct SpawnOutput {
stderr: Option<ToJsBuffer>,
}
type CreateCommand = (
std::process::Command,
// TODO(@littledivy): Ideally this would return Option<ResourceId> but we are dealing with file descriptors
// all the way until setupChannel which makes it easier to share code between parent and child fork.
Option<i32>,
);
type CreateCommand = (std::process::Command, Option<ResourceId>);
fn create_command(
state: &mut OpState,
@ -337,17 +331,144 @@ fn create_command(
});
/* One end returned to parent process (this) */
let pipe_fd = Some(fd1);
let pipe_rid = Some(
state
.resource_table
.add(deno_node::IpcJsonStreamResource::new(fd1 as _)?),
);
/* The other end passed to child process via DENO_CHANNEL_FD */
command.env("DENO_CHANNEL_FD", format!("{}", ipc));
return Ok((command, pipe_fd));
return Ok((command, pipe_rid));
}
Ok((command, None))
}
#[cfg(windows)]
// Safety: We setup a windows named pipe and pass one end to the child process.
unsafe {
use windows_sys::Win32::Foundation::CloseHandle;
use windows_sys::Win32::Foundation::DuplicateHandle;
use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS;
use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED;
use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED;
use windows_sys::Win32::Foundation::GENERIC_READ;
use windows_sys::Win32::Foundation::GENERIC_WRITE;
use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
use windows_sys::Win32::Storage::FileSystem::CreateFileW;
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE;
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING;
use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
use windows_sys::Win32::System::Pipes::ConnectNamedPipe;
use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE;
use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE;
use windows_sys::Win32::System::Threading::GetCurrentProcess;
use std::io;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
use std::ptr;
if let Some(ipc) = args.ipc {
if ipc < 0 {
return Ok((command, None));
}
let (path, hd1) = loop {
let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4());
let mut path = Path::new(&name)
.as_os_str()
.encode_wide()
.collect::<Vec<_>>();
path.push(0);
let hd1 = CreateNamedPipeW(
path.as_ptr(),
PIPE_ACCESS_DUPLEX
| FILE_FLAG_FIRST_PIPE_INSTANCE
| FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
1,
65536,
65536,
0,
std::ptr::null_mut(),
);
if hd1 == INVALID_HANDLE_VALUE {
let err = io::Error::last_os_error();
/* If the pipe name is already in use, try again. */
if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) {
continue;
}
return Err(err.into());
}
break (path, hd1);
};
/* Create child pipe handle. */
let s = SECURITY_ATTRIBUTES {
nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
lpSecurityDescriptor: ptr::null_mut(),
bInheritHandle: 1,
};
let mut hd2 = CreateFileW(
path.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
0,
&s,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
0,
);
if hd2 == INVALID_HANDLE_VALUE {
return Err(io::Error::last_os_error().into());
}
// Will not block because we have create the pair.
if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) {
CloseHandle(hd2);
return Err(err.into());
}
}
// Duplicating the handle to allow the child process to use it.
if DuplicateHandle(
GetCurrentProcess(),
hd2,
GetCurrentProcess(),
&mut hd2,
0,
1,
DUPLICATE_SAME_ACCESS,
) == 0
{
return Err(std::io::Error::last_os_error().into());
}
/* One end returned to parent process (this) */
let pipe_fd = Some(
state
.resource_table
.add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?),
);
/* The other end passed to child process via DENO_CHANNEL_FD */
command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64));
return Ok((command, pipe_fd));
}
}
#[cfg(not(unix))]
return Ok((command, None));
}
@ -360,13 +481,13 @@ struct Child {
stdin_rid: Option<ResourceId>,
stdout_rid: Option<ResourceId>,
stderr_rid: Option<ResourceId>,
pipe_fd: Option<i32>,
pipe_fd: Option<ResourceId>,
}
fn spawn_child(
state: &mut OpState,
command: std::process::Command,
pipe_fd: Option<i32>,
pipe_fd: Option<ResourceId>,
) -> Result<Child, AnyError> {
let mut command = tokio::process::Command::from(command);
// TODO(@crowlkats): allow detaching processes.
@ -459,8 +580,8 @@ fn op_spawn_child(
#[serde] args: SpawnArgs,
#[string] api_name: String,
) -> Result<Child, AnyError> {
let (command, pipe_fd) = create_command(state, args, &api_name)?;
spawn_child(state, command, pipe_fd)
let (command, pipe_rid) = create_command(state, args, &api_name)?;
spawn_child(state, command, pipe_rid)
}
#[op2(async)]

View file

@ -59,7 +59,7 @@ pub struct BootstrapOptions {
pub inspect: bool,
pub has_node_modules_dir: bool,
pub maybe_binary_npm_command_name: Option<String>,
pub node_ipc_fd: Option<i32>,
pub node_ipc_fd: Option<i64>,
}
impl Default for BootstrapOptions {