From 4c34a2f2df595fa43b3eb06722c3ad742450d8bd Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Mon, 20 Mar 2023 21:27:00 +0000 Subject: [PATCH] feat(ext/net): Add multicasting APIs to DatagramConn (#10706) (#17811) --- cli/tests/unit/net_test.ts | 135 ++++++++++++++++++++++++ cli/tsc/dts/lib.deno.unstable.d.ts | 49 ++++++++- ext/net/01_net.js | 59 +++++++++++ ext/net/lib.rs | 6 ++ ext/net/ops.rs | 164 ++++++++++++++++++++++++++++- 5 files changed, 409 insertions(+), 4 deletions(-) diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts index 652d07d7de..935a6f846b 100644 --- a/cli/tests/unit/net_test.ts +++ b/cli/tests/unit/net_test.ts @@ -425,6 +425,141 @@ Deno.test( }, ); +Deno.test( + { permissions: { net: true }, ignore: true }, + async function netUdpMulticastV4() { + const listener = Deno.listenDatagram({ + hostname: "0.0.0.0", + port: 5353, + transport: "udp", + reuseAddress: true, + }); + + const membership = await listener.joinMulticastV4( + "224.0.0.251", + "127.0.0.1", + ); + + membership.setLoopback(true); + membership.setLoopback(false); + membership.setTTL(50); + membership.leave(); + listener.close(); + }, +); + +Deno.test( + { permissions: { net: true }, ignore: true }, + async function netUdpMulticastV6() { + const listener = Deno.listenDatagram({ + hostname: "::", + port: 5353, + transport: "udp", + reuseAddress: true, + }); + + const membership = await listener.joinMulticastV6( + "ff02::fb", + 1, + ); + + membership.setLoopback(true); + membership.setLoopback(false); + membership.leave(); + listener.close(); + }, +); + +Deno.test( + { permissions: { net: true }, ignore: true }, + async function netUdpSendReceiveMulticastv4() { + const alice = Deno.listenDatagram({ + hostname: "0.0.0.0", + port: 5353, + transport: "udp", + reuseAddress: true, + loopback: true, + }); + + const bob = Deno.listenDatagram({ + hostname: "0.0.0.0", + port: 5353, + transport: "udp", + reuseAddress: true, + }); + + const aliceMembership = await alice.joinMulticastV4( + "224.0.0.1", + "0.0.0.0", + ); + + const bobMembership = await bob.joinMulticastV4("224.0.0.1", "0.0.0.0"); + + const sent = new Uint8Array([1, 2, 3]); + + await alice.send(sent, { + hostname: "224.0.0.1", + port: 5353, + transport: "udp", + }); + + const [recvd, remote] = await bob.receive(); + + assert(remote.transport === "udp"); + assertEquals(remote.port, 5353); + assertEquals(recvd.length, 3); + assertEquals(1, recvd[0]); + assertEquals(2, recvd[1]); + assertEquals(3, recvd[2]); + + aliceMembership.leave(); + bobMembership.leave(); + + alice.close(); + bob.close(); + }, +); + +Deno.test( + { permissions: { net: true }, ignore: true }, + async function netUdpMulticastLoopbackOption() { + // Must bind sender to an address that can send to the broadcast address on MacOS. + // Macos will give us error 49 when sending the broadcast packet if we omit hostname here. + const listener = Deno.listenDatagram({ + port: 5353, + transport: "udp", + hostname: "0.0.0.0", + loopback: true, + reuseAddress: true, + }); + + const membership = await listener.joinMulticastV4( + "224.0.0.1", + "0.0.0.0", + ); + + // await membership.setLoopback(true); + + const sent = new Uint8Array([1, 2, 3]); + const byteLength = await listener.send(sent, { + hostname: "224.0.0.1", + port: 5353, + transport: "udp", + }); + + assertEquals(byteLength, 3); + const [recvd, remote] = await listener.receive(); + assert(remote.transport === "udp"); + assertEquals(remote.port, 5353); + assertEquals(recvd.length, 3); + assertEquals(1, recvd[0]); + assertEquals(2, recvd[1]); + assertEquals(3, recvd[2]); + membership.leave(); + listener.close(); + }, +); + Deno.test( { permissions: { net: true } }, async function netUdpConcurrentSendReceive() { diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index ed7e682f12..62426ca35c 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -163,7 +163,7 @@ declare namespace Deno { */ type ToNativeResultType = T extends NativeStructType ? BufferSource - : ToNativeResultTypeMap[Exclude]; + : ToNativeResultTypeMap[Exclude]; /** **UNSTABLE**: New API, yet to be vetted. * @@ -225,7 +225,7 @@ declare namespace Deno { */ type FromNativeResultType = T extends NativeStructType ? Uint8Array - : FromNativeResultTypeMap[Exclude]; + : FromNativeResultTypeMap[Exclude]; /** **UNSTABLE**: New API, yet to be vetted. * @@ -850,6 +850,34 @@ declare namespace Deno { options: CreateHttpClientOptions, ): HttpClient; + /** **UNSTABLE**: New API, yet to be vetted. + * + * Represents membership of a IPv4 multicast group. + * + * @category Network + */ + interface MulticastV4Membership { + /** Leaves the multicast group. */ + leave: () => Promise; + /** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */ + setLoopback: (loopback: boolean) => Promise; + /** Sets the time-to-live of outgoing multicast packets for this socket. */ + setTTL: (ttl: number) => Promise; + } + + /** **UNSTABLE**: New API, yet to be vetted. + * + * Represents membership of a IPv6 multicast group. + * + * @category Network + */ + interface MulticastV6Membership { + /** Leaves the multicast group. */ + leave: () => Promise; + /** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */ + setLoopback: (loopback: boolean) => Promise; + } + /** **UNSTABLE**: New API, yet to be vetted. * * A generic transport listener for message-oriented protocols. @@ -857,6 +885,18 @@ declare namespace Deno { * @category Network */ export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> { + /** Joins an IPv4 multicast group. */ + joinMulticastV4( + address: string, + networkInterface: string, + ): Promise; + + /** Joins an IPv6 multicast group. */ + joinMulticastV6( + address: string, + networkInterface: number, + ): Promise; + /** Waits for and resolves to the next message to the instance. * * Messages are received in the format of a tuple containing the data array @@ -918,6 +958,11 @@ declare namespace Deno { * * @default {false} */ reuseAddress?: boolean; + + /** When `true`, sent multicast packets will be looped back to the local socket. + * + * @default {false} */ + loopback?: boolean; } /** **UNSTABLE**: New API, yet to be vetted. diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 8d8e34e569..81e13f0945 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -277,6 +277,64 @@ class Datagram { return this.#addr; } + async joinMulticastV4(addr, multiInterface) { + await core.opAsync( + "op_net_join_multi_v4_udp", + this.rid, + addr, + multiInterface, + ); + + return { + leave: () => + core.opAsync( + "op_net_leave_multi_v4_udp", + this.rid, + addr, + multiInterface, + ), + setLoopback: (loopback) => + core.opAsync( + "op_net_set_multi_loopback_udp", + this.rid, + true, + loopback, + ), + setTTL: (ttl) => + core.opAsync( + "op_net_set_multi_ttl_udp", + this.rid, + ttl, + ), + }; + } + + async joinMulticastV6(addr, multiInterface) { + await core.opAsync( + "op_net_join_multi_v6_udp", + this.rid, + addr, + multiInterface, + ); + + return { + leave: () => + core.opAsync( + "op_net_leave_multi_v6_udp", + this.rid, + addr, + multiInterface, + ), + setLoopback: (loopback) => + core.opAsync( + "op_net_set_multi_loopback_udp", + this.rid, + false, + loopback, + ), + }; + } + async receive(p) { const buf = p || new Uint8Array(this.bufSize); let nread; @@ -383,6 +441,7 @@ function createListenDatagram(udpOpFn, unixOpFn) { port: args.port, }, args.reuseAddress ?? false, + args.loopback ?? false, ); addr.transport = "udp"; return new Datagram(rid, addr); diff --git a/ext/net/lib.rs b/ext/net/lib.rs index 00833b53c4..f812bf60bc 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -86,6 +86,12 @@ deno_core::extension!(deno_net, ops::op_node_unstable_net_listen_udp

, ops::op_net_recv_udp, ops::op_net_send_udp

, + ops::op_net_join_multi_v4_udp

, + ops::op_net_join_multi_v6_udp

, + ops::op_net_leave_multi_v4_udp

, + ops::op_net_leave_multi_v6_udp

, + ops::op_net_set_multi_loopback_udp

, + ops::op_net_set_multi_ttl_udp

, ops::op_dns_resolve

, ops::op_set_nodelay, ops::op_set_keepalive, diff --git a/ext/net/ops.rs b/ext/net/ops.rs index c094ddac2b..8e7263753a 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -28,8 +28,11 @@ use socket2::Socket; use socket2::Type; use std::borrow::Cow; use std::cell::RefCell; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; use std::net::SocketAddr; use std::rc::Rc; +use std::str::FromStr; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -155,6 +158,151 @@ where Ok(nwritten) } +#[op] +async fn op_net_join_multi_v4_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: String, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv4Addr::from_str(address.as_str())?; + let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?; + + socket.join_multicast_v4(addr, interface_addr)?; + + Ok(()) +} + +#[op] +async fn op_net_join_multi_v6_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: u32, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv6Addr::from_str(address.as_str())?; + + socket.join_multicast_v6(&addr, multi_interface)?; + + Ok(()) +} + +#[op] +async fn op_net_leave_multi_v4_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: String, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv4Addr::from_str(address.as_str())?; + let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?; + + socket.leave_multicast_v4(addr, interface_addr)?; + + Ok(()) +} + +#[op] +async fn op_net_leave_multi_v6_udp( + state: Rc>, + rid: ResourceId, + address: String, + multi_interface: u32, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + let addr = Ipv6Addr::from_str(address.as_str())?; + + socket.leave_multicast_v6(&addr, multi_interface)?; + + Ok(()) +} + +#[op] +async fn op_net_set_multi_loopback_udp( + state: Rc>, + rid: ResourceId, + is_v4_membership: bool, + loopback: bool, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + if is_v4_membership { + socket.set_multicast_loop_v4(loopback)? + } else { + socket.set_multicast_loop_v6(loopback)?; + } + + Ok(()) +} + +#[op] +async fn op_net_set_multi_ttl_udp( + state: Rc>, + rid: ResourceId, + ttl: u32, +) -> Result<(), AnyError> +where + NP: NetPermissions + 'static, +{ + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .map_err(|_| bad_resource("Socket has been closed"))?; + let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; + + socket.set_multicast_ttl_v4(ttl)?; + + Ok(()) +} + #[op] pub async fn op_net_connect_tcp( state: Rc>, @@ -266,6 +414,7 @@ fn net_listen_udp( state: &mut OpState, addr: IpAddr, reuse_address: bool, + loopback: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, @@ -301,9 +450,18 @@ where let socket_addr = socket2::SockAddr::from(addr); socket_tmp.bind(&socket_addr)?; socket_tmp.set_nonblocking(true)?; + // Enable messages to be sent to the broadcast address (255.255.255.255) by default socket_tmp.set_broadcast(true)?; + + if domain == Domain::IPV4 { + socket_tmp.set_multicast_loop_v4(loopback)?; + } else { + socket_tmp.set_multicast_loop_v6(loopback)?; + } + let std_socket: std::net::UdpSocket = socket_tmp.into(); + let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; let socket_resource = UdpSocketResource { @@ -320,12 +478,13 @@ fn op_net_listen_udp( state: &mut OpState, addr: IpAddr, reuse_address: bool, + loopback: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, { super::check_unstable(state, "Deno.listenDatagram"); - net_listen_udp::(state, addr, reuse_address) + net_listen_udp::(state, addr, reuse_address, loopback) } #[op] @@ -333,11 +492,12 @@ fn op_node_unstable_net_listen_udp( state: &mut OpState, addr: IpAddr, reuse_address: bool, + loopback: bool, ) -> Result<(ResourceId, IpAddr), AnyError> where NP: NetPermissions + 'static, { - net_listen_udp::(state, addr, reuse_address) + net_listen_udp::(state, addr, reuse_address, loopback) } #[derive(Serialize, Eq, PartialEq, Debug)]