1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-11 10:07:54 -05:00
denoland-deno/ext/node/benchmarks/child_process_ipc.mjs
Nathan Whitaker cd59fc53a5
fix(node): Rework node:child_process IPC (#24763)
Fixes https://github.com/denoland/deno/issues/24756. Fixes
https://github.com/denoland/deno/issues/24796.

This also gets vitest working when using
[`--pool=forks`](https://vitest.dev/guide/improving-performance#pool)
(which is the default as of vitest 2.0). Ref
https://github.com/denoland/deno/issues/23882.

---

This PR resolves a handful of issues with child_process IPC. In
particular:

- We didn't support sending typed array views over IPC
- Opening an IPC channel resulted in the event loop never exiting
- Sending a `null` over IPC would terminate the channel
- There was some UB in the read implementation (transmuting an `&[u8]`
to `&mut [u8]`)
- The `send` method wasn't returning anything, so there was no way to
signal backpressure (this also resulted in the benchmark
`child_process_ipc.mjs` being misleading, as it tried to respect
backpressure. That gave node much worse results at larger message sizes,
and gave us much worse results at smaller message sizes).
- We weren't setting up the `channel` property on the `process` global
(or on the `ChildProcess` object), and also didn't have a way to
ref/unref the channel
- Calling `kill` multiple times (or disconnecting the channel, then
calling kill) would throw an error
- Node couldn't spawn a deno subprocess and communicate with it over IPC
2024-07-30 16:13:24 -07:00

74 lines
1.7 KiB
JavaScript

import { fork } from "node:child_process";
import process from "node:process";
import { setImmediate } from "node:timers";
import { fileURLToPath } from "node:url";
if (process.env.CHILD) {
const len = +process.env.CHILD;
const msg = ".".repeat(len);
let waiting = false;
const send = () => {
while (
process.send(msg, undefined, undefined, (_e) => {
if (waiting) {
waiting = false;
setImmediate(send);
}
})
);
// Wait: backlog of unsent messages exceeds threshold
// once the message is sent, the callback will be called
// and we'll resume
waiting = true;
};
send();
} else {
function main(dur, len) {
const p = new Promise((resolve) => {
const start = performance.now();
const options = {
"stdio": ["inherit", "inherit", "inherit", "ipc"],
"env": { "CHILD": len.toString() },
};
const path = new URL("child_process_ipc.mjs", import.meta.url).pathname;
const child = fork(
path,
options,
);
let bytes = 0;
let total = 0;
child.on("message", (msg) => {
bytes += msg.length;
total += 1;
});
setTimeout(() => {
child.kill();
const end = performance.now();
const mb = bytes / 1024 / 1024;
const sec = (end - start) / 1000;
const mbps = mb / sec;
console.log(`${len} bytes: ${mbps.toFixed(2)} MB/s`);
console.log(`${total} messages`);
resolve();
}, dur * 1000);
});
return p;
}
const len = [
64,
256,
1024,
4096,
16384,
65536,
65536 << 4,
65536 << 6 - 1,
];
for (const l of len) {
await main(5, l);
}
}