From 65b647909d75a586c087af3e10f54338e689e81c Mon Sep 17 00:00:00 2001 From: snek Date: Fri, 20 Dec 2024 13:48:48 +0100 Subject: [PATCH] feat(unstable): Implement QUIC (#21942) Implements a QUIC interface, loosely based on the WebTransport API (a future change could add the WebTransport API, built on top of this one). [quinn](https://docs.rs/quinn/latest/quinn/) is used for the underlying QUIC implementation, for a few reasons: - A cloneable "handle" api which fits quite nicely into deno resources. - Good collaboration with the rust ecosystem, especially rustls. - I like it. --- Cargo.lock | 53 ++- cli/tsc/99_main_compiler.js | 7 + ext/net/03_quic.js | 367 ++++++++++++++++ ext/net/Cargo.toml | 1 + ext/net/lib.deno_net.d.ts | 288 +++++++++++++ ext/net/lib.rs | 29 +- ext/net/quic.rs | 660 +++++++++++++++++++++++++++++ ext/web/06_streams.js | 8 +- runtime/fmt_errors.rs | 14 + runtime/js/90_deno_ns.js | 10 + tests/integration/js_unit_tests.rs | 1 + tests/unit/quic_test.ts | 172 ++++++++ 12 files changed, 1591 insertions(+), 19 deletions(-) create mode 100644 ext/net/03_quic.js create mode 100644 ext/net/quic.rs create mode 100644 tests/unit/quic_test.ts diff --git a/Cargo.lock b/Cargo.lock index 965b7a4150..6cd68e5ce1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -728,6 +728,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.37" @@ -1918,6 +1924,7 @@ dependencies = [ "hickory-proto", "hickory-resolver", "pin-project", + "quinn", "rustls-tokio-stream", "serde", "socket2", @@ -4025,9 +4032,9 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ "hyper 1.4.1", "hyper-util", @@ -5908,49 +5915,54 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.2" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" dependencies = [ "bytes", "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash 1.1.0", + "rustc-hash 2.0.0", "rustls", - "thiserror 1.0.64", + "socket2", + "thiserror 2.0.3", "tokio", "tracing", ] [[package]] name = "quinn-proto" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" dependencies = [ "bytes", + "getrandom", "rand", "ring", "rustc-hash 2.0.0", "rustls", + "rustls-pki-types", "slab", - "thiserror 1.0.64", + "thiserror 2.0.3", "tinyvec", "tracing", + "web-time", ] [[package]] name = "quinn-udp" -version = "0.5.2" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +checksum = "52cd4b1eff68bf27940dd39811292c49e007f4d0b4c357358dc9b0197be6b527" dependencies = [ + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6427,6 +6439,9 @@ name = "rustls-pki-types" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" +dependencies = [ + "web-time", +] [[package]] name = "rustls-tokio-stream" @@ -8523,6 +8538,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-root-certs" version = "0.26.6" @@ -8550,7 +8575,7 @@ dependencies = [ "arrayvec", "bit-vec", "bitflags 2.6.0", - "cfg_aliases", + "cfg_aliases 0.1.1", "codespan-reporting", "document-features", "indexmap 2.3.0", @@ -8582,7 +8607,7 @@ dependencies = [ "bit-set", "bitflags 2.6.0", "block", - "cfg_aliases", + "cfg_aliases 0.1.1", "core-graphics-types", "d3d12", "glow", diff --git a/cli/tsc/99_main_compiler.js b/cli/tsc/99_main_compiler.js index 7e8a407cf9..f7862c95e4 100644 --- a/cli/tsc/99_main_compiler.js +++ b/cli/tsc/99_main_compiler.js @@ -41,6 +41,13 @@ delete Object.prototype.__proto__; "listen", "listenDatagram", "openKv", + "connectQuic", + "listenQuic", + "QuicBidirectionalStream", + "QuicConn", + "QuicListener", + "QuicReceiveStream", + "QuicSendStream", ]); const unstableMsgSuggestion = "If not, try changing the 'lib' compiler option to include 'deno.unstable' " + diff --git a/ext/net/03_quic.js b/ext/net/03_quic.js new file mode 100644 index 0000000000..e100e7bd64 --- /dev/null +++ b/ext/net/03_quic.js @@ -0,0 +1,367 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +import { core, primordials } from "ext:core/mod.js"; +import { + op_quic_accept, + op_quic_accept_bi, + op_quic_accept_incoming, + op_quic_accept_uni, + op_quic_close_connection, + op_quic_close_endpoint, + op_quic_connect, + op_quic_connection_closed, + op_quic_connection_get_protocol, + op_quic_connection_get_remote_addr, + op_quic_endpoint_get_addr, + op_quic_get_send_stream_priority, + op_quic_incoming_accept, + op_quic_incoming_ignore, + op_quic_incoming_local_ip, + op_quic_incoming_refuse, + op_quic_incoming_remote_addr, + op_quic_incoming_remote_addr_validated, + op_quic_listen, + op_quic_max_datagram_size, + op_quic_open_bi, + op_quic_open_uni, + op_quic_read_datagram, + op_quic_send_datagram, + op_quic_set_send_stream_priority, +} from "ext:core/ops"; +import { + getWritableStreamResourceBacking, + ReadableStream, + readableStreamForRid, + WritableStream, + writableStreamForRid, +} from "ext:deno_web/06_streams.js"; +import { loadTlsKeyPair } from "ext:deno_net/02_tls.js"; +const { + BadResourcePrototype, +} = core; +const { + Uint8Array, + TypedArrayPrototypeSubarray, + SymbolAsyncIterator, + SafePromisePrototypeFinally, + ObjectPrototypeIsPrototypeOf, +} = primordials; + +class QuicSendStream extends WritableStream { + get sendOrder() { + return op_quic_get_send_stream_priority( + getWritableStreamResourceBacking(this).rid, + ); + } + + set sendOrder(p) { + op_quic_set_send_stream_priority( + getWritableStreamResourceBacking(this).rid, + p, + ); + } +} + +class QuicReceiveStream extends ReadableStream {} + +function readableStream(rid, closed) { + // stream can be indirectly closed by closing connection. + SafePromisePrototypeFinally(closed, () => { + core.tryClose(rid); + }); + return readableStreamForRid(rid, true, QuicReceiveStream); +} + +function writableStream(rid, closed) { + // stream can be indirectly closed by closing connection. + SafePromisePrototypeFinally(closed, () => { + core.tryClose(rid); + }); + return writableStreamForRid(rid, true, QuicSendStream); +} + +class QuicBidirectionalStream { + #readable; + #writable; + + constructor(txRid, rxRid, closed) { + this.#readable = readableStream(rxRid, closed); + this.#writable = writableStream(txRid, closed); + } + + get readable() { + return this.#readable; + } + + get writable() { + return this.#writable; + } +} + +async function* bidiStream(conn, closed) { + try { + while (true) { + const r = await op_quic_accept_bi(conn); + yield new QuicBidirectionalStream(r[0], r[1], closed); + } + } catch (error) { + if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { + return; + } + throw error; + } +} + +async function* uniStream(conn, closed) { + try { + while (true) { + const uniRid = await op_quic_accept_uni(conn); + yield readableStream(uniRid, closed); + } + } catch (error) { + if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { + return; + } + throw error; + } +} + +class QuicConn { + #resource; + #bidiStream = null; + #uniStream = null; + #closed; + + constructor(resource) { + this.#resource = resource; + + this.#closed = op_quic_connection_closed(this.#resource); + core.unrefOpPromise(this.#closed); + } + + get protocol() { + return op_quic_connection_get_protocol(this.#resource); + } + + get remoteAddr() { + return op_quic_connection_get_remote_addr(this.#resource); + } + + async createBidirectionalStream( + { sendOrder, waitUntilAvailable } = { __proto__: null }, + ) { + const { 0: txRid, 1: rxRid } = await op_quic_open_bi( + this.#resource, + waitUntilAvailable ?? false, + ); + if (sendOrder !== null && sendOrder !== undefined) { + op_quic_set_send_stream_priority(txRid, sendOrder); + } + return new QuicBidirectionalStream(txRid, rxRid, this.#closed); + } + + async createUnidirectionalStream( + { sendOrder, waitUntilAvailable } = { __proto__: null }, + ) { + const rid = await op_quic_open_uni( + this.#resource, + waitUntilAvailable ?? false, + ); + if (sendOrder !== null && sendOrder !== undefined) { + op_quic_set_send_stream_priority(rid, sendOrder); + } + return writableStream(rid, this.#closed); + } + + get incomingBidirectionalStreams() { + if (this.#bidiStream === null) { + this.#bidiStream = ReadableStream.from( + bidiStream(this.#resource, this.#closed), + ); + } + return this.#bidiStream; + } + + get incomingUnidirectionalStreams() { + if (this.#uniStream === null) { + this.#uniStream = ReadableStream.from( + uniStream(this.#resource, this.#closed), + ); + } + return this.#uniStream; + } + + get maxDatagramSize() { + return op_quic_max_datagram_size(this.#resource); + } + + async readDatagram(p) { + const view = p || new Uint8Array(this.maxDatagramSize); + const nread = await op_quic_read_datagram(this.#resource, view); + return TypedArrayPrototypeSubarray(view, 0, nread); + } + + async sendDatagram(data) { + await op_quic_send_datagram(this.#resource, data); + } + + get closed() { + core.refOpPromise(this.#closed); + return this.#closed; + } + + close({ closeCode, reason }) { + op_quic_close_connection(this.#resource, closeCode, reason); + } +} + +class QuicIncoming { + #incoming; + + constructor(incoming) { + this.#incoming = incoming; + } + + get localIp() { + return op_quic_incoming_local_ip(this.#incoming); + } + + get remoteAddr() { + return op_quic_incoming_remote_addr(this.#incoming); + } + + get remoteAddressValidated() { + return op_quic_incoming_remote_addr_validated(this.#incoming); + } + + async accept() { + const conn = await op_quic_incoming_accept(this.#incoming); + return new QuicConn(conn); + } + + refuse() { + op_quic_incoming_refuse(this.#incoming); + } + + ignore() { + op_quic_incoming_ignore(this.#incoming); + } +} + +class QuicListener { + #endpoint; + + constructor(endpoint) { + this.#endpoint = endpoint; + } + + get addr() { + return op_quic_endpoint_get_addr(this.#endpoint); + } + + async accept() { + const conn = await op_quic_accept(this.#endpoint); + return new QuicConn(conn); + } + + async incoming() { + const incoming = await op_quic_accept_incoming(this.#endpoint); + return new QuicIncoming(incoming); + } + + async next() { + let conn; + try { + conn = await this.accept(); + } catch (error) { + if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { + return { value: undefined, done: true }; + } + throw error; + } + return { value: conn, done: false }; + } + + [SymbolAsyncIterator]() { + return this; + } + + close({ closeCode, reason }) { + op_quic_close_endpoint(this.#endpoint, closeCode, reason); + } +} + +async function listenQuic( + { + hostname, + port, + cert, + key, + alpnProtocols, + keepAliveInterval, + maxIdleTimeout, + maxConcurrentBidirectionalStreams, + maxConcurrentUnidirectionalStreams, + }, +) { + hostname = hostname || "0.0.0.0"; + const keyPair = loadTlsKeyPair("Deno.listenQuic", { cert, key }); + const endpoint = await op_quic_listen( + { hostname, port }, + { alpnProtocols }, + { + keepAliveInterval, + maxIdleTimeout, + maxConcurrentBidirectionalStreams, + maxConcurrentUnidirectionalStreams, + }, + keyPair, + ); + return new QuicListener(endpoint); +} + +async function connectQuic( + { + hostname, + port, + serverName, + caCerts, + cert, + key, + alpnProtocols, + keepAliveInterval, + maxIdleTimeout, + maxConcurrentBidirectionalStreams, + maxConcurrentUnidirectionalStreams, + congestionControl, + }, +) { + const keyPair = loadTlsKeyPair("Deno.connectQuic", { cert, key }); + const conn = await op_quic_connect( + { hostname, port }, + { + caCerts, + alpnProtocols, + serverName, + }, + { + keepAliveInterval, + maxIdleTimeout, + maxConcurrentBidirectionalStreams, + maxConcurrentUnidirectionalStreams, + congestionControl, + }, + keyPair, + ); + return new QuicConn(conn); +} + +export { + connectQuic, + listenQuic, + QuicBidirectionalStream, + QuicConn, + QuicIncoming, + QuicListener, + QuicReceiveStream, + QuicSendStream, +}; diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index 8dbb0be391..eaee7bfb4b 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -20,6 +20,7 @@ deno_tls.workspace = true hickory-proto = "0.25.0-alpha.4" hickory-resolver.workspace = true pin-project.workspace = true +quinn = { version = "0.11.6", default-features = false, features = ["runtime-tokio", "rustls", "ring"] } rustls-tokio-stream.workspace = true serde.workspace = true socket2.workspace = true diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts index 827081f2a4..958474cbbd 100644 --- a/ext/net/lib.deno_net.d.ts +++ b/ext/net/lib.deno_net.d.ts @@ -450,5 +450,293 @@ declare namespace Deno { options?: StartTlsOptions, ): Promise; + /** + * **UNSTABLE**: New API, yet to be vetted. + * @experimental + * @category Network + */ + export interface QuicTransportOptions { + /** Period of inactivity before sending a keep-alive packet. Keep-alive + * packets prevent an inactive but otherwise healthy connection from timing + * out. Only one side of any given connection needs keep-alive enabled for + * the connection to be preserved. + * @default {undefined} + */ + keepAliveInterval?: number; + /** Maximum duration of inactivity to accept before timing out the + * connection. The true idle timeout is the minimum of this and the peer’s + * own max idle timeout. + * @default {undefined} + */ + maxIdleTimeout?: number; + /** Maximum number of incoming bidirectional streams that may be open + * concurrently. + * @default {100} + */ + maxConcurrentBidirectionalStreams?: number; + /** Maximum number of incoming unidirectional streams that may be open + * concurrently. + * @default {100} + */ + maxConcurrentUnidirectionalStreams?: number; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * @experimental + * @category Network + */ + export interface ListenQuicOptions extends QuicTransportOptions { + /** The port to connect to. */ + port: number; + /** + * A literal IP address or host name that can be resolved to an IP address. + * @default {"0.0.0.0"} + */ + hostname?: string; + /** Server private key in PEM format */ + key: string; + /** Cert chain in PEM format */ + cert: string; + /** Application-Layer Protocol Negotiation (ALPN) protocols to announce to + * the client. QUIC requires the use of ALPN. + */ + alpnProtocols: string[]; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * Listen announces on the local transport address over QUIC. + * + * ```ts + * const lstnr = await Deno.listenQuic({ port: 443, cert: "...", key: "...", alpnProtocols: ["h3"] }); + * ``` + * + * Requires `allow-net` permission. + * + * @experimental + * @tags allow-net + * @category Network + */ + export function listenQuic(options: ListenQuicOptions): Promise; + + /** + * **UNSTABLE**: New API, yet to be vetted. + * @experimental + * @category Network + */ + export interface ConnectQuicOptions extends QuicTransportOptions { + /** The port to connect to. */ + port: number; + /** A literal IP address or host name that can be resolved to an IP address. */ + hostname: string; + /** The name used for validating the certificate provided by the server. If + * not provided, defaults to `hostname`. */ + serverName?: string | undefined; + /** Application-Layer Protocol Negotiation (ALPN) protocols supported by + * the client. QUIC requires the use of ALPN. + */ + alpnProtocols: string[]; + /** A list of root certificates that will be used in addition to the + * default root certificates to verify the peer's certificate. + * + * Must be in PEM format. */ + caCerts?: string[]; + /** + * The congestion control algorithm used when sending data over this connection. + */ + congestionControl?: "throughput" | "low-latency"; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * Establishes a secure connection over QUIC using a hostname and port. The + * cert file is optional and if not included Mozilla's root certificates will + * be used. See also https://github.com/ctz/webpki-roots for specifics. + * + * ```ts + * const caCert = await Deno.readTextFile("./certs/my_custom_root_CA.pem"); + * const conn1 = await Deno.connectQuic({ hostname: "example.com", port: 443, alpnProtocols: ["h3"] }); + * const conn2 = await Deno.connectQuic({ caCerts: [caCert], hostname: "example.com", port: 443, alpnProtocols: ["h3"] }); + * ``` + * + * Requires `allow-net` permission. + * + * @experimental + * @tags allow-net + * @category Network + */ + export function connectQuic(options: ConnectQuicOptions): Promise; + + /** + * **UNSTABLE**: New API, yet to be vetted. + * @experimental + * @category Network + */ + export interface QuicCloseInfo { + /** A number representing the error code for the error. */ + closeCode: number; + /** A string representing the reason for closing the connection. */ + reason: string; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * An incoming connection for which the server has not yet begun its part of the handshake. + * + * @experimental + * @category Network + */ + export interface QuicIncoming { + /** + * The local IP address which was used when the peer established the connection. + */ + readonly localIp: string; + + /** + * The peer’s UDP address. + */ + readonly remoteAddr: NetAddr; + + /** + * Whether the socket address that is initiating this connection has proven that they can receive traffic. + */ + readonly remoteAddressValidated: boolean; + + /** + * Accept this incoming connection. + */ + accept(): Promise; + + /** + * Refuse this incoming connection. + */ + refuse(): void; + + /** + * Ignore this incoming connection attempt, not sending any packet in response. + */ + ignore(): void; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * Specialized listener that accepts QUIC connections. + * + * @experimental + * @category Network + */ + export interface QuicListener extends AsyncIterable { + /** Return the address of the `QuicListener`. */ + readonly addr: NetAddr; + + /** Waits for and resolves to the next connection to the `QuicListener`. */ + accept(): Promise; + + /** Waits for and resolves to the next incoming request to the `QuicListener`. */ + incoming(): Promise; + + /** Close closes the listener. Any pending accept promises will be rejected + * with errors. */ + close(info: QuicCloseInfo): void; + + [Symbol.asyncIterator](): AsyncIterableIterator; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * + * @experimental + * @category Network + */ + export interface QuicSendStreamOptions { + /** Indicates the send priority of this stream relative to other streams for + * which the value has been set. + * @default {undefined} + */ + sendOrder?: number; + /** Wait until there is sufficient flow credit to create the stream. + * @default {false} + */ + waitUntilAvailable?: boolean; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * + * @experimental + * @category Network + */ + export interface QuicConn { + /** Close closes the listener. Any pending accept promises will be rejected + * with errors. */ + close(info: QuicCloseInfo): void; + /** Opens and returns a bidirectional stream. */ + createBidirectionalStream( + options?: QuicSendStreamOptions, + ): Promise; + /** Opens and returns a unidirectional stream. */ + createUnidirectionalStream( + options?: QuicSendStreamOptions, + ): Promise; + /** Send a datagram. The provided data cannot be larger than + * `maxDatagramSize`. */ + sendDatagram(data: Uint8Array): Promise; + /** Receive a datagram. If no buffer is provider, one will be allocated. + * The size of the provided buffer should be at least `maxDatagramSize`. */ + readDatagram(buffer?: Uint8Array): Promise; + + /** Return the remote address for the connection. Clients may change + * addresses at will, for example when switching to a cellular internet + * connection. + */ + readonly remoteAddr: NetAddr; + /** The negotiated ALPN protocol, if provided. */ + readonly protocol: string | undefined; + /** Returns a promise that resolves when the connection is closed. */ + readonly closed: Promise; + /** A stream of bidirectional streams opened by the peer. */ + readonly incomingBidirectionalStreams: ReadableStream< + QuicBidirectionalStream + >; + /** A stream of unidirectional streams opened by the peer. */ + readonly incomingUnidirectionalStreams: ReadableStream; + /** Returns the datagram stream for sending and receiving datagrams. */ + readonly maxDatagramSize: number; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * + * @experimental + * @category Network + */ + export interface QuicBidirectionalStream { + /** Returns a QuicReceiveStream instance that can be used to read incoming data. */ + readonly readable: QuicReceiveStream; + /** Returns a QuicSendStream instance that can be used to write outgoing data. */ + readonly writable: QuicSendStream; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * + * @experimental + * @category Network + */ + export interface QuicSendStream extends WritableStream { + /** Indicates the send priority of this stream relative to other streams for + * which the value has been set. */ + sendOrder: number; + } + + /** + * **UNSTABLE**: New API, yet to be vetted. + * + * @experimental + * @category Network + */ + export interface QuicReceiveStream extends ReadableStream {} + export {}; // only export exports } diff --git a/ext/net/lib.rs b/ext/net/lib.rs index f482750b38..04b3f80010 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -5,6 +5,7 @@ pub mod ops; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; +mod quic; pub mod raw; pub mod resolve_addr; pub mod tcp; @@ -158,8 +159,34 @@ deno_core::extension!(deno_net, ops_unix::op_node_unstable_net_listen_unixpacket

, ops_unix::op_net_recv_unixpacket, ops_unix::op_net_send_unixpacket

, + + quic::op_quic_accept, + quic::op_quic_accept_bi, + quic::op_quic_accept_incoming, + quic::op_quic_accept_uni, + quic::op_quic_close_connection, + quic::op_quic_close_endpoint, + quic::op_quic_connection_closed, + quic::op_quic_connection_get_protocol, + quic::op_quic_connection_get_remote_addr, + quic::op_quic_connect

, + quic::op_quic_endpoint_get_addr, + quic::op_quic_get_send_stream_priority, + quic::op_quic_incoming_accept, + quic::op_quic_incoming_refuse, + quic::op_quic_incoming_ignore, + quic::op_quic_incoming_local_ip, + quic::op_quic_incoming_remote_addr, + quic::op_quic_incoming_remote_addr_validated, + quic::op_quic_listen

, + quic::op_quic_max_datagram_size, + quic::op_quic_open_bi, + quic::op_quic_open_uni, + quic::op_quic_read_datagram, + quic::op_quic_send_datagram, + quic::op_quic_set_send_stream_priority, ], - esm = [ "01_net.js", "02_tls.js" ], + esm = [ "01_net.js", "02_tls.js", "03_quic.js" ], options = { root_cert_store_provider: Option>, unsafely_ignore_certificate_errors: Option>, diff --git a/ext/net/quic.rs b/ext/net/quic.rs new file mode 100644 index 0000000000..16f68364be --- /dev/null +++ b/ext/net/quic.rs @@ -0,0 +1,660 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use crate::resolve_addr::resolve_addr; +use crate::DefaultTlsOptions; +use crate::NetPermissions; +use crate::UnsafelyIgnoreCertificateErrors; +use deno_core::error::bad_resource; +use deno_core::error::generic_error; +use deno_core::error::AnyError; +use deno_core::futures::task::noop_waker_ref; +use deno_core::op2; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; +use deno_core::GarbageCollected; +use deno_core::JsBuffer; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::WriteOutcome; +use deno_tls::create_client_config; +use deno_tls::SocketUse; +use deno_tls::TlsKeys; +use deno_tls::TlsKeysHolder; +use quinn::crypto::rustls::QuicClientConfig; +use quinn::crypto::rustls::QuicServerConfig; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; +use std::net::SocketAddrV4; +use std::net::SocketAddrV6; +use std::pin::pin; +use std::rc::Rc; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; + +#[derive(Debug, Deserialize, Serialize)] +struct Addr { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ListenArgs { + alpn_protocols: Option>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct TransportConfig { + keep_alive_interval: Option, + max_idle_timeout: Option, + max_concurrent_bidirectional_streams: Option, + max_concurrent_unidirectional_streams: Option, + preferred_address_v4: Option, + preferred_address_v6: Option, + congestion_control: Option, +} + +impl TryInto for TransportConfig { + type Error = AnyError; + + fn try_into(self) -> Result { + let mut cfg = quinn::TransportConfig::default(); + + if let Some(interval) = self.keep_alive_interval { + cfg.keep_alive_interval(Some(Duration::from_millis(interval))); + } + + if let Some(timeout) = self.max_idle_timeout { + cfg.max_idle_timeout(Some(Duration::from_millis(timeout).try_into()?)); + } + + if let Some(max) = self.max_concurrent_bidirectional_streams { + cfg.max_concurrent_bidi_streams(max.into()); + } + + if let Some(max) = self.max_concurrent_unidirectional_streams { + cfg.max_concurrent_uni_streams(max.into()); + } + + if let Some(v) = self.congestion_control { + let controller: Option< + Arc, + > = match v.as_str() { + "low-latency" => { + Some(Arc::new(quinn::congestion::BbrConfig::default())) + } + "throughput" => { + Some(Arc::new(quinn::congestion::CubicConfig::default())) + } + _ => None, + }; + if let Some(controller) = controller { + cfg.congestion_controller_factory(controller); + } + } + + Ok(cfg) + } +} + +struct EndpointResource(quinn::Endpoint, Arc); + +impl GarbageCollected for EndpointResource {} + +#[op2(async)] +#[cppgc] +pub(crate) async fn op_quic_listen( + state: Rc>, + #[serde] addr: Addr, + #[serde] args: ListenArgs, + #[serde] transport_config: TransportConfig, + #[cppgc] keys: &TlsKeysHolder, +) -> Result +where + NP: NetPermissions + 'static, +{ + state + .borrow_mut() + .borrow_mut::() + .check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenQuic()")?; + + let addr = resolve_addr(&addr.hostname, addr.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + + let TlsKeys::Static(deno_tls::TlsKey(cert, key)) = keys.take() else { + unreachable!() + }; + + let mut crypto = + quinn::rustls::ServerConfig::builder_with_protocol_versions(&[ + &quinn::rustls::version::TLS13, + ]) + .with_no_client_auth() + .with_single_cert(cert.clone(), key.clone_key())?; + + if let Some(alpn_protocols) = args.alpn_protocols { + crypto.alpn_protocols = alpn_protocols + .into_iter() + .map(|alpn| alpn.into_bytes()) + .collect(); + } + + let server_config = Arc::new(QuicServerConfig::try_from(crypto)?); + let mut config = quinn::ServerConfig::with_crypto(server_config.clone()); + config.preferred_address_v4(transport_config.preferred_address_v4); + config.preferred_address_v6(transport_config.preferred_address_v6); + config.transport_config(Arc::new(transport_config.try_into()?)); + let endpoint = quinn::Endpoint::server(config, addr)?; + + Ok(EndpointResource(endpoint, server_config)) +} + +#[op2] +#[serde] +pub(crate) fn op_quic_endpoint_get_addr( + #[cppgc] endpoint: &EndpointResource, +) -> Result { + let addr = endpoint.0.local_addr()?; + let addr = Addr { + hostname: format!("{}", addr.ip()), + port: addr.port(), + }; + Ok(addr) +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct CloseInfo { + close_code: u64, + reason: String, +} + +#[op2(fast)] +pub(crate) fn op_quic_close_endpoint( + #[cppgc] endpoint: &EndpointResource, + #[bigint] close_code: u64, + #[string] reason: String, +) -> Result<(), AnyError> { + endpoint + .0 + .close(quinn::VarInt::from_u64(close_code)?, reason.as_bytes()); + Ok(()) +} + +struct ConnectionResource(quinn::Connection); + +impl GarbageCollected for ConnectionResource {} + +#[op2(async)] +#[cppgc] +pub(crate) async fn op_quic_accept( + #[cppgc] endpoint: &EndpointResource, +) -> Result { + match endpoint.0.accept().await { + Some(incoming) => { + let conn = incoming.accept()?.await?; + Ok(ConnectionResource(conn)) + } + None => Err(bad_resource("QuicListener is closed")), + } +} + +struct IncomingResource( + RefCell>, + Arc, +); + +impl GarbageCollected for IncomingResource {} + +#[op2(async)] +#[cppgc] +pub(crate) async fn op_quic_accept_incoming( + #[cppgc] endpoint: &EndpointResource, +) -> Result { + match endpoint.0.accept().await { + Some(incoming) => Ok(IncomingResource( + RefCell::new(Some(incoming)), + endpoint.1.clone(), + )), + None => Err(bad_resource("QuicListener is closed")), + } +} + +#[op2] +#[string] +pub(crate) fn op_quic_incoming_local_ip( + #[cppgc] incoming_resource: &IncomingResource, +) -> Result, AnyError> { + let Some(incoming) = incoming_resource.0.borrow_mut().take() else { + return Err(bad_resource("QuicIncoming already used")); + }; + Ok(incoming.local_ip().map(|ip| ip.to_string())) +} + +#[op2] +#[serde] +pub(crate) fn op_quic_incoming_remote_addr( + #[cppgc] incoming_resource: &IncomingResource, +) -> Result { + let Some(incoming) = incoming_resource.0.borrow_mut().take() else { + return Err(bad_resource("QuicIncoming already used")); + }; + let addr = incoming.remote_address(); + Ok(Addr { + hostname: format!("{}", addr.ip()), + port: addr.port(), + }) +} + +#[op2(fast)] +pub(crate) fn op_quic_incoming_remote_addr_validated( + #[cppgc] incoming_resource: &IncomingResource, +) -> Result { + let Some(incoming) = incoming_resource.0.borrow_mut().take() else { + return Err(bad_resource("QuicIncoming already used")); + }; + Ok(incoming.remote_address_validated()) +} + +#[op2(async)] +#[cppgc] +pub(crate) async fn op_quic_incoming_accept( + #[cppgc] incoming_resource: &IncomingResource, + #[serde] transport_config: Option, +) -> Result { + let Some(incoming) = incoming_resource.0.borrow_mut().take() else { + return Err(bad_resource("QuicIncoming already used")); + }; + let conn = match transport_config { + Some(transport_config) => { + let mut config = + quinn::ServerConfig::with_crypto(incoming_resource.1.clone()); + config.preferred_address_v4(transport_config.preferred_address_v4); + config.preferred_address_v6(transport_config.preferred_address_v6); + config.transport_config(Arc::new(transport_config.try_into()?)); + incoming.accept_with(Arc::new(config))?.await? + } + None => incoming.accept()?.await?, + }; + Ok(ConnectionResource(conn)) +} + +#[op2] +#[serde] +pub(crate) fn op_quic_incoming_refuse( + #[cppgc] incoming: &IncomingResource, +) -> Result<(), AnyError> { + let Some(incoming) = incoming.0.borrow_mut().take() else { + return Err(bad_resource("QuicIncoming already used")); + }; + incoming.refuse(); + Ok(()) +} + +#[op2] +#[serde] +pub(crate) fn op_quic_incoming_ignore( + #[cppgc] incoming: &IncomingResource, +) -> Result<(), AnyError> { + let Some(incoming) = incoming.0.borrow_mut().take() else { + return Err(bad_resource("QuicIncoming already used")); + }; + incoming.ignore(); + Ok(()) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ConnectArgs { + ca_certs: Option>, + alpn_protocols: Option>, + server_name: Option, +} + +#[op2(async)] +#[cppgc] +pub(crate) async fn op_quic_connect( + state: Rc>, + #[serde] addr: Addr, + #[serde] args: ConnectArgs, + #[serde] transport_config: TransportConfig, + #[cppgc] key_pair: &TlsKeysHolder, +) -> Result +where + NP: NetPermissions + 'static, +{ + state + .borrow_mut() + .borrow_mut::() + .check_net(&(&addr.hostname, Some(addr.port)), "Deno.connectQuic()")?; + + let sock_addr = resolve_addr(&addr.hostname, addr.port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + + let root_cert_store = state + .borrow() + .borrow::() + .root_cert_store()?; + + let unsafely_ignore_certificate_errors = state + .borrow() + .try_borrow::() + .and_then(|it| it.0.clone()); + + let ca_certs = args + .ca_certs + .unwrap_or_default() + .into_iter() + .map(|s| s.into_bytes()) + .collect::>(); + + let mut tls_config = create_client_config( + root_cert_store, + ca_certs, + unsafely_ignore_certificate_errors, + key_pair.take(), + SocketUse::GeneralSsl, + )?; + + if let Some(alpn_protocols) = args.alpn_protocols { + tls_config.alpn_protocols = + alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); + } + + let client_config = QuicClientConfig::try_from(tls_config)?; + let mut client_config = quinn::ClientConfig::new(Arc::new(client_config)); + client_config.transport_config(Arc::new(transport_config.try_into()?)); + + let local_addr = match sock_addr.ip() { + IpAddr::V4(_) => IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), + IpAddr::V6(_) => IpAddr::from(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), + }; + + let conn = quinn::Endpoint::client((local_addr, 0).into())? + .connect_with( + client_config, + sock_addr, + &args.server_name.unwrap_or(addr.hostname), + )? + .await?; + + Ok(ConnectionResource(conn)) +} + +#[op2] +#[string] +pub(crate) fn op_quic_connection_get_protocol( + #[cppgc] connection: &ConnectionResource, +) -> Option { + connection + .0 + .handshake_data() + .and_then(|h| h.downcast::().ok()) + .and_then(|h| h.protocol) + .map(|p| String::from_utf8_lossy(&p).into_owned()) +} + +#[op2] +#[serde] +pub(crate) fn op_quic_connection_get_remote_addr( + #[cppgc] connection: &ConnectionResource, +) -> Result { + let addr = connection.0.remote_address(); + Ok(Addr { + hostname: format!("{}", addr.ip()), + port: addr.port(), + }) +} + +#[op2(fast)] +pub(crate) fn op_quic_close_connection( + #[cppgc] connection: &ConnectionResource, + #[bigint] close_code: u64, + #[string] reason: String, +) -> Result<(), AnyError> { + connection + .0 + .close(quinn::VarInt::from_u64(close_code)?, reason.as_bytes()); + Ok(()) +} + +#[op2(async)] +#[serde] +pub(crate) async fn op_quic_connection_closed( + #[cppgc] connection: &ConnectionResource, +) -> Result { + let e = connection.0.closed().await; + match e { + quinn::ConnectionError::LocallyClosed => Ok(CloseInfo { + close_code: 0, + reason: "".into(), + }), + quinn::ConnectionError::ApplicationClosed(i) => Ok(CloseInfo { + close_code: i.error_code.into(), + reason: String::from_utf8_lossy(&i.reason).into_owned(), + }), + e => Err(e.into()), + } +} + +struct SendStreamResource(AsyncRefCell); + +impl SendStreamResource { + fn new(stream: quinn::SendStream) -> Self { + Self(AsyncRefCell::new(stream)) + } +} + +impl Resource for SendStreamResource { + fn name(&self) -> Cow { + "quicSendStream".into() + } + + fn write(self: Rc, view: BufView) -> AsyncResult { + Box::pin(async move { + let mut r = RcRef::map(self, |r| &r.0).borrow_mut().await; + let nwritten = r.write(&view).await?; + Ok(WriteOutcome::Partial { nwritten, view }) + }) + } +} + +struct RecvStreamResource(AsyncRefCell); + +impl RecvStreamResource { + fn new(stream: quinn::RecvStream) -> Self { + Self(AsyncRefCell::new(stream)) + } +} + +impl Resource for RecvStreamResource { + fn name(&self) -> Cow { + "quicReceiveStream".into() + } + + fn read(self: Rc, limit: usize) -> AsyncResult { + Box::pin(async move { + let mut r = RcRef::map(self, |r| &r.0).borrow_mut().await; + let mut data = vec![0; limit]; + let nread = r.read(&mut data).await?.unwrap_or(0); + data.truncate(nread); + Ok(BufView::from(data)) + }) + } +} + +#[op2(async)] +#[serde] +pub(crate) async fn op_quic_accept_bi( + #[cppgc] connection: &ConnectionResource, + state: Rc>, +) -> Result<(ResourceId, ResourceId), AnyError> { + match connection.0.accept_bi().await { + Ok((tx, rx)) => { + let mut state = state.borrow_mut(); + let tx_rid = state.resource_table.add(SendStreamResource::new(tx)); + let rx_rid = state.resource_table.add(RecvStreamResource::new(rx)); + Ok((tx_rid, rx_rid)) + } + Err(e) => match e { + quinn::ConnectionError::LocallyClosed + | quinn::ConnectionError::ApplicationClosed(..) => { + Err(bad_resource("QuicConn is closed")) + } + _ => Err(e.into()), + }, + } +} + +#[op2(async)] +#[serde] +pub(crate) async fn op_quic_open_bi( + #[cppgc] connection: &ConnectionResource, + state: Rc>, + wait_for_available: bool, +) -> Result<(ResourceId, ResourceId), AnyError> { + let (tx, rx) = if wait_for_available { + connection.0.open_bi().await? + } else { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + match pin!(connection.0.open_bi()).poll(&mut cx) { + Poll::Ready(r) => r?, + Poll::Pending => { + return Err(generic_error("Connection has reached the maximum number of outgoing concurrent bidirectional streams")); + } + } + }; + let mut state = state.borrow_mut(); + let tx_rid = state.resource_table.add(SendStreamResource::new(tx)); + let rx_rid = state.resource_table.add(RecvStreamResource::new(rx)); + Ok((tx_rid, rx_rid)) +} + +#[op2(async)] +#[serde] +pub(crate) async fn op_quic_accept_uni( + #[cppgc] connection: &ConnectionResource, + state: Rc>, +) -> Result { + match connection.0.accept_uni().await { + Ok(rx) => { + let rid = state + .borrow_mut() + .resource_table + .add(RecvStreamResource::new(rx)); + Ok(rid) + } + Err(e) => match e { + quinn::ConnectionError::LocallyClosed + | quinn::ConnectionError::ApplicationClosed(..) => { + Err(bad_resource("QuicConn is closed")) + } + _ => Err(e.into()), + }, + } +} + +#[op2(async)] +#[serde] +pub(crate) async fn op_quic_open_uni( + #[cppgc] connection: &ConnectionResource, + state: Rc>, + wait_for_available: bool, +) -> Result { + let tx = if wait_for_available { + connection.0.open_uni().await? + } else { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + match pin!(connection.0.open_uni()).poll(&mut cx) { + Poll::Ready(r) => r?, + Poll::Pending => { + return Err(generic_error("Connection has reached the maximum number of outgoing concurrent unidirectional streams")); + } + } + }; + let rid = state + .borrow_mut() + .resource_table + .add(SendStreamResource::new(tx)); + Ok(rid) +} + +#[op2(async)] +pub(crate) async fn op_quic_send_datagram( + #[cppgc] connection: &ConnectionResource, + #[buffer] buf: JsBuffer, +) -> Result<(), AnyError> { + connection.0.send_datagram_wait(buf.to_vec().into()).await?; + Ok(()) +} + +#[op2(async)] +pub(crate) async fn op_quic_read_datagram( + #[cppgc] connection: &ConnectionResource, + #[buffer] mut buf: JsBuffer, +) -> Result { + let data = connection.0.read_datagram().await?; + buf[0..data.len()].copy_from_slice(&data); + Ok(data.len() as _) +} + +#[op2(fast)] +pub(crate) fn op_quic_max_datagram_size( + #[cppgc] connection: &ConnectionResource, +) -> Result { + Ok(connection.0.max_datagram_size().unwrap_or(0) as _) +} + +#[op2(fast)] +pub(crate) fn op_quic_get_send_stream_priority( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result { + let resource = state + .borrow() + .resource_table + .get::(rid)?; + let r = RcRef::map(resource, |r| &r.0).try_borrow(); + match r { + Some(s) => Ok(s.priority()?), + None => Err(generic_error("Unable to get priority")), + } +} + +#[op2(fast)] +pub(crate) fn op_quic_set_send_stream_priority( + state: Rc>, + #[smi] rid: ResourceId, + priority: i32, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(rid)?; + let r = RcRef::map(resource, |r| &r.0).try_borrow(); + match r { + Some(s) => { + s.set_priority(priority)?; + Ok(()) + } + None => Err(generic_error("Unable to set priority")), + } +} diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index e673ee2bb4..f3ac711fc7 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -908,8 +908,8 @@ const _original = Symbol("[[original]]"); * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. * @returns {ReadableStream} */ -function readableStreamForRid(rid, autoClose = true) { - const stream = new ReadableStream(_brand); +function readableStreamForRid(rid, autoClose = true, Super) { + const stream = new (Super ?? ReadableStream)(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { @@ -1130,8 +1130,8 @@ async function readableStreamCollectIntoUint8Array(stream) { * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. * @returns {ReadableStream} */ -function writableStreamForRid(rid, autoClose = true) { - const stream = new WritableStream(_brand); +function writableStreamForRid(rid, autoClose = true, Super) { + const stream = new (Super ?? WritableStream)(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { diff --git a/runtime/fmt_errors.rs b/runtime/fmt_errors.rs index 6f120b5d46..3c60c3a3d7 100644 --- a/runtime/fmt_errors.rs +++ b/runtime/fmt_errors.rs @@ -422,6 +422,20 @@ fn get_suggestions_for_terminal_errors(e: &JsError) -> Vec { "Run again with `--unstable-webgpu` flag to enable this API.", ), ]; + } else if msg.contains("listenQuic is not a function") { + return vec![ + FixSuggestion::info("listenQuic is an unstable API."), + FixSuggestion::hint( + "Run again with `--unstable-net` flag to enable this API.", + ), + ]; + } else if msg.contains("connectQuic is not a function") { + return vec![ + FixSuggestion::info("connectQuic is an unstable API."), + FixSuggestion::hint( + "Run again with `--unstable-net` flag to enable this API.", + ), + ]; // Try to capture errors like: // ``` // Uncaught Error: Cannot find module '../build/Release/canvas.node' diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index a510ee33c4..5511649279 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -13,6 +13,7 @@ import * as console from "ext:deno_console/01_console.js"; import * as ffi from "ext:deno_ffi/00_ffi.js"; import * as net from "ext:deno_net/01_net.js"; import * as tls from "ext:deno_net/02_tls.js"; +import * as quic from "ext:deno_net/03_quic.js"; import * as serve from "ext:deno_http/00_serve.ts"; import * as http from "ext:deno_http/01_http.js"; import * as websocket from "ext:deno_http/02_websocket.ts"; @@ -174,6 +175,15 @@ denoNsUnstableById[unstableIds.net] = { op_net_listen_udp, op_net_listen_unixpacket, ), + + connectQuic: quic.connectQuic, + listenQuic: quic.listenQuic, + QuicBidirectionalStream: quic.QuicBidirectionalStream, + QuicConn: quic.QuicConn, + QuicListener: quic.QuicListener, + QuicReceiveStream: quic.QuicReceiveStream, + QuicSendStream: quic.QuicSendStream, + QuicIncoming: quic.QuicIncoming, }; // denoNsUnstableById[unstableIds.unsafeProto] = { __proto__: null } diff --git a/tests/integration/js_unit_tests.rs b/tests/integration/js_unit_tests.rs index 577ca043ca..717a8d8e7c 100644 --- a/tests/integration/js_unit_tests.rs +++ b/tests/integration/js_unit_tests.rs @@ -66,6 +66,7 @@ util::unit_test_factory!( process_test, progressevent_test, promise_hooks_test, + quic_test, read_dir_test, read_file_test, read_link_test, diff --git a/tests/unit/quic_test.ts b/tests/unit/quic_test.ts new file mode 100644 index 0000000000..f5423327de --- /dev/null +++ b/tests/unit/quic_test.ts @@ -0,0 +1,172 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { assertEquals } from "./test_util.ts"; + +const cert = Deno.readTextFileSync("tests/testdata/tls/localhost.crt"); +const key = Deno.readTextFileSync("tests/testdata/tls/localhost.key"); +const caCerts = [Deno.readTextFileSync("tests/testdata/tls/RootCA.pem")]; + +async function pair(opt?: Deno.QuicTransportOptions): Promise< + [Deno.QuicConn, Deno.QuicConn, Deno.QuicListener] +> { + const listener = await Deno.listenQuic({ + hostname: "localhost", + port: 0, + cert, + key, + alpnProtocols: ["deno-test"], + ...opt, + }); + + const [server, client] = await Promise.all([ + listener.accept(), + Deno.connectQuic({ + hostname: "localhost", + port: listener.addr.port, + caCerts, + alpnProtocols: ["deno-test"], + ...opt, + }), + ]); + + assertEquals(server.protocol, "deno-test"); + assertEquals(client.protocol, "deno-test"); + assertEquals(client.remoteAddr, listener.addr); + + return [server, client, listener]; +} + +Deno.test("bidirectional stream", async () => { + const [server, client, listener] = await pair(); + + const encoded = (new TextEncoder()).encode("hi!"); + + { + const bi = await server.createBidirectionalStream({ sendOrder: 42 }); + assertEquals(bi.writable.sendOrder, 42); + bi.writable.sendOrder = 0; + assertEquals(bi.writable.sendOrder, 0); + await bi.writable.getWriter().write(encoded); + } + + { + const { value: bi } = await client.incomingBidirectionalStreams + .getReader() + .read(); + const { value: data } = await bi!.readable.getReader().read(); + assertEquals(data, encoded); + } + + listener.close({ closeCode: 0, reason: "" }); + client.close({ closeCode: 0, reason: "" }); +}); + +Deno.test("unidirectional stream", async () => { + const [server, client, listener] = await pair(); + + const encoded = (new TextEncoder()).encode("hi!"); + + { + const uni = await server.createUnidirectionalStream({ sendOrder: 42 }); + assertEquals(uni.sendOrder, 42); + uni.sendOrder = 0; + assertEquals(uni.sendOrder, 0); + await uni.getWriter().write(encoded); + } + + { + const { value: uni } = await client.incomingUnidirectionalStreams + .getReader() + .read(); + const { value: data } = await uni!.getReader().read(); + assertEquals(data, encoded); + } + + listener.close({ closeCode: 0, reason: "" }); + client.close({ closeCode: 0, reason: "" }); +}); + +Deno.test("datagrams", async () => { + const [server, client, listener] = await pair(); + + const encoded = (new TextEncoder()).encode("hi!"); + + await server.sendDatagram(encoded); + + const data = await client.readDatagram(); + assertEquals(data, encoded); + + listener.close({ closeCode: 0, reason: "" }); + client.close({ closeCode: 0, reason: "" }); +}); + +Deno.test("closing", async () => { + const [server, client, listener] = await pair(); + + server.close({ closeCode: 42, reason: "hi!" }); + + assertEquals(await client.closed, { closeCode: 42, reason: "hi!" }); + + listener.close({ closeCode: 0, reason: "" }); +}); + +Deno.test("max concurrent streams", async () => { + const [server, client, listener] = await pair({ + maxConcurrentBidirectionalStreams: 1, + maxConcurrentUnidirectionalStreams: 1, + }); + + { + await server.createBidirectionalStream(); + await server.createBidirectionalStream() + .then(() => { + throw new Error("expected failure"); + }, () => { + // success! + }); + } + + { + await server.createUnidirectionalStream(); + await server.createUnidirectionalStream() + .then(() => { + throw new Error("expected failure"); + }, () => { + // success! + }); + } + + listener.close({ closeCode: 0, reason: "" }); + server.close({ closeCode: 0, reason: "" }); + client.close({ closeCode: 0, reason: "" }); +}); + +Deno.test("incoming", async () => { + const listener = await Deno.listenQuic({ + hostname: "localhost", + port: 0, + cert, + key, + alpnProtocols: ["deno-test"], + }); + + const connect = () => + Deno.connectQuic({ + hostname: "localhost", + port: listener.addr.port, + caCerts, + alpnProtocols: ["deno-test"], + }); + + const c1p = connect(); + const i1 = await listener.incoming(); + const server = await i1.accept(); + const client = await c1p; + + assertEquals(server.protocol, "deno-test"); + assertEquals(client.protocol, "deno-test"); + assertEquals(client.remoteAddr, listener.addr); + + listener.close({ closeCode: 0, reason: "" }); + client.close({ closeCode: 0, reason: "" }); +});