diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 76656108b5..0c1ab8af09 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -10,6 +10,17 @@ declare namespace Deno { export {}; // stop default export type behavior + /** Information for a HTTP request. + * + * @category HTTP Server + */ + export interface ServeHandlerInfo { + /** The remote address of the connection. */ + remoteAddr: Deno.NetAddr; + /** The completion promise */ + completed: Promise; + } + /** **UNSTABLE**: New API, yet to be vetted. * * Retrieve the process umask. If `mask` is provided, sets the process umask. diff --git a/ext/http/00_serve.js b/ext/http/00_serve.ts similarity index 91% rename from ext/http/00_serve.js rename to ext/http/00_serve.ts index 52b833f101..1063f9691a 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.ts @@ -4,6 +4,7 @@ import { core, internals, primordials } from "ext:core/mod.js"; const { BadResourcePrototype, InterruptedPrototype, + Interrupted, internalRidSymbol, } = core; import { @@ -37,6 +38,7 @@ const { TypeError, TypedArrayPrototypeGetSymbolToStringTag, Uint8Array, + Promise, } = primordials; import { InnerBody } from "ext:deno_fetch/22_body.js"; @@ -132,14 +134,31 @@ class InnerRequest { #body; #upgraded; #urlValue; + #completed; + #abortController; - constructor(external, context) { + constructor(external, context, abortController) { this.#external = external; this.#context = context; this.#upgraded = false; + this.#completed = undefined; + this.#abortController = abortController; } - close() { + close(success = true) { + // The completion signal fires only if someone cares + if (this.#completed) { + if (success) { + this.#completed.resolve(undefined); + } else { + this.#completed.reject( + new Interrupted("HTTP response was not sent successfully"), + ); + } + } + // Unconditionally abort the request signal. Note that we don't use + // an error here. + this.#abortController.abort(); this.#external = null; } @@ -271,6 +290,19 @@ class InnerRequest { path; } + get completed() { + if (!this.#completed) { + // NOTE: this is faster than Promise.withResolvers() + let resolve, reject; + const promise = new Promise((r1, r2) => { + resolve = r1; + reject = r2; + }); + this.#completed = { promise, resolve, reject }; + } + return this.#completed.promise; + } + get remoteAddr() { const transport = this.#context.listener?.addr.transport; if (transport === "unix" || transport === "unixpacket") { @@ -375,16 +407,24 @@ class CallbackContext { } class ServeHandlerInfo { - #inner = null; - constructor(inner) { + #inner: InnerRequest; + constructor(inner: InnerRequest) { this.#inner = inner; } get remoteAddr() { return this.#inner.remoteAddr; } + get completed() { + return this.#inner.completed; + } } -function fastSyncResponseOrStream(req, respBody, status, innerRequest) { +function fastSyncResponseOrStream( + req, + respBody, + status, + innerRequest: InnerRequest, +) { if (respBody === null || respBody === undefined) { // Don't set the body innerRequest?.close(); @@ -428,8 +468,8 @@ function fastSyncResponseOrStream(req, respBody, status, innerRequest) { autoClose, status, ), - () => { - innerRequest?.close(); + (success) => { + innerRequest?.close(success); op_http_close_after_finish(req); }, ); @@ -443,15 +483,16 @@ function fastSyncResponseOrStream(req, respBody, status, innerRequest) { * This function returns a promise that will only reject in the case of abnormal exit. */ function mapToCallback(context, callback, onError) { - const signal = context.abortController.signal; - return async function (req) { + const abortController = new AbortController(); + const signal = abortController.signal; + // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback // 500 error. let innerRequest; let response; try { - innerRequest = new InnerRequest(req, context); + innerRequest = new InnerRequest(req, context, abortController); response = await callback( fromInnerRequest(innerRequest, signal, "immutable"), new ServeHandlerInfo(innerRequest), @@ -509,9 +550,27 @@ function mapToCallback(context, callback, onError) { }; } +type RawHandler = ( + request: Request, + info: ServeHandlerInfo, +) => Response | Promise; + +type RawServeOptions = { + port?: number; + hostname?: string; + signal?: AbortSignal; + reusePort?: boolean; + key?: string; + cert?: string; + onError?: (error: unknown) => Response | Promise; + onListen?: (params: { hostname: string; port: number }) => void; + handler?: RawHandler; +}; + function serve(arg1, arg2) { - let options = undefined; - let handler = undefined; + let options: RawServeOptions | undefined; + let handler: RawHandler | undefined; + if (typeof arg1 === "function") { handler = arg1; } else if (typeof arg2 === "function") { diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index a6527397f7..9bdb79f86b 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -683,7 +683,7 @@ pub async fn op_http_set_response_body_resource( #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, -) -> Result<(), AnyError> { +) -> Result { let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_body_resource") }; @@ -716,8 +716,7 @@ pub async fn op_http_set_response_body_resource( }, ); - http.response_body_finished().await; - Ok(()) + Ok(http.response_body_finished().await) } #[op2(fast)] diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 6fc7207bea..934f8a0024 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -131,7 +131,7 @@ deno_core::extension!( http_next::op_http_close, http_next::op_http_cancel, ], - esm = ["00_serve.js", "01_http.js", "02_websocket.ts"], + esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], ); pub enum HttpSocketAddr { diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index dac708b96b..6b033ffe07 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -16,7 +16,6 @@ use deno_core::Resource; use flate2::write::GzEncoder; use hyper::body::Frame; use hyper::body::SizeHint; -use hyper::header::HeaderMap; use pin_project::pin_project; /// Simplification for nested types we use for our streams. We provide a way to convert from @@ -30,10 +29,6 @@ pub enum ResponseStreamResult { /// not register a waker and should be called again at the lowest level of this code. Generally this /// will only be returned from compression streams that require additional buffering. NoData, - /// Stream provided trailers. - // TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc. - #[allow(unused)] - Trailers(HeaderMap), /// Stream failed. Error(AnyError), } @@ -44,7 +39,6 @@ impl From for Option, AnyError>> { ResponseStreamResult::EndOfStream => None, ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))), ResponseStreamResult::Error(err) => Some(Err(err)), - ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))), // This result should be handled by retrying ResponseStreamResult::NoData => unimplemented!(), } @@ -198,6 +192,11 @@ impl ResponseBytesInner { _ => Self::Bytes(BufView::from(vec)), } } + + /// Did we complete this response successfully? + pub fn is_complete(&self) -> bool { + matches!(self, ResponseBytesInner::Done | ResponseBytesInner::Empty) + } } pub struct ResourceBodyAdapter { @@ -387,9 +386,7 @@ impl PollFrame for GZipResponseStream { let start_out = stm.total_out(); let res = match frame { // Short-circuit these and just return - x @ (ResponseStreamResult::NoData - | ResponseStreamResult::Error(..) - | ResponseStreamResult::Trailers(..)) => { + x @ (ResponseStreamResult::NoData | ResponseStreamResult::Error(..)) => { return std::task::Poll::Ready(x) } ResponseStreamResult::EndOfStream => { diff --git a/ext/http/service.rs b/ext/http/service.rs index 932575e37c..f38fec4f4f 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -482,12 +482,13 @@ impl HttpRecord { HttpRecordReady(self) } - /// Resolves when response body has finished streaming. - pub fn response_body_finished(&self) -> impl Future + '_ { + /// Resolves when response body has finished streaming. Returns true if the + /// response completed. + pub fn response_body_finished(&self) -> impl Future + '_ { struct HttpRecordFinished<'a>(&'a HttpRecord); impl<'a> Future for HttpRecordFinished<'a> { - type Output = (); + type Output = bool; fn poll( self: Pin<&mut Self>, @@ -495,7 +496,10 @@ impl HttpRecord { ) -> Poll { let mut mut_self = self.0.self_mut(); if mut_self.response_body_finished { - return Poll::Ready(()); + // If we sent the response body and the trailers, this body completed successfully + return Poll::Ready( + mut_self.response_body.is_complete() && mut_self.trailers.is_none(), + ); } mut_self.response_body_waker = Some(cx.waker().clone()); Poll::Pending diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index ceaf7aeb89..07ef66146a 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -59,7 +59,7 @@ import { ERR_UNESCAPED_CHARACTERS, } from "ext:deno_node/internal/errors.ts"; import { getTimerDuration } from "ext:deno_node/internal/timers.mjs"; -import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js"; +import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts"; import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 023b6acd3f..02e66e3da3 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -36,7 +36,7 @@ import { } from "ext:deno_node/internal/stream_base_commons.ts"; import { FileHandle } from "node:fs/promises"; import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; -import { serveHttpOnConnection } from "ext:deno_http/00_serve.js"; +import { serveHttpOnConnection } from "ext:deno_http/00_serve.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts"; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; import { Duplex } from "node:stream"; diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 96799cb090..02ac7b6020 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -13,7 +13,7 @@ import * as console from "ext:deno_console/01_console.js"; import * as ffi from "ext:deno_ffi/00_ffi.js"; import * as net from "ext:deno_net/01_net.js"; import * as tls from "ext:deno_net/02_tls.js"; -import * as serve from "ext:deno_http/00_serve.js"; +import * as serve from "ext:deno_http/00_serve.ts"; import * as http from "ext:deno_http/01_http.js"; import * as websocket from "ext:deno_http/02_websocket.ts"; import * as errors from "ext:runtime/01_errors.js"; diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts index 048529ae96..32d05056a3 100644 --- a/tests/unit/serve_test.ts +++ b/tests/unit/serve_test.ts @@ -2843,7 +2843,20 @@ Deno.test( async function httpServerCancelFetch() { const request2 = Promise.withResolvers(); const request2Aborted = Promise.withResolvers(); - const { finished, abort } = await makeServer(async (req) => { + let completed = 0; + let aborted = 0; + const { finished, abort } = await makeServer(async (req, context) => { + context.completed.then(() => { + console.log("completed"); + completed++; + }).catch(() => { + console.log("completed (error)"); + completed++; + }); + req.signal.onabort = () => { + console.log("aborted", req.url); + aborted++; + }; if (req.url.endsWith("/1")) { const fetchRecursive = await fetch(`http://localhost:${servePort}/2`); return new Response(fetchRecursive.body); @@ -2871,6 +2884,8 @@ Deno.test( abort(); await finished; + assertEquals(completed, 2); + assertEquals(aborted, 2); }, ); diff --git a/tools/core_import_map.json b/tools/core_import_map.json index 463095de8a..421769e523 100644 --- a/tools/core_import_map.json +++ b/tools/core_import_map.json @@ -15,7 +15,7 @@ "ext:deno_fetch/26_fetch.js": "../ext/fetch/26_fetch.js", "ext:deno_ffi/00_ffi.js": "../ext/ffi/00_ffi.js", "ext:deno_fs/30_fs.js": "../ext/fs/30_fs.js", - "ext:deno_http/00_serve.js": "../ext/http/00_serve.js", + "ext:deno_http/00_serve.ts": "../ext/http/00_serve.ts", "ext:deno_http/01_http.js": "../ext/http/01_http.js", "ext:deno_io/12_io.js": "../ext/io/12_io.js", "ext:deno_kv/01_db.ts": "../ext/kv/01_db.ts",