mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
feat(unstable): add ref/unref to Listener (#13961)
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
5c9844e5f7
commit
7feb25d448
6 changed files with 160 additions and 15 deletions
16
cli/dts/lib.deno.unstable.d.ts
vendored
16
cli/dts/lib.deno.unstable.d.ts
vendored
|
@ -1296,6 +1296,22 @@ declare namespace Deno {
|
|||
alpnProtocols?: string[];
|
||||
}
|
||||
|
||||
export interface Listener extends AsyncIterable<Conn> {
|
||||
/** **UNSTABLE**: new API, yet to be vetted.
|
||||
*
|
||||
* Make the listener block the event loop from finishing.
|
||||
*
|
||||
* Note: the listener blocks the event loop from finishing by default.
|
||||
* This method is only meaningful after `.unref()` is called.
|
||||
*/
|
||||
ref(): void;
|
||||
/** **UNSTABLE**: new API, yet to be vetted.
|
||||
*
|
||||
* Make the listener not block the event loop from finishing.
|
||||
*/
|
||||
unref(): void;
|
||||
}
|
||||
|
||||
/** **UNSTABLE**: New API should be tested first.
|
||||
*
|
||||
* Acquire an advisory file-system lock for the provided file. `exclusive`
|
||||
|
|
|
@ -7,6 +7,7 @@ import {
|
|||
assertThrows,
|
||||
deferred,
|
||||
delay,
|
||||
execCode,
|
||||
} from "./test_util.ts";
|
||||
|
||||
let isCI: boolean;
|
||||
|
@ -807,3 +808,79 @@ Deno.test(
|
|||
assertEquals(res, "hello world!");
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, run: true } },
|
||||
async function netListenUnref() {
|
||||
const [statusCode, _output] = await execCode(`
|
||||
async function main() {
|
||||
const listener = Deno.listen({ port: 3500 });
|
||||
listener.unref();
|
||||
await listener.accept(); // This doesn't block the program from exiting
|
||||
}
|
||||
main();
|
||||
`);
|
||||
assertEquals(statusCode, 0);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, run: true } },
|
||||
async function netListenUnref() {
|
||||
const [statusCode, _output] = await execCode(`
|
||||
async function main() {
|
||||
const listener = Deno.listen({ port: 3500 });
|
||||
await listener.accept();
|
||||
listener.unref();
|
||||
await listener.accept(); // The program exits here
|
||||
throw new Error(); // The program doesn't reach here
|
||||
}
|
||||
main();
|
||||
const conn = await Deno.connect({ port: 3500 });
|
||||
conn.close();
|
||||
`);
|
||||
assertEquals(statusCode, 0);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, run: true, net: true } },
|
||||
async function netListenUnrefAndRef() {
|
||||
const p = execCode(`
|
||||
async function main() {
|
||||
const listener = Deno.listen({ port: 3500 });
|
||||
listener.unref();
|
||||
listener.ref(); // This restores 'ref' state of listener
|
||||
await listener.accept();
|
||||
console.log("accepted")
|
||||
}
|
||||
main();
|
||||
`);
|
||||
// TODO(kt3k): This is racy. Find a correct way to
|
||||
// wait for the server to be ready
|
||||
setTimeout(async () => {
|
||||
const conn = await Deno.connect({ port: 3500 });
|
||||
conn.close();
|
||||
}, 200);
|
||||
const [statusCode, output] = await p;
|
||||
assertEquals(statusCode, 0);
|
||||
assertEquals(output.trim(), "accepted");
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
async function netListenUnrefConcurrentAccept() {
|
||||
const timer = setTimeout(() => {}, 1000);
|
||||
const listener = Deno.listen({ port: 3500 });
|
||||
listener.accept().catch(() => {});
|
||||
listener.unref();
|
||||
// Unref'd listener still causes Busy error
|
||||
// on concurrent accept calls.
|
||||
await assertRejects(async () => {
|
||||
await listener.accept(); // The program exits here
|
||||
}, Deno.errors.Busy);
|
||||
listener.close();
|
||||
clearTimeout(timer);
|
||||
},
|
||||
);
|
||||
|
|
|
@ -30,7 +30,7 @@ export function pathToAbsoluteFileUrl(path: string): URL {
|
|||
|
||||
const decoder = new TextDecoder();
|
||||
|
||||
export async function execCode(code: string) {
|
||||
export async function execCode(code: string): Promise<[number, string]> {
|
||||
const p = Deno.run({
|
||||
cmd: [
|
||||
Deno.execPath(),
|
||||
|
|
|
@ -6,14 +6,18 @@
|
|||
const { BadResourcePrototype, InterruptedPrototype } = core;
|
||||
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
|
||||
const {
|
||||
Error,
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
PromiseResolve,
|
||||
Symbol,
|
||||
SymbolAsyncIterator,
|
||||
Error,
|
||||
Uint8Array,
|
||||
SymbolFor,
|
||||
TypedArrayPrototypeSubarray,
|
||||
Uint8Array,
|
||||
} = window.__bootstrap.primordials;
|
||||
|
||||
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
|
||||
|
||||
async function read(
|
||||
rid,
|
||||
buffer,
|
||||
|
@ -191,9 +195,16 @@
|
|||
|
||||
class UnixConn extends Conn {}
|
||||
|
||||
// Use symbols for method names to hide these in stable API.
|
||||
// TODO(kt3k): Remove these symbols when ref/unref become stable.
|
||||
const listenerRef = Symbol("listenerRef");
|
||||
const listenerUnref = Symbol("listenerUnref");
|
||||
|
||||
class Listener {
|
||||
#rid = 0;
|
||||
#addr = null;
|
||||
#unref = false;
|
||||
#promiseId = null;
|
||||
|
||||
constructor(rid, addr) {
|
||||
this.#rid = rid;
|
||||
|
@ -208,15 +219,21 @@
|
|||
return this.#addr;
|
||||
}
|
||||
|
||||
async accept() {
|
||||
const res = await opAccept(this.rid, this.addr.transport);
|
||||
if (this.addr.transport == "tcp") {
|
||||
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
|
||||
} else if (this.addr.transport == "unix") {
|
||||
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
|
||||
} else {
|
||||
throw new Error("unreachable");
|
||||
accept() {
|
||||
const promise = opAccept(this.rid, this.addr.transport);
|
||||
this.#promiseId = promise[promiseIdSymbol];
|
||||
if (this.#unref) {
|
||||
this.#unrefOpAccept();
|
||||
}
|
||||
return promise.then((res) => {
|
||||
if (this.addr.transport == "tcp") {
|
||||
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
|
||||
} else if (this.addr.transport == "unix") {
|
||||
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
|
||||
} else {
|
||||
throw new Error("unreachable");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async next() {
|
||||
|
@ -247,6 +264,27 @@
|
|||
[SymbolAsyncIterator]() {
|
||||
return this;
|
||||
}
|
||||
|
||||
[listenerRef]() {
|
||||
this.#unref = false;
|
||||
this.#refOpAccept();
|
||||
}
|
||||
|
||||
[listenerUnref]() {
|
||||
this.#unref = true;
|
||||
this.#unrefOpAccept();
|
||||
}
|
||||
|
||||
#refOpAccept() {
|
||||
if (typeof this.#promiseId === "number") {
|
||||
core.refOp(this.#promiseId);
|
||||
}
|
||||
}
|
||||
#unrefOpAccept() {
|
||||
if (typeof this.#promiseId === "number") {
|
||||
core.unrefOp(this.#promiseId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Datagram {
|
||||
|
@ -304,14 +342,14 @@
|
|||
}
|
||||
}
|
||||
|
||||
function listen({ hostname, ...options }) {
|
||||
function listen({ hostname, ...options }, constructor = Listener) {
|
||||
const res = opListen({
|
||||
transport: "tcp",
|
||||
hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
|
||||
...options,
|
||||
});
|
||||
|
||||
return new Listener(res.rid, res.localAddr);
|
||||
return new constructor(res.rid, res.localAddr);
|
||||
}
|
||||
|
||||
async function connect(options) {
|
||||
|
@ -335,6 +373,8 @@
|
|||
UnixConn,
|
||||
opConnect,
|
||||
listen,
|
||||
listenerRef,
|
||||
listenerUnref,
|
||||
opListen,
|
||||
Listener,
|
||||
shutdown,
|
||||
|
|
|
@ -7,9 +7,9 @@
|
|||
function listen(options) {
|
||||
if (options.transport === "unix") {
|
||||
const res = net.opListen(options);
|
||||
return new net.Listener(res.rid, res.localAddr);
|
||||
return new Listener(res.rid, res.localAddr);
|
||||
} else {
|
||||
return net.listen(options);
|
||||
return net.listen(options, Listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -41,9 +41,20 @@
|
|||
}
|
||||
}
|
||||
|
||||
class Listener extends net.Listener {
|
||||
ref() {
|
||||
this[net.listenerRef]();
|
||||
}
|
||||
|
||||
unref() {
|
||||
this[net.listenerUnref]();
|
||||
}
|
||||
}
|
||||
|
||||
window.__bootstrap.netUnstable = {
|
||||
connect,
|
||||
listenDatagram,
|
||||
listen,
|
||||
Listener,
|
||||
};
|
||||
})(this);
|
||||
|
|
|
@ -132,6 +132,7 @@
|
|||
listen: __bootstrap.netUnstable.listen,
|
||||
connect: __bootstrap.netUnstable.connect,
|
||||
listenDatagram: __bootstrap.netUnstable.listenDatagram,
|
||||
Listener: __bootstrap.netUnstable.Listener,
|
||||
umask: __bootstrap.fs.umask,
|
||||
futime: __bootstrap.fs.futime,
|
||||
futimeSync: __bootstrap.fs.futimeSync,
|
||||
|
|
Loading…
Reference in a new issue