0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-10-29 08:58:01 -04:00

perf(node/net): optimize socket reads for 'npm:ws' package (#20449)

Fixes performance regression introduced by
https://github.com/denoland/deno/pull/20223 and
https://github.com/denoland/deno/pull/20314. It's enough to have one
"shared" buffer per socket
and no locking mechanism is required.
This commit is contained in:
Bartek Iwańczuk 2023-09-11 20:38:57 +02:00 committed by GitHub
parent 9d1385896f
commit aaff69db3f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -114,6 +114,7 @@ export class LibuvStreamWrap extends HandleWrap {
writeQueueSize = 0; writeQueueSize = 0;
bytesRead = 0; bytesRead = 0;
bytesWritten = 0; bytesWritten = 0;
#buf = new Uint8Array(SUGGESTED_SIZE);
onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined; onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined;
@ -311,88 +312,53 @@ export class LibuvStreamWrap extends HandleWrap {
/** Internal method for reading from the attached stream. */ /** Internal method for reading from the attached stream. */
async #read() { async #read() {
// Lock safety: We must hold this lock until we are certain that buf is no longer used let buf = this.#buf;
// This setup code is a little verbose, but we need to be careful about buffer management let nread: number | null;
let buf, locked = false; const ridBefore = this[kStreamBaseField]!.rid;
if (bufLocked) {
// Already locked, allocate
buf = new Uint8Array(SUGGESTED_SIZE);
} else {
// Not locked, take the buffer + lock
buf = BUF;
locked = bufLocked = true;
}
try { try {
let nread: number | null; nread = await this[kStreamBaseField]!.read(buf);
const ridBefore = this[kStreamBaseField]!.rid; } catch (e) {
try { // Try to read again if the underlying stream resource
nread = await this[kStreamBaseField]!.read(buf); // changed. This can happen during TLS upgrades (eg. STARTTLS)
} catch (e) { if (ridBefore != this[kStreamBaseField]!.rid) {
// Lock safety: we know that the buffer will not be used in this function again return this.#read();
// All exits from this block either return or re-assign buf to a different value
if (locked) {
bufLocked = locked = false;
}
// Try to read again if the underlying stream resource
// changed. This can happen during TLS upgrades (eg. STARTTLS)
if (ridBefore != this[kStreamBaseField]!.rid) {
return this.#read();
}
buf = new Uint8Array(0);
if (
e instanceof Deno.errors.Interrupted ||
e instanceof Deno.errors.BadResource
) {
nread = codeMap.get("EOF")!;
} else if (
e instanceof Deno.errors.ConnectionReset ||
e instanceof Deno.errors.ConnectionAborted
) {
nread = codeMap.get("ECONNRESET")!;
} else {
nread = codeMap.get("UNKNOWN")!;
}
} }
nread ??= codeMap.get("EOF")!; if (
e instanceof Deno.errors.Interrupted ||
streamBaseState[kReadBytesOrError] = nread; e instanceof Deno.errors.BadResource
) {
if (nread > 0) { nread = codeMap.get("EOF")!;
this.bytesRead += nread; } else if (
} e instanceof Deno.errors.ConnectionReset ||
e instanceof Deno.errors.ConnectionAborted
// We release the lock early so a re-entrant read can make use of the shared buffer, but ) {
// we need to make a copy of the data in the shared buffer. nread = codeMap.get("ECONNRESET")!;
if (locked) {
// Lock safety: we know that the buffer will not be used in this function again
// We're making a copy of data that lives in the shared buffer
buf = buf.slice(0, nread);
bufLocked = locked = false;
} else { } else {
// The buffer isn't owned, so let's create a subarray view nread = codeMap.get("UNKNOWN")!;
buf = buf.subarray(0, nread);
} }
}
streamBaseState[kArrayBufferOffset] = 0; nread ??= codeMap.get("EOF")!;
try { streamBaseState[kReadBytesOrError] = nread;
this.onread!(buf, nread);
} catch {
// swallow callback errors.
}
if (nread >= 0 && this.#reading) { if (nread > 0) {
this.#read(); this.bytesRead += nread;
} }
} finally {
// Lock safety: we know that the buffer will not be used in this function again buf = buf.slice(0, nread);
if (locked) {
bufLocked = locked = false; streamBaseState[kArrayBufferOffset] = 0;
}
try {
this.onread!(buf, nread);
} catch {
// swallow callback errors.
}
if (nread >= 0 && this.#reading) {
this.#read();
} }
} }
@ -454,8 +420,3 @@ export class LibuvStreamWrap extends HandleWrap {
return; return;
} }
} }
// Used in #read above
const BUF = new Uint8Array(SUGGESTED_SIZE);
// We need to ensure that only one inflight read request uses the cached buffer above
let bufLocked = false;