mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 16:42:21 -05:00
fix(ext/node): shared global buffer unlock correctness fix (#20314)
The fix for #20188 was not entirely correct -- we were unlocking the global buffer incorrectly. This PR introduces a lock state that ensures we only unlock a lock we have taken out.
This commit is contained in:
parent
9198bbd454
commit
7adaf613bf
2 changed files with 73 additions and 38 deletions
|
@ -5,7 +5,7 @@ import {
|
||||||
assert,
|
assert,
|
||||||
assertEquals,
|
assertEquals,
|
||||||
} from "../../../test_util/std/testing/asserts.ts";
|
} from "../../../test_util/std/testing/asserts.ts";
|
||||||
import { deferred } from "../../../test_util/std/async/deferred.ts";
|
import { Deferred, deferred } from "../../../test_util/std/async/deferred.ts";
|
||||||
import * as path from "../../../test_util/std/path/mod.ts";
|
import * as path from "../../../test_util/std/path/mod.ts";
|
||||||
import * as http from "node:http";
|
import * as http from "node:http";
|
||||||
|
|
||||||
|
@ -131,17 +131,18 @@ Deno.test("[node/net] connection event has socket value", async () => {
|
||||||
await Promise.all([p, p2]);
|
await Promise.all([p, p2]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/// We need to make sure that any shared buffers are never used concurrently by two reads.
|
||||||
// https://github.com/denoland/deno/issues/20188
|
// https://github.com/denoland/deno/issues/20188
|
||||||
Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
|
Deno.test("[node/net] multiple Sockets should get correct server data", async () => {
|
||||||
const p = deferred();
|
const socketCount = 9;
|
||||||
const p2 = deferred();
|
|
||||||
|
|
||||||
const dataReceived1 = deferred();
|
class TestSocket {
|
||||||
const dataReceived2 = deferred();
|
dataReceived: Deferred<undefined> = deferred();
|
||||||
|
events: string[] = [];
|
||||||
const events1: string[] = [];
|
socket: net.Socket | undefined;
|
||||||
const events2: string[] = [];
|
}
|
||||||
|
|
||||||
|
const finished = deferred();
|
||||||
const server = net.createServer();
|
const server = net.createServer();
|
||||||
server.on("connection", (socket) => {
|
server.on("connection", (socket) => {
|
||||||
assert(socket !== undefined);
|
assert(socket !== undefined);
|
||||||
|
@ -150,40 +151,47 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async ()
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const sockets: TestSocket[] = [];
|
||||||
|
for (let i = 0; i < socketCount; i++) {
|
||||||
|
sockets[i] = new TestSocket();
|
||||||
|
}
|
||||||
|
|
||||||
server.listen(async () => {
|
server.listen(async () => {
|
||||||
// deno-lint-ignore no-explicit-any
|
// deno-lint-ignore no-explicit-any
|
||||||
const { port } = server.address() as any;
|
const { port } = server.address() as any;
|
||||||
|
|
||||||
const socket1 = net.createConnection(port);
|
for (let i = 0; i < socketCount; i++) {
|
||||||
const socket2 = net.createConnection(port);
|
const socket = sockets[i].socket = net.createConnection(port);
|
||||||
|
socket.on("data", (data) => {
|
||||||
|
const count = sockets[i].events.length;
|
||||||
|
sockets[i].events.push(new TextDecoder().decode(data));
|
||||||
|
if (count === 0) {
|
||||||
|
// Trigger an immediate second write
|
||||||
|
sockets[i].socket?.write(`${i}`.repeat(3));
|
||||||
|
} else {
|
||||||
|
sockets[i].dataReceived.resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
socket1.on("data", (data) => {
|
for (let i = 0; i < socketCount; i++) {
|
||||||
events1.push(new TextDecoder().decode(data));
|
sockets[i].socket?.write(`${i}`.repeat(3));
|
||||||
dataReceived1.resolve();
|
}
|
||||||
});
|
|
||||||
|
|
||||||
socket2.on("data", (data) => {
|
await Promise.all(sockets.map((socket) => socket.dataReceived));
|
||||||
events2.push(new TextDecoder().decode(data));
|
|
||||||
dataReceived2.resolve();
|
|
||||||
});
|
|
||||||
|
|
||||||
socket1.write("111");
|
for (let i = 0; i < socketCount; i++) {
|
||||||
socket2.write("222");
|
sockets[i].socket?.end();
|
||||||
|
}
|
||||||
await Promise.all([dataReceived1, dataReceived2]);
|
|
||||||
|
|
||||||
socket1.end();
|
|
||||||
socket2.end();
|
|
||||||
|
|
||||||
server.close(() => {
|
server.close(() => {
|
||||||
p.resolve();
|
finished.resolve();
|
||||||
});
|
});
|
||||||
|
|
||||||
p2.resolve();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await Promise.all([p, p2]);
|
await finished;
|
||||||
|
|
||||||
assertEquals(events1, ["111"]);
|
for (let i = 0; i < socketCount; i++) {
|
||||||
assertEquals(events2, ["222"]);
|
assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -311,21 +311,37 @@ export class LibuvStreamWrap extends HandleWrap {
|
||||||
|
|
||||||
/** Internal method for reading from the attached stream. */
|
/** Internal method for reading from the attached stream. */
|
||||||
async #read() {
|
async #read() {
|
||||||
const isOwnedBuf = bufLocked;
|
// Lock safety: We must hold this lock until we are certain that buf is no longer used
|
||||||
let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF;
|
// This setup code is a little verbose, but we need to be careful about buffer management
|
||||||
bufLocked = true;
|
let buf, locked = false;
|
||||||
|
if (bufLocked) {
|
||||||
|
// Already locked, allocate
|
||||||
|
buf = new Uint8Array(SUGGESTED_SIZE);
|
||||||
|
} else {
|
||||||
|
// Not locked, take the buffer + lock
|
||||||
|
buf = BUF;
|
||||||
|
locked = bufLocked = true;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
let nread: number | null;
|
let nread: number | null;
|
||||||
const ridBefore = this[kStreamBaseField]!.rid;
|
const ridBefore = this[kStreamBaseField]!.rid;
|
||||||
try {
|
try {
|
||||||
nread = await this[kStreamBaseField]!.read(buf);
|
nread = await this[kStreamBaseField]!.read(buf);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
// Lock safety: we know that the buffer will not be used in this function again
|
||||||
|
// All exits from this block either return or re-assign buf to a different value
|
||||||
|
if (locked) {
|
||||||
|
bufLocked = locked = false;
|
||||||
|
}
|
||||||
|
|
||||||
// Try to read again if the underlying stream resource
|
// Try to read again if the underlying stream resource
|
||||||
// changed. This can happen during TLS upgrades (eg. STARTTLS)
|
// changed. This can happen during TLS upgrades (eg. STARTTLS)
|
||||||
if (ridBefore != this[kStreamBaseField]!.rid) {
|
if (ridBefore != this[kStreamBaseField]!.rid) {
|
||||||
return this.#read();
|
return this.#read();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buf = new Uint8Array(0);
|
||||||
|
|
||||||
if (
|
if (
|
||||||
e instanceof Deno.errors.Interrupted ||
|
e instanceof Deno.errors.Interrupted ||
|
||||||
e instanceof Deno.errors.BadResource
|
e instanceof Deno.errors.BadResource
|
||||||
|
@ -339,8 +355,6 @@ export class LibuvStreamWrap extends HandleWrap {
|
||||||
} else {
|
} else {
|
||||||
nread = codeMap.get("UNKNOWN")!;
|
nread = codeMap.get("UNKNOWN")!;
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = new Uint8Array(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nread ??= codeMap.get("EOF")!;
|
nread ??= codeMap.get("EOF")!;
|
||||||
|
@ -351,7 +365,17 @@ export class LibuvStreamWrap extends HandleWrap {
|
||||||
this.bytesRead += nread;
|
this.bytesRead += nread;
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread);
|
// We release the lock early so a re-entrant read can make use of the shared buffer, but
|
||||||
|
// we need to make a copy of the data in the shared buffer.
|
||||||
|
if (locked) {
|
||||||
|
// Lock safety: we know that the buffer will not be used in this function again
|
||||||
|
// We're making a copy of data that lives in the shared buffer
|
||||||
|
buf = buf.slice(0, nread);
|
||||||
|
bufLocked = locked = false;
|
||||||
|
} else {
|
||||||
|
// The buffer isn't owned, so let's create a subarray view
|
||||||
|
buf = buf.subarray(0, nread);
|
||||||
|
}
|
||||||
|
|
||||||
streamBaseState[kArrayBufferOffset] = 0;
|
streamBaseState[kArrayBufferOffset] = 0;
|
||||||
|
|
||||||
|
@ -365,7 +389,10 @@ export class LibuvStreamWrap extends HandleWrap {
|
||||||
this.#read();
|
this.#read();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
bufLocked = false;
|
// Lock safety: we know that the buffer will not be used in this function again
|
||||||
|
if (locked) {
|
||||||
|
bufLocked = locked = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue