diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts index b4a21578e2..930e4f0523 100644 --- a/cli/tests/unit/net_test.ts +++ b/cli/tests/unit/net_test.ts @@ -906,6 +906,55 @@ Deno.test({ listener.close(); }); +Deno.test( + { permissions: { net: true, read: true, run: true } }, + async function netConnUnref() { + const listener = Deno.listen({ port: 3500 }); + const intervalId = setInterval(() => {}); // This keeps event loop alive. + + const program = execCode(` + async function main() { + const conn = await Deno.connect({ port: 3500 }); + conn.unref(); + await conn.read(new Uint8Array(10)); // The program exits here + throw new Error(); // The program doesn't reach here + } + main(); + `); + const conn = await listener.accept(); + const [statusCode, _output] = await program; + conn.close(); + listener.close(); + clearInterval(intervalId); + assertEquals(statusCode, 0); + }, +); + +Deno.test( + { permissions: { net: true, read: true, run: true } }, + async function netConnUnrefReadable() { + const listener = Deno.listen({ port: 3500 }); + const intervalId = setInterval(() => {}); // This keeps event loop alive. + + const program = execCode(` + async function main() { + const conn = await Deno.connect({ port: 3500 }); + conn.unref(); + const reader = conn.readable.getReader(); + await reader.read(); // The program exits here + throw new Error(); // The program doesn't reach here + } + main(); + `); + const conn = await listener.accept(); + const [statusCode, _output] = await program; + conn.close(); + listener.close(); + clearInterval(intervalId); + assertEquals(statusCode, 0); + }, +); + Deno.test({ permissions: { net: true } }, async function netTcpReuseAddr() { const listener1 = Deno.listen({ hostname: "127.0.0.1", diff --git a/ext/net/01_net.js b/ext/net/01_net.js index f25904e80d..244b50a51c 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,8 +4,12 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; - const { readableStreamForRid, writableStreamForRid } = - window.__bootstrap.streams; + const { + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, + writableStreamForRid, + } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -19,17 +23,6 @@ const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); - async function read( - rid, - buffer, - ) { - if (buffer.length === 0) { - return 0; - } - const nread = await core.read(rid, buffer); - return nread === 0 ? null : nread; - } - async function write(rid, data) { return await core.write(rid, data); } @@ -46,6 +39,8 @@ #rid = 0; #remoteAddr = null; #localAddr = null; + #unref = false; + #pendingReadPromiseIds = []; #readable; #writable; @@ -72,8 +67,25 @@ return write(this.rid, p); } - read(p) { - return read(this.rid, p); + async read(buffer) { + if (buffer.length === 0) { + return 0; + } + const promise = core.read(this.rid, buffer); + const promiseId = promise[promiseIdSymbol]; + if (this.#unref) core.unrefOp(promiseId); + this.#pendingReadPromiseIds.push(promiseId); + let nread; + try { + nread = await promise; + } catch (e) { + throw e; + } finally { + this.#pendingReadPromiseIds = this.#pendingReadPromiseIds.filter((id) => + id !== promiseId + ); + } + return nread === 0 ? null : nread; } close() { @@ -86,7 +98,10 @@ get readable() { if (this.#readable === undefined) { - this.#readable = readableStreamForRid(this.rid); + this.#readable = readableStreamForRidUnrefable(this.rid); + if (this.#unref) { + readableStreamForRidUnrefableUnref(this.#readable); + } } return this.#readable; } @@ -97,6 +112,22 @@ } return this.#writable; } + + ref() { + this.#unref = false; + if (this.#readable) { + readableStreamForRidUnrefableRef(this.#readable); + } + this.#pendingReadPromiseIds.forEach((id) => core.refOp(id)); + } + + unref() { + this.#unref = true; + if (this.#readable) { + readableStreamForRidUnrefableUnref(this.#readable); + } + this.#pendingReadPromiseIds.forEach((id) => core.unrefOp(id)); + } } class TcpConn extends Conn { diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts index 639d0f8af0..136723262d 100644 --- a/ext/net/lib.deno_net.d.ts +++ b/ext/net/lib.deno_net.d.ts @@ -61,6 +61,20 @@ declare namespace Deno { * callers should just use `close()`. */ closeWrite(): Promise; + /** **UNSTABLE**: New API, yet to be vetted. + * + * Make the connection block the event loop from finishing. + * + * Note: the connection blocks the event loop from finishing by default. + * This method is only meaningful after `.unref()` is called. + */ + ref(): void; + /** **UNSTABLE**: New API, yet to be vetted. + * + * Make the connection not block the event loop from finishing. + */ + unref(): void; + readonly readable: ReadableStream; readonly writable: WritableStream; }