diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 7f825fd41e..6055523c04 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -56,7 +56,7 @@ source-map-mappings = "0.5.0" sys-info = "0.5.8" tempfile = "3.1.0" termcolor = "1.0.5" -tokio = { version = "0.2", features = ["rt-core", "tcp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] } +tokio = { version = "0.2", features = ["rt-core", "tcp", "udp", "process", "fs", "blocking", "sync", "io-std", "macros", "time"] } tokio-rustls = "0.12.1" url = "2.1.0" utime = "0.2.1" diff --git a/cli/js/deno.ts b/cli/js/deno.ts index 8ccca90961..b86b28911f 100644 --- a/cli/js/deno.ts +++ b/cli/js/deno.ts @@ -73,8 +73,12 @@ export { export { metrics, Metrics } from "./metrics.ts"; export { mkdirSync, mkdir } from "./mkdir.ts"; export { + Addr, connect, listen, + recvfrom, + UDPConn, + UDPAddr, Listener, Conn, ShutdownMode, diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts index 4322daa998..64a392ab9b 100644 --- a/cli/js/dispatch.ts +++ b/cli/js/dispatch.ts @@ -30,6 +30,8 @@ export let OP_REPL_START: number; export let OP_REPL_READLINE: number; export let OP_ACCEPT: number; export let OP_ACCEPT_TLS: number; +export let OP_RECEIVE: number; +export let OP_SEND: number; export let OP_CONNECT: number; export let OP_SHUTDOWN: number; export let OP_LISTEN: number; diff --git a/cli/js/lib.deno.ns.d.ts b/cli/js/lib.deno.ns.d.ts index fda2270a81..1839c813a4 100644 --- a/cli/js/lib.deno.ns.d.ts +++ b/cli/js/lib.deno.ns.d.ts @@ -1387,14 +1387,20 @@ declare namespace Deno { */ export function openPlugin(filename: string): Plugin; - type Transport = "tcp"; + export type Transport = "tcp" | "udp"; - interface Addr { + export interface Addr { transport: Transport; hostname: string; port: number; } + export interface UDPAddr { + transport?: Transport; + hostname?: string; + port: number; + } + /** UNSTABLE: Maybe remove ShutdownMode entirely. */ export enum ShutdownMode { // See http://man7.org/linux/man-pages/man2/shutdown.2.html @@ -1417,6 +1423,36 @@ declare namespace Deno { */ export function shutdown(rid: number, how: ShutdownMode): void; + /** UNSTABLE: new API + * Waits for the next message to the passed rid and writes it on the passed buffer. + * Returns the number of bytes written and the remote address. + */ + export function recvfrom(rid: number, p: Uint8Array): Promise<[number, Addr]>; + + /** UNSTABLE: new API + * A socket is a generic transport listener for message-oriented protocols + */ + export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> { + /** UNSTABLE: new API + * Waits for and resolves to the next message to the `Socket`. */ + receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; + + /** UNSTABLE: new API + * Sends a message to the target. */ + send(p: Uint8Array, addr: UDPAddr): Promise; + + /** UNSTABLE: new API + * Close closes the socket. Any pending message promises will be rejected + * with errors. + */ + close(): void; + + /** Return the address of the `Socket`. */ + addr: Addr; + + [Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]>; + } + /** A Listener is a generic network listener for stream-oriented protocols. */ export interface Listener extends AsyncIterator { /** Waits for and resolves to the next connection to the `Listener`. */ @@ -1457,7 +1493,9 @@ declare namespace Deno { transport?: Transport; } - /** Listen announces on the local transport address. + /** UNSTABLE: new API + * + * Listen announces on the local transport address. * * Requires the allow-net permission. * @@ -1476,7 +1514,13 @@ declare namespace Deno { * listen({ hostname: "[2001:db8::1]", port: 80 }); * listen({ hostname: "golang.org", port: 80, transport: "tcp" }) */ - export function listen(options: ListenOptions): Listener; + export function listen( + options: ListenOptions & { transport?: "tcp" } + ): Listener; + export function listen( + options: ListenOptions & { transport: "udp" } + ): UDPConn; + export function listen(options: ListenOptions): Listener | UDPConn; export interface ListenTLSOptions { port: number; diff --git a/cli/js/net.ts b/cli/js/net.ts index a89468f02f..9d82a3a3fe 100644 --- a/cli/js/net.ts +++ b/cli/js/net.ts @@ -4,7 +4,7 @@ import { read, write, close } from "./files.ts"; import * as dispatch from "./dispatch.ts"; import { sendSync, sendAsync } from "./dispatch_json.ts"; -export type Transport = "tcp"; +export type Transport = "tcp" | "udp"; // TODO support other types: // export type Transport = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket"; @@ -14,6 +14,31 @@ export interface Addr { port: number; } +export interface UDPAddr { + transport?: Transport; + hostname?: string; + port: number; +} + +/** A socket is a generic transport listener for message-oriented protocols */ +export interface UDPConn extends AsyncIterator<[Uint8Array, Addr]> { + /** Waits for and resolves to the next message to the `Socket`. */ + receive(p?: Uint8Array): Promise<[Uint8Array, Addr]>; + + /** Sends a message to the target. */ + send(p: Uint8Array, addr: UDPAddr): Promise; + + /** Close closes the socket. Any pending message promises will be rejected + * with errors. + */ + close(): void; + + /** Return the address of the `Socket`. */ + addr: Addr; + + [Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]>; +} + /** A Listener is a generic transport listener for stream-oriented protocols. */ export interface Listener extends AsyncIterator { /** Waits for and resolves to the next connection to the `Listener`. */ @@ -87,7 +112,7 @@ export class ConnImpl implements Conn { export class ListenerImpl implements Listener { constructor( readonly rid: number, - public addr: Addr, + readonly addr: Addr, private closing: boolean = false ) {} @@ -123,6 +148,63 @@ export class ListenerImpl implements Listener { } } +export async function recvfrom( + rid: number, + p: Uint8Array +): Promise<[number, Addr]> { + const { size, remoteAddr } = await sendAsync(dispatch.OP_RECEIVE, { rid }, p); + return [size, remoteAddr]; +} + +export class UDPConnImpl implements UDPConn { + constructor( + readonly rid: number, + readonly addr: Addr, + public bufSize: number = 1024, + private closing: boolean = false + ) {} + + async receive(p?: Uint8Array): Promise<[Uint8Array, Addr]> { + const buf = p || new Uint8Array(this.bufSize); + const [size, remoteAddr] = await recvfrom(this.rid, buf); + const sub = buf.subarray(0, size); + return [sub, remoteAddr]; + } + + async send(p: Uint8Array, addr: UDPAddr): Promise { + const remote = { hostname: "127.0.0.1", transport: "udp", ...addr }; + if (remote.transport !== "udp") throw Error("Remote transport must be UDP"); + const args = { ...remote, rid: this.rid }; + await sendAsync(dispatch.OP_SEND, args, p); + } + + close(): void { + this.closing = true; + close(this.rid); + } + + async next(): Promise> { + if (this.closing) { + return { value: undefined, done: true }; + } + return await this.receive() + .then(value => ({ value, done: false })) + .catch(e => { + // It wouldn't be correct to simply check this.closing here. + // TODO: Get a proper error kind for this case, don't check the message. + // The current error kind is Other. + if (e.message == "Socket has been closed") { + return { value: undefined, done: true }; + } + throw e; + }); + } + + [Symbol.asyncIterator](): AsyncIterator<[Uint8Array, Addr]> { + return this; + } +} + export interface Conn extends Reader, Writer, Closer { /** The local address of the connection. */ localAddr: Addr; @@ -146,14 +228,16 @@ export interface ListenOptions { transport?: Transport; } +const listenDefaults = { hostname: "0.0.0.0", transport: "tcp" }; + /** Listen announces on the local transport address. * * @param options * @param options.port The port to connect to. (Required.) * @param options.hostname A literal IP address or host name that can be * resolved to an IP address. If not specified, defaults to 0.0.0.0 - * @param options.transport Defaults to "tcp". Later we plan to add "tcp4", - * "tcp6", "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and + * @param options.transport Must be "tcp" or "udp". Defaults to "tcp". Later we plan to add "tcp4", + * "tcp6", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and * "unixpacket". * * Examples: @@ -163,16 +247,19 @@ export interface ListenOptions { * listen({ hostname: "[2001:db8::1]", port: 80 }); * listen({ hostname: "golang.org", port: 80, transport: "tcp" }) */ -export function listen(options: ListenOptions): Listener { - const hostname = options.hostname || "0.0.0.0"; - const transport = options.transport || "tcp"; +export function listen( + options: ListenOptions & { transport?: "tcp" } +): Listener; +export function listen(options: ListenOptions & { transport: "udp" }): UDPConn; +export function listen(options: ListenOptions): Listener | UDPConn { + const args = { ...listenDefaults, ...options }; + const res = sendSync(dispatch.OP_LISTEN, args); - const res = sendSync(dispatch.OP_LISTEN, { - hostname, - port: options.port, - transport - }); - return new ListenerImpl(res.rid, res.localAddr); + if (args.transport === "tcp") { + return new ListenerImpl(res.rid, res.localAddr); + } else { + return new UDPConnImpl(res.rid, res.localAddr); + } } export interface ConnectOptions { @@ -189,8 +276,8 @@ const connectDefaults = { hostname: "127.0.0.1", transport: "tcp" }; * @param options.port The port to connect to. (Required.) * @param options.hostname A literal IP address or host name that can be * resolved to an IP address. If not specified, defaults to 127.0.0.1 - * @param options.transport Defaults to "tcp". Later we plan to add "tcp4", - * "tcp6", "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and + * @param options.transport Must be "tcp" or "udp". Defaults to "tcp". Later we plan to add "tcp4", + * "tcp6", "udp4", "udp6", "ip", "ip4", "ip6", "unix", "unixgram" and * "unixpacket". * * Examples: diff --git a/cli/js/net_test.ts b/cli/js/net_test.ts index a2f086f0a5..75bce2e52c 100644 --- a/cli/js/net_test.ts +++ b/cli/js/net_test.ts @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { testPerm, assert, assertEquals } from "./test_util.ts"; -testPerm({ net: true }, function netListenClose(): void { +testPerm({ net: true }, function netTcpListenClose(): void { const listener = Deno.listen({ hostname: "127.0.0.1", port: 4500 }); assertEquals(listener.addr.transport, "tcp"); assertEquals(listener.addr.hostname, "127.0.0.1"); @@ -9,7 +9,21 @@ testPerm({ net: true }, function netListenClose(): void { listener.close(); }); -testPerm({ net: true }, async function netCloseWhileAccept(): Promise { +testPerm({ net: true }, function netUdpListenClose(): void { + if (Deno.build.os === "win") return; // TODO + + const socket = Deno.listen({ + hostname: "127.0.0.1", + port: 4500, + transport: "udp" + }); + assertEquals(socket.addr.transport, "udp"); + assertEquals(socket.addr.hostname, "127.0.0.1"); + assertEquals(socket.addr.port, 4500); + socket.close(); +}); + +testPerm({ net: true }, async function netTcpCloseWhileAccept(): Promise { const listener = Deno.listen({ port: 4501 }); const p = listener.accept(); listener.close(); @@ -24,7 +38,7 @@ testPerm({ net: true }, async function netCloseWhileAccept(): Promise { assertEquals(err.message, "Listener has been closed"); }); -testPerm({ net: true }, async function netConcurrentAccept(): Promise { +testPerm({ net: true }, async function netTcpConcurrentAccept(): Promise { const listener = Deno.listen({ port: 4502 }); let acceptErrCount = 0; const checkErr = (e: Error): void => { @@ -44,7 +58,7 @@ testPerm({ net: true }, async function netConcurrentAccept(): Promise { assertEquals(acceptErrCount, 1); }); -testPerm({ net: true }, async function netDialListen(): Promise { +testPerm({ net: true }, async function netTcpDialListen(): Promise { const listener = Deno.listen({ port: 4500 }); listener.accept().then( async (conn): Promise => { @@ -76,18 +90,58 @@ testPerm({ net: true }, async function netDialListen(): Promise { conn.close(); }); -testPerm({ net: true }, async function netListenCloseWhileIterating(): Promise< - void -> { - const listener = Deno.listen({ port: 8000 }); - const nextWhileClosing = listener[Symbol.asyncIterator]().next(); - listener.close(); - assertEquals(await nextWhileClosing, { value: undefined, done: true }); +testPerm({ net: true }, async function netUdpSendReceive(): Promise { + if (Deno.build.os === "win") return; // TODO - const nextAfterClosing = listener[Symbol.asyncIterator]().next(); - assertEquals(await nextAfterClosing, { value: undefined, done: true }); + const alice = Deno.listen({ port: 4500, transport: "udp" }); + assertEquals(alice.addr.port, 4500); + assertEquals(alice.addr.hostname, "0.0.0.0"); + assertEquals(alice.addr.transport, "udp"); + + const bob = Deno.listen({ port: 4501, transport: "udp" }); + assertEquals(bob.addr.port, 4501); + assertEquals(bob.addr.hostname, "0.0.0.0"); + assertEquals(bob.addr.transport, "udp"); + + const sent = new Uint8Array([1, 2, 3]); + await alice.send(sent, bob.addr); + + const [recvd, remote] = await bob.receive(); + assertEquals(remote.port, 4500); + assertEquals(recvd.length, 3); + assertEquals(1, recvd[0]); + assertEquals(2, recvd[1]); + assertEquals(3, recvd[2]); }); +testPerm( + { net: true }, + async function netTcpListenCloseWhileIterating(): Promise { + const listener = Deno.listen({ port: 8000 }); + const nextWhileClosing = listener[Symbol.asyncIterator]().next(); + listener.close(); + assertEquals(await nextWhileClosing, { value: undefined, done: true }); + + const nextAfterClosing = listener[Symbol.asyncIterator]().next(); + assertEquals(await nextAfterClosing, { value: undefined, done: true }); + } +); + +testPerm( + { net: true }, + async function netUdpListenCloseWhileIterating(): Promise { + if (Deno.build.os === "win") return; // TODO + + const socket = Deno.listen({ port: 8000, transport: "udp" }); + const nextWhileClosing = socket[Symbol.asyncIterator]().next(); + socket.close(); + assertEquals(await nextWhileClosing, { value: undefined, done: true }); + + const nextAfterClosing = socket[Symbol.asyncIterator]().next(); + assertEquals(await nextAfterClosing, { value: undefined, done: true }); + } +); + /* TODO(ry) Re-enable this test. testPerm({ net: true }, async function netListenAsyncIterator(): Promise { const listener = Deno.listen(":4500"); diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 569aebca03..c8fd5d398d 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -18,12 +18,15 @@ use std::task::Poll; use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::net::UdpSocket; pub fn init(i: &mut Isolate, s: &State) { i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept)))); i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect)))); i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown)))); i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen)))); + i.register_op("receive", s.core_op(json_op(s.stateful_op(op_receive)))); + i.register_op("send", s.core_op(json_op(s.stateful_op(op_send)))); } #[derive(Debug, PartialEq)] @@ -137,6 +140,121 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } +pub struct Receive<'a> { + state: &'a State, + rid: ResourceId, + buf: ZeroCopyBuf, +} + +impl Future for Receive<'_> { + type Output = Result<(usize, SocketAddr), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut state = inner.state.borrow_mut(); + let resource = state + .resource_table + .get_mut::(inner.rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Socket has been closed", + ); + ErrBox::from(e) + })?; + + let socket = &mut resource.socket; + + socket + .poll_recv_from(cx, &mut inner.buf) + .map_err(ErrBox::from) + } +} + +#[derive(Deserialize)] +struct ReceiveArgs { + rid: i32, +} + +fn receive(state: &State, rid: ResourceId, buf: ZeroCopyBuf) -> Receive { + Receive { state, rid, buf } +} + +fn op_receive( + state: &State, + args: Value, + zero_copy: Option, +) -> Result { + assert!(zero_copy.is_some()); + let buf = zero_copy.unwrap(); + + let args: ReceiveArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + + let state_ = state.clone(); + + let op = async move { + let (size, remote_addr) = receive(&state_, rid, buf).await?; + + Ok(json!({ + "size": size, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "udp", + } + })) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + +#[derive(Deserialize)] +struct SendArgs { + rid: i32, + hostname: String, + port: u16, + transport: String, +} + +fn op_send( + state: &State, + args: Value, + zero_copy: Option, +) -> Result { + assert!(zero_copy.is_some()); + let buf = zero_copy.unwrap(); + + let args: SendArgs = serde_json::from_value(args)?; + assert_eq!(args.transport, "udp"); + let rid = args.rid as u32; + + let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::(rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Socket has been closed", + ); + ErrBox::from(e) + })?; + + let socket = &mut resource.socket; + let addr = resolve_addr(&args.hostname, args.port).await?; + socket.send_to(&buf, addr).await?; + + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + #[derive(Deserialize)] struct ConnectArgs { transport: String, @@ -278,18 +396,15 @@ impl TcpListenerResource { } } -fn op_listen( +struct UdpSocketResource { + socket: UdpSocket, +} + +fn listen_tcp( state: &State, - args: Value, - _zero_copy: Option, -) -> Result { - let args: ListenArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "tcp"); - - state.check_net(&args.hostname, args.port)?; - - let addr = - futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; + addr: SocketAddr, +) -> Result<(u32, SocketAddr), ErrBox> { + let mut state = state.borrow_mut(); let listener = futures::executor::block_on(TcpListener::bind(&addr))?; let local_addr = listener.local_addr()?; let listener_resource = TcpListenerResource { @@ -297,10 +412,47 @@ fn op_listen( waker: None, local_addr, }; - let mut state = state.borrow_mut(); let rid = state .resource_table .add("tcpListener", Box::new(listener_resource)); + + Ok((rid, local_addr)) +} + +fn listen_udp( + state: &State, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), ErrBox> { + let mut state = state.borrow_mut(); + let socket = futures::executor::block_on(UdpSocket::bind(&addr))?; + let local_addr = socket.local_addr()?; + let socket_resource = UdpSocketResource { socket }; + let rid = state + .resource_table + .add("udpSocket", Box::new(socket_resource)); + + Ok((rid, local_addr)) +} + +fn op_listen( + state: &State, + args: Value, + _zero_copy: Option, +) -> Result { + let args: ListenArgs = serde_json::from_value(args)?; + assert!(args.transport == "tcp" || args.transport == "udp"); + + state.check_net(&args.hostname, args.port)?; + + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; + + let (rid, local_addr) = if args.transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( "New listener {} {}:{}", rid, diff --git a/tools/http_benchmark.py b/tools/http_benchmark.py index 05cd542e3c..64abbf8baa 100755 --- a/tools/http_benchmark.py +++ b/tools/http_benchmark.py @@ -138,7 +138,7 @@ def http_benchmark(build_dir): return { # "deno_tcp" was once called "deno" "deno_tcp": deno_tcp(deno_exe), - # "deno_http" was once called "deno_net_http" + # "deno_udp": deno_udp(deno_exe), "deno_http": deno_http(deno_exe), "deno_proxy": deno_http_proxy(deno_exe, hyper_hello_exe), "deno_proxy_tcp": deno_tcp_proxy(deno_exe, hyper_hello_exe),