From 9c4431758dbf16f21f1f107123349cf99cb0c88a Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 4 Jan 2024 08:51:39 +0530 Subject: [PATCH] fix(ext/node): UdpSocket ref and unref (#21777) --- cli/tests/integration/node_unit_tests.rs | 1 + cli/tests/unit_node/dgram_test.ts | 59 +++++++++++++++++++ ext/net/01_net.js | 22 ++++++- .../polyfills/internal_binding/udp_wrap.ts | 11 +++- 4 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 cli/tests/unit_node/dgram_test.ts diff --git a/cli/tests/integration/node_unit_tests.rs b/cli/tests/integration/node_unit_tests.rs index 5e53a63c59..1508ad9ac7 100644 --- a/cli/tests/integration/node_unit_tests.rs +++ b/cli/tests/integration/node_unit_tests.rs @@ -58,6 +58,7 @@ util::unit_test_factory!( crypto_key_test = crypto / crypto_key_test, crypto_sign_test = crypto / crypto_sign_test, events_test, + dgram_test, fs_test, http_test, http2_test, diff --git a/cli/tests/unit_node/dgram_test.ts b/cli/tests/unit_node/dgram_test.ts new file mode 100644 index 0000000000..d532241f51 --- /dev/null +++ b/cli/tests/unit_node/dgram_test.ts @@ -0,0 +1,59 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { assertEquals } from "../../../test_util/std/assert/mod.ts"; +import { execCode } from "../unit/test_util.ts"; +import { createSocket } from "node:dgram"; + +const listenPort = 4503; +const listenPort2 = 4504; + +Deno.test("[node/dgram] udp ref and unref", { + permissions: { read: true, run: true, net: true }, +}, async () => { + const { promise, resolve } = Promise.withResolvers(); + + const udpSocket = createSocket("udp4"); + udpSocket.bind(listenPort); + + udpSocket.unref(); + udpSocket.ref(); + + let data; + udpSocket.on("message", (buffer, _rinfo) => { + data = Uint8Array.from(buffer); + udpSocket.close(); + }); + udpSocket.on("close", () => { + resolve(); + }); + + const conn = await Deno.listenDatagram({ + port: listenPort2, + transport: "udp", + }); + await conn.send(new Uint8Array([0, 1, 2, 3]), { + transport: "udp", + port: listenPort, + hostname: "127.0.0.1", + }); + + await promise; + conn.close(); + assertEquals(data, new Uint8Array([0, 1, 2, 3])); +}); + +Deno.test("[node/dgram] udp unref", { + permissions: { read: true, run: true, net: true }, +}, async () => { + const [statusCode, _output] = await execCode(` + import { createSocket } from "node:dgram"; + const udpSocket = createSocket('udp4'); + udpSocket.bind(${listenPort2}); + // This should let the program to exit without waiting for the + // udp socket to close. + udpSocket.unref(); + udpSocket.on('message', (buffer, rinfo) => { + }); + `); + assertEquals(statusCode, 0); +}); diff --git a/ext/net/01_net.js b/ext/net/01_net.js index c4a9215362..699423b228 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -295,6 +295,8 @@ class Listener { class Datagram { #rid = 0; #addr = null; + #unref = false; + #promise = null; constructor(rid, addr, bufSize = 1024) { this.#rid = rid; @@ -367,10 +369,12 @@ class Datagram { let remoteAddr; switch (this.addr.transport) { case "udp": { - ({ 0: nread, 1: remoteAddr } = await op_net_recv_udp( + this.#promise = op_net_recv_udp( this.rid, buf, - )); + ); + if (this.#unref) core.unrefOpPromise(this.#promise); + ({ 0: nread, 1: remoteAddr } = await this.#promise); remoteAddr.transport = "udp"; break; } @@ -413,6 +417,20 @@ class Datagram { core.close(this.rid); } + ref() { + this.#unref = false; + if (this.#promise !== null) { + core.refOpPromise(this.#promise); + } + } + + unref() { + this.#unref = true; + if (this.#promise !== null) { + core.unrefOpPromise(this.#promise); + } + } + async *[SymbolAsyncIterator]() { while (true) { try { diff --git a/ext/node/polyfills/internal_binding/udp_wrap.ts b/ext/node/polyfills/internal_binding/udp_wrap.ts index 6199509979..209c84a234 100644 --- a/ext/node/polyfills/internal_binding/udp_wrap.ts +++ b/ext/node/polyfills/internal_binding/udp_wrap.ts @@ -78,6 +78,7 @@ export class UDP extends HandleWrap { #listener?: Deno.DatagramConn; #receiving = false; + #unrefed = false; #recvBufferSize = UDP_DGRAM_MAXSIZE; #sendBufferSize = UDP_DGRAM_MAXSIZE; @@ -273,7 +274,8 @@ export class UDP extends HandleWrap { } override ref() { - notImplemented("udp.UDP.prototype.ref"); + this.#listener?.ref(); + this.#unrefed = false; } send( @@ -315,7 +317,8 @@ export class UDP extends HandleWrap { } override unref() { - notImplemented("udp.UDP.prototype.unref"); + this.#listener?.unref(); + this.#unrefed = true; } #doBind(ip: string, port: number, _flags: number, family: number): number { @@ -443,6 +446,10 @@ export class UDP extends HandleWrap { let remoteAddr: Deno.NetAddr | null; let nread: number | null; + if (this.#unrefed) { + this.#listener!.unref(); + } + try { [buf, remoteAddr] = (await this.#listener!.receive(p)) as [ Uint8Array,