mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
fix(ext/node): cancel pending ipc writes on channel close (#26504)
Fixes the issue described in https://github.com/denoland/deno/issues/23882#issuecomment-2423316362. The parent was starting to send a message right before the process would exit, and the channel closed in the middle of the write. Unlike with reads, we weren't cancelling the pending writes, which resulted in a `Broken pipe` error surfacing to the user.
This commit is contained in:
parent
7c57105cc4
commit
27df42f659
2 changed files with 23 additions and 5 deletions
|
@ -216,10 +216,17 @@ mod impl_ {
|
||||||
queue_ok.set_index(scope, 0, v);
|
queue_ok.set_index(scope, 0, v);
|
||||||
}
|
}
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
stream.clone().write_msg_bytes(&serialized).await?;
|
let cancel = stream.cancel.clone();
|
||||||
|
let result = stream
|
||||||
|
.clone()
|
||||||
|
.write_msg_bytes(&serialized)
|
||||||
|
.or_cancel(cancel)
|
||||||
|
.await;
|
||||||
|
// adjust count even on error
|
||||||
stream
|
stream
|
||||||
.queued_bytes
|
.queued_bytes
|
||||||
.fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
|
.fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
|
||||||
|
result??;
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1339,7 +1339,7 @@ export function setupChannel(target: any, ipc: number) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
process.nextTick(handleMessage, msg);
|
nextTick(handleMessage, msg);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (
|
if (
|
||||||
|
@ -1400,7 +1400,7 @@ export function setupChannel(target: any, ipc: number) {
|
||||||
if (!target.connected) {
|
if (!target.connected) {
|
||||||
const err = new ERR_IPC_CHANNEL_CLOSED();
|
const err = new ERR_IPC_CHANNEL_CLOSED();
|
||||||
if (typeof callback === "function") {
|
if (typeof callback === "function") {
|
||||||
process.nextTick(callback, err);
|
nextTick(callback, err);
|
||||||
} else {
|
} else {
|
||||||
nextTick(() => target.emit("error", err));
|
nextTick(() => target.emit("error", err));
|
||||||
}
|
}
|
||||||
|
@ -1416,7 +1416,18 @@ export function setupChannel(target: any, ipc: number) {
|
||||||
.then(() => {
|
.then(() => {
|
||||||
control.unrefCounted();
|
control.unrefCounted();
|
||||||
if (callback) {
|
if (callback) {
|
||||||
process.nextTick(callback, null);
|
nextTick(callback, null);
|
||||||
|
}
|
||||||
|
}, (err: Error) => {
|
||||||
|
control.unrefCounted();
|
||||||
|
if (err instanceof Deno.errors.Interrupted) {
|
||||||
|
// Channel closed on us mid-write.
|
||||||
|
} else {
|
||||||
|
if (typeof callback === "function") {
|
||||||
|
nextTick(callback, err);
|
||||||
|
} else {
|
||||||
|
nextTick(() => target.emit("error", err));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return queueOk[0];
|
return queueOk[0];
|
||||||
|
@ -1433,7 +1444,7 @@ export function setupChannel(target: any, ipc: number) {
|
||||||
target.connected = false;
|
target.connected = false;
|
||||||
target[kCanDisconnect] = false;
|
target[kCanDisconnect] = false;
|
||||||
control[kControlDisconnect]();
|
control[kControlDisconnect]();
|
||||||
process.nextTick(() => {
|
nextTick(() => {
|
||||||
target.channel = null;
|
target.channel = null;
|
||||||
core.close(ipc);
|
core.close(ipc);
|
||||||
target.emit("disconnect");
|
target.emit("disconnect");
|
||||||
|
|
Loading…
Reference in a new issue