// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright Joyent, Inc. and other Node contributors. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the // "Software"), to deal in the Software without restriction, including // without limitation the rights to use, copy, modify, merge, publish, // distribute, sublicense, and/or sell copies of the Software, and to permit // persons to whom the Software is furnished to do so, subject to the // following conditions: // // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. import { Buffer } from "internal:deno_node/polyfills/buffer.ts"; import { EventEmitter } from "internal:deno_node/polyfills/events.ts"; import { lookup as defaultLookup } from "internal:deno_node/polyfills/dns.ts"; import type { ErrnoException, NodeSystemErrorCtx, } from "internal:deno_node/polyfills/internal/errors.ts"; import { ERR_BUFFER_OUT_OF_BOUNDS, ERR_INVALID_ARG_TYPE, ERR_INVALID_FD_TYPE, ERR_MISSING_ARGS, ERR_SOCKET_ALREADY_BOUND, ERR_SOCKET_BAD_BUFFER_SIZE, ERR_SOCKET_BUFFER_SIZE, ERR_SOCKET_DGRAM_IS_CONNECTED, ERR_SOCKET_DGRAM_NOT_CONNECTED, ERR_SOCKET_DGRAM_NOT_RUNNING, errnoException, exceptionWithHostPort, } from "internal:deno_node/polyfills/internal/errors.ts"; import type { Abortable } from "internal:deno_node/polyfills/_events.d.ts"; import { kStateSymbol, newHandle, } from "internal:deno_node/polyfills/internal/dgram.ts"; import type { SocketType } from "internal:deno_node/polyfills/internal/dgram.ts"; import { asyncIdSymbol, defaultTriggerAsyncIdScope, ownerSymbol, } from "internal:deno_node/polyfills/internal/async_hooks.ts"; import { SendWrap, UDP, } from "internal:deno_node/polyfills/internal_binding/udp_wrap.ts"; import { isInt32, validateAbortSignal, validateNumber, validatePort, validateString, } from "internal:deno_node/polyfills/internal/validators.mjs"; import { guessHandleType } from "internal:deno_node/polyfills/internal_binding/util.ts"; import { os } from "internal:deno_node/polyfills/internal_binding/constants.ts"; import { nextTick } from "internal:deno_node/polyfills/process.ts"; import { channel } from "internal:deno_node/polyfills/diagnostics_channel.ts"; import { isArrayBufferView } from "internal:deno_node/polyfills/internal/util/types.ts"; const { UV_UDP_REUSEADDR, UV_UDP_IPV6ONLY } = os; const udpSocketChannel = channel("udp.socket"); const BIND_STATE_UNBOUND = 0; const BIND_STATE_BINDING = 1; const BIND_STATE_BOUND = 2; const CONNECT_STATE_DISCONNECTED = 0; const CONNECT_STATE_CONNECTING = 1; const CONNECT_STATE_CONNECTED = 2; const RECV_BUFFER = true; const SEND_BUFFER = false; export interface AddressInfo { address: string; family: number; port: number; } export type MessageType = string | Uint8Array | Buffer | DataView; export type RemoteInfo = { address: string; family: "IPv4" | "IPv6"; port: number; size?: number; }; export interface BindOptions { port?: number; address?: string; exclusive?: boolean; fd?: number; } export interface SocketOptions extends Abortable { type: SocketType; reuseAddr?: boolean; /** * @default false */ ipv6Only?: boolean; recvBufferSize?: number; sendBufferSize?: number; lookup?: typeof defaultLookup; } interface SocketInternalState { handle: UDP | null; receiving: boolean; bindState: | typeof BIND_STATE_UNBOUND | typeof BIND_STATE_BINDING | typeof BIND_STATE_BOUND; connectState: | typeof CONNECT_STATE_DISCONNECTED | typeof CONNECT_STATE_CONNECTING | typeof CONNECT_STATE_CONNECTED; queue?: Array<() => void>; reuseAddr?: boolean; ipv6Only?: boolean; recvBufferSize?: number; sendBufferSize?: number; } const isSocketOptions = ( socketOption: unknown, ): socketOption is SocketOptions => socketOption !== null && typeof socketOption === "object"; const isUdpHandle = (handle: unknown): handle is UDP => handle !== null && typeof handle === "object" && typeof (handle as UDP).recvStart === "function"; const isBindOptions = (options: unknown): options is BindOptions => options !== null && typeof options === "object"; /** * Encapsulates the datagram functionality. * * New instances of `dgram.Socket` are created using `createSocket`. * The `new` keyword is not to be used to create `dgram.Socket` instances. */ export class Socket extends EventEmitter { [asyncIdSymbol]!: number; [kStateSymbol]!: SocketInternalState; type!: SocketType; constructor( type: SocketType | SocketOptions, listener?: (msg: Buffer, rinfo: RemoteInfo) => void, ) { super(); let lookup; let recvBufferSize; let sendBufferSize; let options: SocketOptions | undefined; if (isSocketOptions(type)) { options = type; type = options.type; lookup = options.lookup; recvBufferSize = options.recvBufferSize; sendBufferSize = options.sendBufferSize; } const handle = newHandle(type, lookup); handle[ownerSymbol] = this; this[asyncIdSymbol] = handle.getAsyncId(); this.type = type; if (typeof listener === "function") { this.on("message", listener); } this[kStateSymbol] = { handle, receiving: false, bindState: BIND_STATE_UNBOUND, connectState: CONNECT_STATE_DISCONNECTED, queue: undefined, reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true. ipv6Only: options && options.ipv6Only, recvBufferSize, sendBufferSize, }; if (options?.signal !== undefined) { const { signal } = options; validateAbortSignal(signal, "options.signal"); const onAborted = () => { this.close(); }; if (signal.aborted) { onAborted(); } else { signal.addEventListener("abort", onAborted); this.once( "close", () => signal.removeEventListener("abort", onAborted), ); } } if (udpSocketChannel.hasSubscribers) { udpSocketChannel.publish({ socket: this, }); } } /** * Tells the kernel to join a multicast group at the given `multicastAddress` * and `multicastInterface` using the `IP_ADD_MEMBERSHIP` socket option. If * the`multicastInterface` argument is not specified, the operating system * will choose one interface and will add membership to it. To add membership * to every available interface, call `addMembership` multiple times, once * per interface. * * When called on an unbound socket, this method will implicitly bind to a * random port, listening on all interfaces. * * When sharing a UDP socket across multiple `cluster` workers, the * `socket.addMembership()` function must be called only once or an * `EADDRINUSE` error will occur: * * ```js * import cluster from "internal:deno_node/polyfills/cluster"; * import dgram from "internal:deno_node/polyfills/dgram"; * * if (cluster.isPrimary) { * cluster.fork(); // Works ok. * cluster.fork(); // Fails with EADDRINUSE. * } else { * const s = dgram.createSocket('udp4'); * s.bind(1234, () => { * s.addMembership('224.0.0.114'); * }); * } * ``` */ addMembership(multicastAddress: string, interfaceAddress?: string) { healthCheck(this); if (!multicastAddress) { throw new ERR_MISSING_ARGS("multicastAddress"); } const { handle } = this[kStateSymbol]; const err = handle!.addMembership(multicastAddress, interfaceAddress); if (err) { throw errnoException(err, "addMembership"); } } /** * Tells the kernel to join a source-specific multicast channel at the given * `sourceAddress` and `groupAddress`, using the `multicastInterface` with * the `IP_ADD_SOURCE_MEMBERSHIP` socket option. If the `multicastInterface` * argument is not specified, the operating system will choose one interface * and will add membership to it. To add membership to every available * interface, call `socket.addSourceSpecificMembership()` multiple times, * once per interface. * * When called on an unbound socket, this method will implicitly bind to a * random port, listening on all interfaces. */ addSourceSpecificMembership( sourceAddress: string, groupAddress: string, interfaceAddress?: string, ) { healthCheck(this); validateString(sourceAddress, "sourceAddress"); validateString(groupAddress, "groupAddress"); const err = this[kStateSymbol].handle!.addSourceSpecificMembership( sourceAddress, groupAddress, interfaceAddress, ); if (err) { throw errnoException(err, "addSourceSpecificMembership"); } } /** * Returns an object containing the address information for a socket. * For UDP sockets, this object will contain `address`, `family` and `port`properties. * * This method throws `EBADF` if called on an unbound socket. */ address(): AddressInfo { healthCheck(this); const out = {}; const err = this[kStateSymbol].handle!.getsockname(out); if (err) { throw errnoException(err, "getsockname"); } return out as AddressInfo; } /** * For UDP sockets, causes the `dgram.Socket` to listen for datagram * messages on a named `port` and optional `address`. If `port` is not * specified or is `0`, the operating system will attempt to bind to a * random port. If `address` is not specified, the operating system will * attempt to listen on all addresses. Once binding is complete, a * `'listening'` event is emitted and the optional `callback` function is * called. * * Specifying both a `'listening'` event listener and passing a `callback` to * the `socket.bind()` method is not harmful but not very useful. * * A bound datagram socket keeps the Node.js process running to receive * datagram messages. * * If binding fails, an `'error'` event is generated. In rare case (e.g. * attempting to bind with a closed socket), an `Error` may be thrown. * * Example of a UDP server listening on port 41234: * * ```js * import dgram from "internal:deno_node/polyfills/dgram"; * * const server = dgram.createSocket('udp4'); * * server.on('error', (err) => { * console.log(`server error:\n${err.stack}`); * server.close(); * }); * * server.on('message', (msg, rinfo) => { * console.log(`server got: ${msg} from ${rinfo.address}:${rinfo.port}`); * }); * * server.on('listening', () => { * const address = server.address(); * console.log(`server listening ${address.address}:${address.port}`); * }); * * server.bind(41234); * // Prints: server listening 0.0.0.0:41234 * ``` * * @param callback with no parameters. Called when binding is complete. */ bind(port?: number, address?: string, callback?: () => void): this; bind(port: number, callback?: () => void): this; bind(callback: () => void): this; bind(options: BindOptions, callback?: () => void): this; bind(port_?: unknown, address_?: unknown /* callback */): this { let port = typeof port_ === "function" ? null : port_; healthCheck(this); const state = this[kStateSymbol]; if (state.bindState !== BIND_STATE_UNBOUND) { throw new ERR_SOCKET_ALREADY_BOUND(); } state.bindState = BIND_STATE_BINDING; const cb = arguments.length && arguments[arguments.length - 1]; if (typeof cb === "function") { // deno-lint-ignore no-inner-declarations function removeListeners(this: Socket) { this.removeListener("error", removeListeners); this.removeListener("listening", onListening); } // deno-lint-ignore no-inner-declarations function onListening(this: Socket) { removeListeners.call(this); cb.call(this); } this.on("error", removeListeners); this.on("listening", onListening); } if (isUdpHandle(port)) { replaceHandle(this, port); startListening(this); return this; } // Open an existing fd instead of creating a new one. if (isBindOptions(port) && isInt32(port.fd!) && port.fd! > 0) { const fd = port.fd!; const state = this[kStateSymbol]; // TODO(cmorten): here we deviate somewhat from the Node implementation which // makes use of the https://nodejs.org/api/cluster.html module to run servers // across a "cluster" of Node processes to take advantage of multi-core // systems. // // Though Deno has has a Worker capability from which we could simulate this, // for now we assert that we are _always_ on the primary process. const type = guessHandleType(fd); if (type !== "UDP") { throw new ERR_INVALID_FD_TYPE(type); } const err = state.handle!.open(fd); if (err) { throw errnoException(err, "open"); } startListening(this); return this; } let address: string; if (isBindOptions(port)) { address = port.address || ""; port = port.port; } else { address = typeof address_ === "function" ? "" : (address_ as string); } // Defaulting address for bind to all interfaces if (!address) { if (this.type === "udp4") { address = "0.0.0.0"; } else { address = "::"; } } // Resolve address first state.handle!.lookup(address, (lookupError, ip) => { if (lookupError) { state.bindState = BIND_STATE_UNBOUND; this.emit("error", lookupError); return; } let flags: number | undefined = 0; if (state.reuseAddr) { flags |= UV_UDP_REUSEADDR; } if (state.ipv6Only) { flags |= UV_UDP_IPV6ONLY!; } // TODO(cmorten): here we deviate somewhat from the Node implementation which // makes use of the https://nodejs.org/api/cluster.html module to run servers // across a "cluster" of Node processes to take advantage of multi-core // systems. // // Though Deno has has a Worker capability from which we could simulate this, // for now we assert that we are _always_ on the primary process. if (!state.handle) { return; // Handle has been closed in the mean time } const err = state.handle.bind(ip, port as number || 0, flags); if (err) { const ex = exceptionWithHostPort(err, "bind", ip, port as number); state.bindState = BIND_STATE_UNBOUND; this.emit("error", ex); // Todo(@bartlomieju): close? return; } startListening(this); }); return this; } /** * Close the underlying socket and stop listening for data on it. If a * callback is provided, it is added as a listener for the `'close'` event. * * @param callback Called when the socket has been closed. */ close(callback?: () => void): this { const state = this[kStateSymbol]; const queue = state.queue; if (typeof callback === "function") { this.on("close", callback); } if (queue !== undefined) { queue.push(this.close.bind(this)); return this; } healthCheck(this); stopReceiving(this); state.handle!.close(); state.handle = null; defaultTriggerAsyncIdScope( this[asyncIdSymbol], nextTick, socketCloseNT, this, ); return this; } /** * Associates the `dgram.Socket` to a remote address and port. Every * message sent by this handle is automatically sent to that destination. * Also, the socket will only receive messages from that remote peer. * Trying to call `connect()` on an already connected socket will result * in an `ERR_SOCKET_DGRAM_IS_CONNECTED` exception. If `address` is not * provided, `'127.0.0.1'` (for `udp4` sockets) or `'::1'` (for `udp6` sockets) * will be used by default. Once the connection is complete, a `'connect'` event * is emitted and the optional `callback` function is called. In case of failure, * the `callback` is called or, failing this, an `'error'` event is emitted. * * @param callback Called when the connection is completed or on error. */ connect( port: number, address?: string, callback?: (err?: ErrnoException) => void, ): void; connect(port: number, callback: (err?: ErrnoException) => void): void; connect(port: number, address?: unknown, callback?: unknown) { port = validatePort(port, "Port", false); if (typeof address === "function") { callback = address; address = ""; } else if (address === undefined) { address = ""; } validateString(address, "address"); const state = this[kStateSymbol]; if (state.connectState !== CONNECT_STATE_DISCONNECTED) { throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); } state.connectState = CONNECT_STATE_CONNECTING; if (state.bindState === BIND_STATE_UNBOUND) { this.bind({ port: 0, exclusive: true }); } if (state.bindState !== BIND_STATE_BOUND) { enqueue( this, _connect.bind( this, port, address as string, callback as (err?: ErrnoException) => void, ), ); return; } Reflect.apply(_connect, this, [port, address, callback]); } /** * A synchronous function that disassociates a connected `dgram.Socket` from * its remote address. Trying to call `disconnect()` on an unbound or already * disconnected socket will result in an `ERR_SOCKET_DGRAM_NOT_CONNECTED` * exception. */ disconnect() { const state = this[kStateSymbol]; if (state.connectState !== CONNECT_STATE_CONNECTED) { throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); } const err = state.handle!.disconnect(); if (err) { throw errnoException(err, "connect"); } else { state.connectState = CONNECT_STATE_DISCONNECTED; } } /** * Instructs the kernel to leave a multicast group at `multicastAddress` * using the `IP_DROP_MEMBERSHIP` socket option. This method is automatically * called by the kernel when the socket is closed or the process terminates, * so most apps will never have reason to call this. * * If `multicastInterface` is not specified, the operating system will * attempt to drop membership on all valid interfaces. */ dropMembership(multicastAddress: string, interfaceAddress?: string) { healthCheck(this); if (!multicastAddress) { throw new ERR_MISSING_ARGS("multicastAddress"); } const err = this[kStateSymbol].handle!.dropMembership( multicastAddress, interfaceAddress, ); if (err) { throw errnoException(err, "dropMembership"); } } /** * Instructs the kernel to leave a source-specific multicast channel at the * given `sourceAddress` and `groupAddress` using the * `IP_DROP_SOURCE_MEMBERSHIP` socket option. This method is automatically * called by the kernel when the socket is closed or the process terminates, * so most apps will never have reason to call this. * * If `multicastInterface` is not specified, the operating system will * attempt to drop membership on all valid interfaces. */ dropSourceSpecificMembership( sourceAddress: string, groupAddress: string, interfaceAddress?: string, ) { healthCheck(this); validateString(sourceAddress, "sourceAddress"); validateString(groupAddress, "groupAddress"); const err = this[kStateSymbol].handle!.dropSourceSpecificMembership( sourceAddress, groupAddress, interfaceAddress, ); if (err) { throw errnoException(err, "dropSourceSpecificMembership"); } } /** * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound * socket. * * @return the `SO_RCVBUF` socket receive buffer size in bytes. */ getRecvBufferSize(): number { return bufferSize(this, 0, RECV_BUFFER); } /** * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound * socket. * * @return the `SO_SNDBUF` socket send buffer size in bytes. */ getSendBufferSize(): number { return bufferSize(this, 0, SEND_BUFFER); } /** * By default, binding a socket will cause it to block the Node.js process * from exiting as long as the socket is open. The `socket.unref()` method * can be used to exclude the socket from the reference counting that keeps * the Node.js process active. The `socket.ref()` method adds the socket back * to the reference counting and restores the default behavior. * * Calling `socket.ref()` multiples times will have no additional effect. * * The `socket.ref()` method returns a reference to the socket so calls can * be chained. */ ref(): this { const handle = this[kStateSymbol].handle; if (handle) { handle.ref(); } return this; } /** * Returns an object containing the `address`, `family`, and `port` of the * remote endpoint. This method throws an `ERR_SOCKET_DGRAM_NOT_CONNECTED` * exception if the socket is not connected. */ remoteAddress(): AddressInfo { healthCheck(this); const state = this[kStateSymbol]; if (state.connectState !== CONNECT_STATE_CONNECTED) { throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); } const out = {}; const err = state.handle!.getpeername(out); if (err) { throw errnoException(err, "getpeername"); } return out as AddressInfo; } /** * Broadcasts a datagram on the socket. * For connectionless sockets, the destination `port` and `address` must be * specified. Connected sockets, on the other hand, will use their associated * remote endpoint, so the `port` and `address` arguments must not be set. * * The `msg` argument contains the message to be sent. * Depending on its type, different behavior can apply. If `msg` is a * `Buffer`, any `TypedArray` or a `DataView`, * the `offset` and `length` specify the offset within the `Buffer` where the * message begins and the number of bytes in the message, respectively. * If `msg` is a `String`, then it is automatically converted to a `Buffer` * with `'utf8'` encoding. With messages that contain multi-byte characters, * `offset` and `length` will be calculated with respect to `byte length` and * not the character position. If `msg` is an array, `offset` and `length` * must not be specified. * * The `address` argument is a string. If the value of `address` is a host * name, DNS will be used to resolve the address of the host. If `address` * is not provided or otherwise nullish, `'127.0.0.1'` (for `udp4` sockets) * or `'::1'`(for `udp6` sockets) will be used by default. * * If the socket has not been previously bound with a call to `bind`, the * socket is assigned a random port number and is bound to the "all * interfaces" address (`'0.0.0.0'` for `udp4` sockets, `'::0'` for `udp6` * sockets.) * * An optional `callback` function may be specified to as a way of * reporting DNS errors or for determining when it is safe to reuse the `buf` * object. DNS lookups delay the time to send for at least one tick of the * Node.js event loop. * * The only way to know for sure that the datagram has been sent is by using * a `callback`. If an error occurs and a `callback` is given, the error will * be passed as the first argument to the `callback`. If a `callback` is not * given, the error is emitted as an `'error'` event on the `socket` object. * * Offset and length are optional but both _must_ be set if either are used. * They are supported only when the first argument is a `Buffer`, a * `TypedArray`, or a `DataView`. * * This method throws `ERR_SOCKET_BAD_PORT` if called on an unbound socket. * * Example of sending a UDP packet to a port on `localhost`; * * ```js * import dgram from "internal:deno_node/polyfills/dgram"; * import { Buffer } from "internal:deno_node/polyfills/buffer"; * * const message = Buffer.from('Some bytes'); * const client = dgram.createSocket('udp4'); * client.send(message, 41234, 'localhost', (err) => { * client.close(); * }); * ``` * * Example of sending a UDP packet composed of multiple buffers to a port on * `127.0.0.1`; * * ```js * import dgram from "internal:deno_node/polyfills/dgram"; * import { Buffer } from "internal:deno_node/polyfills/buffer"; * * const buf1 = Buffer.from('Some '); * const buf2 = Buffer.from('bytes'); * const client = dgram.createSocket('udp4'); * client.send([buf1, buf2], 41234, (err) => { * client.close(); * }); * ``` * * Sending multiple buffers might be faster or slower depending on the * application and operating system. Run benchmarks to * determine the optimal strategy on a case-by-case basis. Generally * speaking, however, sending multiple buffers is faster. * * Example of sending a UDP packet using a socket connected to a port on * `localhost`: * * ```js * import dgram from "internal:deno_node/polyfills/dgram"; * import { Buffer } from "internal:deno_node/polyfills/buffer"; * * const message = Buffer.from('Some bytes'); * const client = dgram.createSocket('udp4'); * client.connect(41234, 'localhost', (err) => { * client.send(message, (err) => { * client.close(); * }); * }); * ``` * * @param msg Message to be sent. * @param offset Offset in the buffer where the message starts. * @param length Number of bytes in the message. * @param port Destination port. * @param address Destination host name or IP address. * @param callback Called when the message has been sent. */ send( msg: MessageType | ReadonlyArray, port?: number, address?: string, callback?: (error: ErrnoException | null, bytes?: number) => void, ): void; send( msg: MessageType | ReadonlyArray, port?: number, callback?: (error: ErrnoException | null, bytes?: number) => void, ): void; send( msg: MessageType | ReadonlyArray, callback?: (error: ErrnoException | null, bytes?: number) => void, ): void; send( msg: MessageType, offset: number, length: number, port?: number, address?: string, callback?: (error: ErrnoException | null, bytes?: number) => void, ): void; send( msg: MessageType, offset: number, length: number, port?: number, callback?: (error: ErrnoException | null, bytes?: number) => void, ): void; send( msg: MessageType, offset: number, length: number, callback?: (error: ErrnoException | null, bytes?: number) => void, ): void; send( buffer: unknown, offset?: unknown, length?: unknown, port?: unknown, address?: unknown, callback?: unknown, ) { let list: MessageType[] | null; const state = this[kStateSymbol]; const connected = state.connectState === CONNECT_STATE_CONNECTED; if (!connected) { if (address || (port && typeof port !== "function")) { buffer = sliceBuffer( buffer as MessageType, offset as number, length as number, ); } else { callback = port; port = offset; address = length; } } else { if (typeof length === "number") { buffer = sliceBuffer(buffer as MessageType, offset as number, length); if (typeof port === "function") { callback = port; port = null; } } else { callback = offset; } if (port || address) { throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); } } if (!Array.isArray(buffer)) { if (typeof buffer === "string") { list = [Buffer.from(buffer)]; } else if (!isArrayBufferView(buffer)) { throw new ERR_INVALID_ARG_TYPE( "buffer", ["Buffer", "TypedArray", "DataView", "string"], buffer, ); } else { list = [buffer as MessageType]; } } else if (!(list = fixBufferList(buffer))) { throw new ERR_INVALID_ARG_TYPE( "buffer list arguments", ["Buffer", "TypedArray", "DataView", "string"], buffer, ); } if (!connected) { port = validatePort(port, "Port", false); } // Normalize callback so it's either a function or undefined but not anything // else. if (typeof callback !== "function") { callback = undefined; } if (typeof address === "function") { callback = address; address = undefined; } else if (address && typeof address !== "string") { throw new ERR_INVALID_ARG_TYPE("address", ["string", "falsy"], address); } healthCheck(this); if (state.bindState === BIND_STATE_UNBOUND) { this.bind({ port: 0, exclusive: true }); } if (list.length === 0) { list.push(Buffer.alloc(0)); } // If the socket hasn't been bound yet, push the outbound packet onto the // send queue and send after binding is complete. if (state.bindState !== BIND_STATE_BOUND) { // @ts-ignore mapping unknowns back onto themselves doesn't type nicely enqueue(this, this.send.bind(this, list, port, address, callback)); return; } const afterDns = (ex: ErrnoException | null, ip: string) => { defaultTriggerAsyncIdScope( this[asyncIdSymbol], doSend, ex, this, ip, list, address, port, callback, ); }; if (!connected) { state.handle!.lookup(address as string, afterDns); } else { afterDns(null, ""); } } /** * Sets or clears the `SO_BROADCAST` socket option. When set to `true`, UDP * packets may be sent to a local interface's broadcast address. * * This method throws `EBADF` if called on an unbound socket. */ setBroadcast(arg: boolean) { const err = this[kStateSymbol].handle!.setBroadcast(arg ? 1 : 0); if (err) { throw errnoException(err, "setBroadcast"); } } /** * _All references to scope in this section are referring to [IPv6 Zone Indices](https://en.wikipedia.org/wiki/IPv6_address#Scoped_literal_IPv6_addresses), which are defined by [RFC * 4007](https://tools.ietf.org/html/rfc4007). In string form, an IP_ * _with a scope index is written as `'IP%scope'` where scope is an interface name_ * _or interface number._ * * Sets the default outgoing multicast interface of the socket to a chosen * interface or back to system interface selection. The `multicastInterface` must * be a valid string representation of an IP from the socket's family. * * For IPv4 sockets, this should be the IP configured for the desired physical * interface. All packets sent to multicast on the socket will be sent on the * interface determined by the most recent successful use of this call. * * For IPv6 sockets, `multicastInterface` should include a scope to indicate the * interface as in the examples that follow. In IPv6, individual `send` calls can * also use explicit scope in addresses, so only packets sent to a multicast * address without specifying an explicit scope are affected by the most recent * successful use of this call. * * This method throws `EBADF` if called on an unbound socket. * * #### Example: IPv6 outgoing multicast interface * * On most systems, where scope format uses the interface name: * * ```js * const socket = dgram.createSocket('udp6'); * * socket.bind(1234, () => { * socket.setMulticastInterface('::%eth1'); * }); * ``` * * On Windows, where scope format uses an interface number: * * ```js * const socket = dgram.createSocket('udp6'); * * socket.bind(1234, () => { * socket.setMulticastInterface('::%2'); * }); * ``` * * #### Example: IPv4 outgoing multicast interface * * All systems use an IP of the host on the desired physical interface: * * ```js * const socket = dgram.createSocket('udp4'); * * socket.bind(1234, () => { * socket.setMulticastInterface('10.0.0.2'); * }); * ``` */ setMulticastInterface(interfaceAddress: string) { healthCheck(this); validateString(interfaceAddress, "interfaceAddress"); const err = this[kStateSymbol].handle!.setMulticastInterface( interfaceAddress, ); if (err) { throw errnoException(err, "setMulticastInterface"); } } /** * Sets or clears the `IP_MULTICAST_LOOP` socket option. When set to `true`, * multicast packets will also be received on the local interface. * * This method throws `EBADF` if called on an unbound socket. */ setMulticastLoopback(arg: boolean): typeof arg { const err = this[kStateSymbol].handle!.setMulticastLoopback(arg ? 1 : 0); if (err) { throw errnoException(err, "setMulticastLoopback"); } return arg; // 0.4 compatibility } /** * Sets the `IP_MULTICAST_TTL` socket option. While TTL generally stands for * "Time to Live", in this context it specifies the number of IP hops that a * packet is allowed to travel through, specifically for multicast traffic. Each * router or gateway that forwards a packet decrements the TTL. If the TTL is * decremented to 0 by a router, it will not be forwarded. * * The `ttl` argument may be between 0 and 255\. The default on most systems is `1`. * * This method throws `EBADF` if called on an unbound socket. */ setMulticastTTL(ttl: number): typeof ttl { validateNumber(ttl, "ttl"); const err = this[kStateSymbol].handle!.setMulticastTTL(ttl); if (err) { throw errnoException(err, "setMulticastTTL"); } return ttl; } /** * Sets the `SO_RCVBUF` socket option. Sets the maximum socket receive buffer * in bytes. * * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound socket. */ setRecvBufferSize(size: number) { bufferSize(this, size, RECV_BUFFER); } /** * Sets the `SO_SNDBUF` socket option. Sets the maximum socket send buffer * in bytes. * * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound socket. */ setSendBufferSize(size: number) { bufferSize(this, size, SEND_BUFFER); } /** * Sets the `IP_TTL` socket option. While TTL generally stands for "Time to Live", * in this context it specifies the number of IP hops that a packet is allowed to * travel through. Each router or gateway that forwards a packet decrements the * TTL. If the TTL is decremented to 0 by a router, it will not be forwarded. * Changing TTL values is typically done for network probes or when multicasting. * * The `ttl` argument may be between between 1 and 255\. The default on most systems * is 64. * * This method throws `EBADF` if called on an unbound socket. */ setTTL(ttl: number): typeof ttl { validateNumber(ttl, "ttl"); const err = this[kStateSymbol].handle!.setTTL(ttl); if (err) { throw errnoException(err, "setTTL"); } return ttl; } /** * By default, binding a socket will cause it to block the Node.js process from * exiting as long as the socket is open. The `socket.unref()` method can be used * to exclude the socket from the reference counting that keeps the Node.js * process active, allowing the process to exit even if the socket is still * listening. * * Calling `socket.unref()` multiple times will have no addition effect. * * The `socket.unref()` method returns a reference to the socket so calls can be * chained. */ unref(): this { const handle = this[kStateSymbol].handle; if (handle) { handle.unref(); } return this; } } /** * Creates a `dgram.Socket` object. Once the socket is created, calling * `socket.bind()` will instruct the socket to begin listening for datagram * messages. When `address` and `port` are not passed to `socket.bind()` the * method will bind the socket to the "all interfaces" address on a random port * (it does the right thing for both `udp4` and `udp6` sockets). The bound * address and port can be retrieved using `socket.address().address` and * `socket.address().port`. * * If the `signal` option is enabled, calling `.abort()` on the corresponding * `AbortController` is similar to calling `.close()` on the socket: * * ```js * const controller = new AbortController(); * const { signal } = controller; * const server = dgram.createSocket({ type: 'udp4', signal }); * server.on('message', (msg, rinfo) => { * console.log(`server got: ${msg} from ${rinfo.address}:${rinfo.port}`); * }); * // Later, when you want to close the server. * controller.abort(); * ``` * * @param options * @param callback Attached as a listener for `'message'` events. Optional. */ export function createSocket( type: SocketType, listener?: (msg: Buffer, rinfo: RemoteInfo) => void, ): Socket; export function createSocket( type: SocketOptions, listener?: (msg: Buffer, rinfo: RemoteInfo) => void, ): Socket; export function createSocket( type: SocketType | SocketOptions, listener?: (msg: Buffer, rinfo: RemoteInfo) => void, ): Socket { return new Socket(type, listener); } function startListening(socket: Socket) { const state = socket[kStateSymbol]; state.handle!.onmessage = onMessage; // Todo(@bartlomieju): handle errors state.handle!.recvStart(); state.receiving = true; state.bindState = BIND_STATE_BOUND; if (state.recvBufferSize) { bufferSize(socket, state.recvBufferSize, RECV_BUFFER); } if (state.sendBufferSize) { bufferSize(socket, state.sendBufferSize, SEND_BUFFER); } socket.emit("listening"); } function replaceHandle(self: Socket, newHandle: UDP) { const state = self[kStateSymbol]; const oldHandle = state.handle!; // Set up the handle that we got from primary. newHandle.lookup = oldHandle.lookup; newHandle.bind = oldHandle.bind; newHandle.send = oldHandle.send; newHandle[ownerSymbol] = self; // Replace the existing handle by the handle we got from primary. oldHandle.close(); state.handle = newHandle; } function bufferSize(self: Socket, size: number, buffer: boolean): number { if (size >>> 0 !== size) { throw new ERR_SOCKET_BAD_BUFFER_SIZE(); } const ctx = {}; const ret = self[kStateSymbol].handle!.bufferSize(size, buffer, ctx); if (ret === undefined) { throw new ERR_SOCKET_BUFFER_SIZE(ctx as NodeSystemErrorCtx); } return ret; } function socketCloseNT(self: Socket) { self.emit("close"); } function healthCheck(socket: Socket) { if (!socket[kStateSymbol].handle) { // Error message from dgram_legacy.js. throw new ERR_SOCKET_DGRAM_NOT_RUNNING(); } } function stopReceiving(socket: Socket) { const state = socket[kStateSymbol]; if (!state.receiving) { return; } state.handle!.recvStop(); state.receiving = false; } function onMessage( nread: number, handle: UDP, buf?: Buffer, rinfo?: RemoteInfo, ) { const self = handle[ownerSymbol] as Socket; if (nread < 0) { self.emit("error", errnoException(nread, "recvmsg")); return; } rinfo!.size = buf!.length; // compatibility self.emit("message", buf, rinfo); } function sliceBuffer(buffer: MessageType, offset: number, length: number) { if (typeof buffer === "string") { buffer = Buffer.from(buffer); } else if (!isArrayBufferView(buffer)) { throw new ERR_INVALID_ARG_TYPE( "buffer", ["Buffer", "TypedArray", "DataView", "string"], buffer, ); } offset = offset >>> 0; length = length >>> 0; if (offset > buffer.byteLength) { throw new ERR_BUFFER_OUT_OF_BOUNDS("offset"); } if (offset + length > buffer.byteLength) { throw new ERR_BUFFER_OUT_OF_BOUNDS("length"); } return Buffer.from(buffer.buffer, buffer.byteOffset + offset, length); } function fixBufferList( list: ReadonlyArray, ): Array | null { const newList = new Array(list.length); for (let i = 0, l = list.length; i < l; i++) { const buf = list[i]; if (typeof buf === "string") { newList[i] = Buffer.from(buf); } else if (!isArrayBufferView(buf)) { return null; } else { newList[i] = Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength); } } return newList; } function enqueue(self: Socket, toEnqueue: () => void) { const state = self[kStateSymbol]; // If the send queue hasn't been initialized yet, do it, and install an // event handler that flushes the send queue after binding is done. if (state.queue === undefined) { state.queue = []; self.once(EventEmitter.errorMonitor, onListenError); self.once("listening", onListenSuccess); } state.queue.push(toEnqueue); } function onListenSuccess(this: Socket) { this.removeListener(EventEmitter.errorMonitor, onListenError); clearQueue.call(this); } function onListenError(this: Socket) { this.removeListener("listening", onListenSuccess); this[kStateSymbol].queue = undefined; } function clearQueue(this: Socket) { const state = this[kStateSymbol]; const queue = state.queue; state.queue = undefined; // Flush the send queue. for (const queueEntry of queue!) { queueEntry(); } } function _connect( this: Socket, port: number, address: string, callback: (err?: ErrnoException) => void, ) { const state = this[kStateSymbol]; if (callback) { this.once("connect", callback); } const afterDns = (ex: ErrnoException | null, ip: string) => { defaultTriggerAsyncIdScope( this[asyncIdSymbol], doConnect, ex, this, ip, address, port, callback, ); }; state.handle!.lookup(address, afterDns); } function doConnect( ex: ErrnoException | null, self: Socket, ip: string, address: string, port: number, callback: (err?: ErrnoException) => void, ) { const state = self[kStateSymbol]; if (!state.handle) { return; } if (!ex) { const err = state.handle.connect(ip, port); if (err) { ex = exceptionWithHostPort(err, "connect", address, port); } } if (ex) { state.connectState = CONNECT_STATE_DISCONNECTED; return nextTick(() => { if (callback) { self.removeListener("connect", callback); callback(ex!); } else { self.emit("error", ex); } }); } state.connectState = CONNECT_STATE_CONNECTED; nextTick(() => self.emit("connect")); } function doSend( ex: ErrnoException | null, self: Socket, ip: string, list: MessageType[], address: string, port: number, callback?: (error: ErrnoException | null, bytes?: number) => void, ) { const state = self[kStateSymbol]; if (ex) { if (typeof callback === "function") { nextTick(callback, ex); return; } nextTick(() => self.emit("error", ex)); return; } else if (!state.handle) { return; } const req = new SendWrap(); req.list = list; // Keep reference alive. req.address = address; req.port = port; if (callback) { req.callback = callback; req.oncomplete = afterSend; } let err; if (port) { err = state.handle.send(req, list, list.length, port, ip, !!callback); } else { err = state.handle.send(req, list, list.length, !!callback); } if (err >= 1) { // Synchronous finish. The return code is msg_length + 1 so that we can // distinguish between synchronous success and asynchronous success. if (callback) { nextTick(callback, null, err - 1); } return; } if (err && callback) { // Don't emit as error, dgram_legacy.js compatibility const ex = exceptionWithHostPort(err, "send", address, port); nextTick(callback, ex); } } function afterSend(this: SendWrap, err: number | null, sent?: number) { let ex: ErrnoException | null; if (err) { ex = exceptionWithHostPort(err, "send", this.address, this.port); } else { ex = null; } this.callback(ex, sent); } export type { SocketType }; export default { createSocket, Socket, };