1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

fix(ext/node): simultaneous reads can leak into each other (#20223)

Reported in #20188

This was caused by re-use of a global buffer `BUF` during simultaneous
async reads.
This commit is contained in:
Matt Mastracci 2023-08-22 08:45:10 -06:00
parent ba9892a213
commit 3e9fb8aafd
2 changed files with 107 additions and 42 deletions

View file

@ -130,3 +130,60 @@ Deno.test("[node/net] connection event has socket value", async () => {
await Promise.all([p, p2]); await Promise.all([p, p2]);
}); });
// https://github.com/denoland/deno/issues/20188
Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
const p = deferred();
const p2 = deferred();
const dataReceived1 = deferred();
const dataReceived2 = deferred();
const events1: string[] = [];
const events2: string[] = [];
const server = net.createServer();
server.on("connection", (socket) => {
assert(socket !== undefined);
socket.on("data", (data) => {
socket.write(new TextDecoder().decode(data));
});
});
server.listen(async () => {
// deno-lint-ignore no-explicit-any
const { port } = server.address() as any;
const socket1 = net.createConnection(port);
const socket2 = net.createConnection(port);
socket1.on("data", (data) => {
events1.push(new TextDecoder().decode(data));
dataReceived1.resolve();
});
socket2.on("data", (data) => {
events2.push(new TextDecoder().decode(data));
dataReceived2.resolve();
});
socket1.write("111");
socket2.write("222");
await Promise.all([dataReceived1, dataReceived2]);
socket1.end();
socket2.end();
server.close(() => {
p.resolve();
});
p2.resolve();
});
await Promise.all([p, p2]);
assertEquals(events1, ["111"]);
assertEquals(events2, ["222"]);
});

View file

@ -311,8 +311,10 @@ export class LibuvStreamWrap extends HandleWrap {
/** Internal method for reading from the attached stream. */ /** Internal method for reading from the attached stream. */
async #read() { async #read() {
let buf = BUF; const isOwnedBuf = bufLocked;
let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF;
bufLocked = true;
try {
let nread: number | null; let nread: number | null;
const ridBefore = this[kStreamBaseField]!.rid; const ridBefore = this[kStreamBaseField]!.rid;
try { try {
@ -349,7 +351,7 @@ export class LibuvStreamWrap extends HandleWrap {
this.bytesRead += nread; this.bytesRead += nread;
} }
buf = buf.slice(0, nread); buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread);
streamBaseState[kArrayBufferOffset] = 0; streamBaseState[kArrayBufferOffset] = 0;
@ -362,6 +364,9 @@ export class LibuvStreamWrap extends HandleWrap {
if (nread >= 0 && this.#reading) { if (nread >= 0 && this.#reading) {
this.#read(); this.#read();
} }
} finally {
bufLocked = false;
}
} }
/** /**
@ -423,4 +428,7 @@ export class LibuvStreamWrap extends HandleWrap {
} }
} }
// Used in #read above
const BUF = new Uint8Array(SUGGESTED_SIZE); const BUF = new Uint8Array(SUGGESTED_SIZE);
// We need to ensure that only one inflight read request uses the cached buffer above
let bufLocked = false;