1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-08 07:08:27 -05:00
denoland-deno/ext/node/ops/ipc.rs
Divy Srivastava 55fac9f5ea
fix(node): child_process IPC on Windows (#21597)
This PR implements the child_process IPC pipe between parent and child.
The implementation uses Windows named pipes created by parent and passes
the inheritable file handle to the child.

I've also replace parts of the initial implementation which passed the
raw parent fd to JS with resource ids instead. This way no file handle
is exposed to the JS land (both parent and child).

`IpcJsonStreamResource` can stream upto 800MB/s of JSON data on Win 11
AMD Ryzen 7 16GB (without `memchr` vectorization)
2023-12-19 13:37:22 +01:00

568 lines
16 KiB
Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub use impl_::*;
pub struct ChildPipeFd(pub i64);
mod impl_ {
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::fd::FromRawFd;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::serde_json;
use deno_core::AsyncRefCell;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
use pin_project_lite::pin_project;
use tokio::io::AsyncBufRead;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
#[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
#[cfg(unix)]
use tokio::net::unix::OwnedWriteHalf;
#[cfg(unix)]
use tokio::net::UnixStream;
#[cfg(windows)]
type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
// Open IPC pipe from bootstrap options.
#[op2]
#[smi]
pub fn op_node_child_ipc_pipe(
state: &mut OpState,
) -> Result<Option<ResourceId>, AnyError> {
let fd = match state.try_borrow_mut::<crate::ChildPipeFd>() {
Some(child_pipe_fd) => child_pipe_fd.0,
None => return Ok(None),
};
Ok(Some(
state.resource_table.add(IpcJsonStreamResource::new(fd)?),
))
}
#[op2(async)]
pub async fn op_node_ipc_write(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[serde] value: serde_json::Value,
) -> Result<(), AnyError> {
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStreamResource>(rid)
.map_err(|_| bad_resource_id())?;
stream.write_msg(value).await?;
Ok(())
}
#[op2(async)]
#[serde]
pub async fn op_node_ipc_read(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<serde_json::Value, AnyError> {
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStreamResource>(rid)
.map_err(|_| bad_resource_id())?;
let cancel = stream.cancel.clone();
let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
let msgs = stream.read_msg().or_cancel(cancel).await??;
Ok(msgs)
}
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
#[cfg(unix)]
write_half: AsyncRefCell<OwnedWriteHalf>,
#[cfg(windows)]
write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>,
}
impl deno_core::Resource for IpcJsonStreamResource {
fn close(self: Rc<Self>) {
self.cancel.cancel();
}
}
#[cfg(unix)]
fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
// Safety: The fd is part of a pair of connected sockets create by child process
// implementation.
let unix_stream = UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
Ok(unix_stream.into_split())
}
#[cfg(windows)]
fn pipe(
handle: i64,
) -> Result<
(
tokio::io::ReadHalf<NamedPipeClient>,
tokio::io::WriteHalf<NamedPipeClient>,
),
io::Error,
> {
// Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
// fd handle map will be the same.
let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
Ok(tokio::io::split(pipe))
}
impl IpcJsonStreamResource {
pub fn new(stream: i64) -> Result<Self, std::io::Error> {
let (read_half, write_half) = pipe(stream as _)?;
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
})
}
#[cfg(unix)]
#[cfg(test)]
fn from_stream(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
}
}
#[cfg(windows)]
#[cfg(test)]
fn from_stream(pipe: NamedPipeClient) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
}
}
async fn write_msg(
self: Rc<Self>,
msg: serde_json::Value,
) -> Result<(), AnyError> {
let mut write_half =
RcRef::map(self, |r| &r.write_half).borrow_mut().await;
// Perf note: We do not benefit from writev here because
// we are always allocating a buffer for serialization anyways.
let mut buf = Vec::new();
serde_json::to_writer(&mut buf, &msg)?;
buf.push(b'\n');
write_half.write_all(&buf).await?;
Ok(())
}
}
#[inline]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
// Safety: haystack of valid length. neon_memchr can handle unaligned
// data.
return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) };
#[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
return haystack.iter().position(|&b| b == needle);
}
// Initial capacity of the buffered reader and the JSON backing buffer.
//
// This is a tradeoff between memory usage and performance on large messages.
//
// 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message.
const INITIAL_CAPACITY: usize = 1024 * 64;
// JSON serialization stream over IPC pipe.
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
#[cfg(unix)]
pipe: BufReader<OwnedReadHalf>,
#[cfg(windows)]
pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
buffer: Vec<u8>,
}
impl IpcJsonStream {
#[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
buffer: Vec::with_capacity(INITIAL_CAPACITY),
}
}
#[cfg(windows)]
fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
buffer: Vec::with_capacity(INITIAL_CAPACITY),
}
}
async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
let mut json = None;
let nread =
read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?;
if nread == 0 {
// EOF.
return Ok(serde_json::Value::Null);
}
let json = match json {
Some(v) => v,
None => {
// Took more than a single read and some buffering.
simd_json::from_slice(&mut self.buffer[..nread])?
}
};
// Safety: Same as `Vec::clear` but without the `drop_in_place` for
// each element (nop for u8). Capacity remains the same.
unsafe {
self.buffer.set_len(0);
}
Ok(json)
}
}
pin_project! {
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct ReadMsgInner<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
json: &'a mut Option<serde_json::Value>,
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
read: usize,
}
}
fn read_msg_inner<'a, R>(
reader: &'a mut R,
buf: &'a mut Vec<u8>,
json: &'a mut Option<serde_json::Value>,
) -> ReadMsgInner<'a, R>
where
R: AsyncBufRead + ?Sized + Unpin,
{
ReadMsgInner {
reader,
buf,
json,
read: 0,
}
}
fn read_msg_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
json: &mut Option<serde_json::Value>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
loop {
let (done, used) = {
let available = match reader.as_mut().poll_fill_buf(cx) {
std::task::Poll::Ready(t) => t?,
std::task::Poll::Pending => return std::task::Poll::Pending,
};
if let Some(i) = memchr(b'\n', available) {
if *read == 0 {
// Fast path: parse and put into the json slot directly.
//
// Safety: It is ok to overwrite the contents because
// we don't need to copy it into the buffer and the length will be reset.
let available = unsafe {
std::slice::from_raw_parts_mut(
available.as_ptr() as *mut u8,
available.len(),
)
};
json.replace(
simd_json::from_slice(&mut available[..i + 1])
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
);
} else {
// This is not the first read, so we have to copy the data
// to make it contiguous.
buf.extend_from_slice(&available[..=i]);
}
(true, i + 1)
} else {
buf.extend_from_slice(available);
(false, available.len())
}
};
reader.as_mut().consume(used);
*read += used;
if done || used == 0 {
return Poll::Ready(Ok(mem::replace(read, 0)));
}
}
}
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read)
}
}
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
mod neon {
use std::arch::aarch64::*;
pub unsafe fn neon_memchr(
str: &[u8],
c: u8,
length: usize,
) -> Option<usize> {
let end = str.as_ptr().wrapping_add(length);
// Alignment handling
let mut ptr = str.as_ptr();
while ptr < end && (ptr as usize) & 0xF != 0 {
if *ptr == c {
return Some(ptr as usize - str.as_ptr() as usize);
}
ptr = ptr.wrapping_add(1);
}
let search_char = vdupq_n_u8(c);
while ptr.wrapping_add(16) <= end {
let chunk = vld1q_u8(ptr);
let comparison = vceqq_u8(chunk, search_char);
// Check first 64 bits
let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0);
if result0 != 0 {
return Some(
(ptr as usize - str.as_ptr() as usize)
+ result0.trailing_zeros() as usize / 8,
);
}
// Check second 64 bits
let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1);
if result1 != 0 {
return Some(
(ptr as usize - str.as_ptr() as usize)
+ 8
+ result1.trailing_zeros() as usize / 8,
);
}
ptr = ptr.wrapping_add(16);
}
// Handle remaining unaligned characters
while ptr < end {
if *ptr == c {
return Some(ptr as usize - str.as_ptr() as usize);
}
ptr = ptr.wrapping_add(1);
}
None
}
}
#[cfg(test)]
mod tests {
use super::IpcJsonStreamResource;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::RcRef;
use std::rc::Rc;
#[cfg(unix)]
pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) {
let (a, b) = tokio::net::UnixStream::pair().unwrap();
/* Similar to how ops would use the resource */
let a = Rc::new(IpcJsonStreamResource::from_stream(a));
(a, b)
}
#[cfg(windows)]
pub async fn pair() -> (
Rc<IpcJsonStreamResource>,
tokio::net::windows::named_pipe::NamedPipeServer,
) {
use tokio::net::windows::named_pipe::ClientOptions;
use tokio::net::windows::named_pipe::ServerOptions;
let name =
format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>());
let server = ServerOptions::new().create(name.clone()).unwrap();
let client = ClientOptions::new().open(name).unwrap();
server.connect().await.unwrap();
/* Similar to how ops would use the resource */
let client = Rc::new(IpcJsonStreamResource::from_stream(client));
(client, server)
}
#[tokio::test]
async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> {
// A simple round trip benchmark for quick dev feedback.
//
// Only ran when the env var is set.
if std::env::var_os("BENCH_IPC_DENO").is_none() {
return Ok(());
}
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let size = 1024 * 1024;
let stri = "x".repeat(size);
let data = format!("\"{}\"\n", stri);
for _ in 0..100 {
fd2.write_all(data.as_bytes()).await?;
}
Ok::<_, std::io::Error>(())
});
let start = std::time::Instant::now();
let mut bytes = 0;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
loop {
let msgs = ipc.read_msg().await?;
if msgs == serde_json::Value::Null {
break;
}
bytes += msgs.as_str().unwrap().len();
if start.elapsed().as_secs() > 5 {
break;
}
}
let elapsed = start.elapsed();
let mb = bytes as f64 / 1024.0 / 1024.0;
println!("{} mb/s", mb / elapsed.as_secs_f64());
child.await??;
Ok(())
}
#[tokio::test]
async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> {
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
const EXPECTED: &[u8] = b"\"hello\"\n";
let mut buf = [0u8; EXPECTED.len()];
let n = fd2.read_exact(&mut buf).await?;
assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"world\"\n").await?;
Ok::<_, std::io::Error>(())
});
ipc.clone().write_msg(json!("hello")).await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let msgs = ipc.read_msg().await?;
assert_eq!(msgs, json!("world"));
child.await??;
Ok(())
}
#[tokio::test]
async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n";
let mut buf = [0u8; EXPECTED.len()];
let n = fd2.read_exact(&mut buf).await?;
assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"foo\"\n\"bar\"\n").await?;
Ok::<_, std::io::Error>(())
});
ipc.clone().write_msg(json!("hello")).await?;
ipc.clone().write_msg(json!("world")).await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let msgs = ipc.read_msg().await?;
assert_eq!(msgs, json!("foo"));
child.await??;
Ok(())
}
#[tokio::test]
async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> {
let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?;
Ok::<_, std::io::Error>(())
});
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let _err = ipc.read_msg().await.unwrap_err();
child.await??;
Ok(())
}
#[test]
fn memchr() {
let str = b"hello world";
assert_eq!(super::memchr(b'h', str), Some(0));
assert_eq!(super::memchr(b'w', str), Some(6));
assert_eq!(super::memchr(b'd', str), Some(10));
assert_eq!(super::memchr(b'x', str), None);
let empty = b"";
assert_eq!(super::memchr(b'\n', empty), None);
}
}
}