mirror of
https://github.com/denoland/deno.git
synced 2025-01-07 06:46:59 -05:00
feat(unstable): add ref/unref to Listener (#13961)
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
4b853611ba
commit
9df06c346f
6 changed files with 160 additions and 15 deletions
16
cli/dts/lib.deno.unstable.d.ts
vendored
16
cli/dts/lib.deno.unstable.d.ts
vendored
|
@ -1296,6 +1296,22 @@ declare namespace Deno {
|
||||||
alpnProtocols?: string[];
|
alpnProtocols?: string[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface Listener extends AsyncIterable<Conn> {
|
||||||
|
/** **UNSTABLE**: new API, yet to be vetted.
|
||||||
|
*
|
||||||
|
* Make the listener block the event loop from finishing.
|
||||||
|
*
|
||||||
|
* Note: the listener blocks the event loop from finishing by default.
|
||||||
|
* This method is only meaningful after `.unref()` is called.
|
||||||
|
*/
|
||||||
|
ref(): void;
|
||||||
|
/** **UNSTABLE**: new API, yet to be vetted.
|
||||||
|
*
|
||||||
|
* Make the listener not block the event loop from finishing.
|
||||||
|
*/
|
||||||
|
unref(): void;
|
||||||
|
}
|
||||||
|
|
||||||
/** **UNSTABLE**: New API should be tested first.
|
/** **UNSTABLE**: New API should be tested first.
|
||||||
*
|
*
|
||||||
* Acquire an advisory file-system lock for the provided file. `exclusive`
|
* Acquire an advisory file-system lock for the provided file. `exclusive`
|
||||||
|
|
|
@ -7,6 +7,7 @@ import {
|
||||||
assertThrows,
|
assertThrows,
|
||||||
deferred,
|
deferred,
|
||||||
delay,
|
delay,
|
||||||
|
execCode,
|
||||||
} from "./test_util.ts";
|
} from "./test_util.ts";
|
||||||
|
|
||||||
let isCI: boolean;
|
let isCI: boolean;
|
||||||
|
@ -807,3 +808,79 @@ Deno.test(
|
||||||
assertEquals(res, "hello world!");
|
assertEquals(res, "hello world!");
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { read: true, run: true } },
|
||||||
|
async function netListenUnref() {
|
||||||
|
const [statusCode, _output] = await execCode(`
|
||||||
|
async function main() {
|
||||||
|
const listener = Deno.listen({ port: 3500 });
|
||||||
|
listener.unref();
|
||||||
|
await listener.accept(); // This doesn't block the program from exiting
|
||||||
|
}
|
||||||
|
main();
|
||||||
|
`);
|
||||||
|
assertEquals(statusCode, 0);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { read: true, run: true } },
|
||||||
|
async function netListenUnref() {
|
||||||
|
const [statusCode, _output] = await execCode(`
|
||||||
|
async function main() {
|
||||||
|
const listener = Deno.listen({ port: 3500 });
|
||||||
|
await listener.accept();
|
||||||
|
listener.unref();
|
||||||
|
await listener.accept(); // The program exits here
|
||||||
|
throw new Error(); // The program doesn't reach here
|
||||||
|
}
|
||||||
|
main();
|
||||||
|
const conn = await Deno.connect({ port: 3500 });
|
||||||
|
conn.close();
|
||||||
|
`);
|
||||||
|
assertEquals(statusCode, 0);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { read: true, run: true, net: true } },
|
||||||
|
async function netListenUnrefAndRef() {
|
||||||
|
const p = execCode(`
|
||||||
|
async function main() {
|
||||||
|
const listener = Deno.listen({ port: 3500 });
|
||||||
|
listener.unref();
|
||||||
|
listener.ref(); // This restores 'ref' state of listener
|
||||||
|
await listener.accept();
|
||||||
|
console.log("accepted")
|
||||||
|
}
|
||||||
|
main();
|
||||||
|
`);
|
||||||
|
// TODO(kt3k): This is racy. Find a correct way to
|
||||||
|
// wait for the server to be ready
|
||||||
|
setTimeout(async () => {
|
||||||
|
const conn = await Deno.connect({ port: 3500 });
|
||||||
|
conn.close();
|
||||||
|
}, 200);
|
||||||
|
const [statusCode, output] = await p;
|
||||||
|
assertEquals(statusCode, 0);
|
||||||
|
assertEquals(output.trim(), "accepted");
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { net: true } },
|
||||||
|
async function netListenUnrefConcurrentAccept() {
|
||||||
|
const timer = setTimeout(() => {}, 1000);
|
||||||
|
const listener = Deno.listen({ port: 3500 });
|
||||||
|
listener.accept().catch(() => {});
|
||||||
|
listener.unref();
|
||||||
|
// Unref'd listener still causes Busy error
|
||||||
|
// on concurrent accept calls.
|
||||||
|
await assertRejects(async () => {
|
||||||
|
await listener.accept(); // The program exits here
|
||||||
|
}, Deno.errors.Busy);
|
||||||
|
listener.close();
|
||||||
|
clearTimeout(timer);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
|
@ -30,7 +30,7 @@ export function pathToAbsoluteFileUrl(path: string): URL {
|
||||||
|
|
||||||
const decoder = new TextDecoder();
|
const decoder = new TextDecoder();
|
||||||
|
|
||||||
export async function execCode(code: string) {
|
export async function execCode(code: string): Promise<[number, string]> {
|
||||||
const p = Deno.run({
|
const p = Deno.run({
|
||||||
cmd: [
|
cmd: [
|
||||||
Deno.execPath(),
|
Deno.execPath(),
|
||||||
|
|
|
@ -6,14 +6,18 @@
|
||||||
const { BadResourcePrototype, InterruptedPrototype } = core;
|
const { BadResourcePrototype, InterruptedPrototype } = core;
|
||||||
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
|
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
|
||||||
const {
|
const {
|
||||||
|
Error,
|
||||||
ObjectPrototypeIsPrototypeOf,
|
ObjectPrototypeIsPrototypeOf,
|
||||||
PromiseResolve,
|
PromiseResolve,
|
||||||
|
Symbol,
|
||||||
SymbolAsyncIterator,
|
SymbolAsyncIterator,
|
||||||
Error,
|
SymbolFor,
|
||||||
Uint8Array,
|
|
||||||
TypedArrayPrototypeSubarray,
|
TypedArrayPrototypeSubarray,
|
||||||
|
Uint8Array,
|
||||||
} = window.__bootstrap.primordials;
|
} = window.__bootstrap.primordials;
|
||||||
|
|
||||||
|
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
|
||||||
|
|
||||||
async function read(
|
async function read(
|
||||||
rid,
|
rid,
|
||||||
buffer,
|
buffer,
|
||||||
|
@ -191,9 +195,16 @@
|
||||||
|
|
||||||
class UnixConn extends Conn {}
|
class UnixConn extends Conn {}
|
||||||
|
|
||||||
|
// Use symbols for method names to hide these in stable API.
|
||||||
|
// TODO(kt3k): Remove these symbols when ref/unref become stable.
|
||||||
|
const listenerRef = Symbol("listenerRef");
|
||||||
|
const listenerUnref = Symbol("listenerUnref");
|
||||||
|
|
||||||
class Listener {
|
class Listener {
|
||||||
#rid = 0;
|
#rid = 0;
|
||||||
#addr = null;
|
#addr = null;
|
||||||
|
#unref = false;
|
||||||
|
#promiseId = null;
|
||||||
|
|
||||||
constructor(rid, addr) {
|
constructor(rid, addr) {
|
||||||
this.#rid = rid;
|
this.#rid = rid;
|
||||||
|
@ -208,15 +219,21 @@
|
||||||
return this.#addr;
|
return this.#addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
async accept() {
|
accept() {
|
||||||
const res = await opAccept(this.rid, this.addr.transport);
|
const promise = opAccept(this.rid, this.addr.transport);
|
||||||
if (this.addr.transport == "tcp") {
|
this.#promiseId = promise[promiseIdSymbol];
|
||||||
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
|
if (this.#unref) {
|
||||||
} else if (this.addr.transport == "unix") {
|
this.#unrefOpAccept();
|
||||||
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
|
|
||||||
} else {
|
|
||||||
throw new Error("unreachable");
|
|
||||||
}
|
}
|
||||||
|
return promise.then((res) => {
|
||||||
|
if (this.addr.transport == "tcp") {
|
||||||
|
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
|
||||||
|
} else if (this.addr.transport == "unix") {
|
||||||
|
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
|
||||||
|
} else {
|
||||||
|
throw new Error("unreachable");
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async next() {
|
async next() {
|
||||||
|
@ -247,6 +264,27 @@
|
||||||
[SymbolAsyncIterator]() {
|
[SymbolAsyncIterator]() {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[listenerRef]() {
|
||||||
|
this.#unref = false;
|
||||||
|
this.#refOpAccept();
|
||||||
|
}
|
||||||
|
|
||||||
|
[listenerUnref]() {
|
||||||
|
this.#unref = true;
|
||||||
|
this.#unrefOpAccept();
|
||||||
|
}
|
||||||
|
|
||||||
|
#refOpAccept() {
|
||||||
|
if (typeof this.#promiseId === "number") {
|
||||||
|
core.refOp(this.#promiseId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#unrefOpAccept() {
|
||||||
|
if (typeof this.#promiseId === "number") {
|
||||||
|
core.unrefOp(this.#promiseId);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Datagram {
|
class Datagram {
|
||||||
|
@ -304,14 +342,14 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function listen({ hostname, ...options }) {
|
function listen({ hostname, ...options }, constructor = Listener) {
|
||||||
const res = opListen({
|
const res = opListen({
|
||||||
transport: "tcp",
|
transport: "tcp",
|
||||||
hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
|
hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
|
||||||
...options,
|
...options,
|
||||||
});
|
});
|
||||||
|
|
||||||
return new Listener(res.rid, res.localAddr);
|
return new constructor(res.rid, res.localAddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function connect(options) {
|
async function connect(options) {
|
||||||
|
@ -335,6 +373,8 @@
|
||||||
UnixConn,
|
UnixConn,
|
||||||
opConnect,
|
opConnect,
|
||||||
listen,
|
listen,
|
||||||
|
listenerRef,
|
||||||
|
listenerUnref,
|
||||||
opListen,
|
opListen,
|
||||||
Listener,
|
Listener,
|
||||||
shutdown,
|
shutdown,
|
||||||
|
|
|
@ -7,9 +7,9 @@
|
||||||
function listen(options) {
|
function listen(options) {
|
||||||
if (options.transport === "unix") {
|
if (options.transport === "unix") {
|
||||||
const res = net.opListen(options);
|
const res = net.opListen(options);
|
||||||
return new net.Listener(res.rid, res.localAddr);
|
return new Listener(res.rid, res.localAddr);
|
||||||
} else {
|
} else {
|
||||||
return net.listen(options);
|
return net.listen(options, Listener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,9 +41,20 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class Listener extends net.Listener {
|
||||||
|
ref() {
|
||||||
|
this[net.listenerRef]();
|
||||||
|
}
|
||||||
|
|
||||||
|
unref() {
|
||||||
|
this[net.listenerUnref]();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
window.__bootstrap.netUnstable = {
|
window.__bootstrap.netUnstable = {
|
||||||
connect,
|
connect,
|
||||||
listenDatagram,
|
listenDatagram,
|
||||||
listen,
|
listen,
|
||||||
|
Listener,
|
||||||
};
|
};
|
||||||
})(this);
|
})(this);
|
||||||
|
|
|
@ -132,6 +132,7 @@
|
||||||
listen: __bootstrap.netUnstable.listen,
|
listen: __bootstrap.netUnstable.listen,
|
||||||
connect: __bootstrap.netUnstable.connect,
|
connect: __bootstrap.netUnstable.connect,
|
||||||
listenDatagram: __bootstrap.netUnstable.listenDatagram,
|
listenDatagram: __bootstrap.netUnstable.listenDatagram,
|
||||||
|
Listener: __bootstrap.netUnstable.Listener,
|
||||||
umask: __bootstrap.fs.umask,
|
umask: __bootstrap.fs.umask,
|
||||||
futime: __bootstrap.fs.futime,
|
futime: __bootstrap.fs.futime,
|
||||||
futimeSync: __bootstrap.fs.futimeSync,
|
futimeSync: __bootstrap.fs.futimeSync,
|
||||||
|
|
Loading…
Reference in a new issue