1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

feat(ext/http): Implement request.signal for Deno.serve (#23425)

When the response has been successfully send, we abort the
`Request.signal` property to indicate that all resources associated with
this transaction may be torn down.
This commit is contained in:
Matt Mastracci 2024-04-24 14:03:37 -04:00 committed by GitHub
parent b60822f6e0
commit eed2598e6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 119 additions and 34 deletions

View file

@ -10,6 +10,17 @@
declare namespace Deno {
export {}; // stop default export type behavior
/** Information for a HTTP request.
*
* @category HTTP Server
*/
export interface ServeHandlerInfo {
/** The remote address of the connection. */
remoteAddr: Deno.NetAddr;
/** The completion promise */
completed: Promise<void>;
}
/** **UNSTABLE**: New API, yet to be vetted.
*
* Retrieve the process umask. If `mask` is provided, sets the process umask.

View file

@ -4,6 +4,7 @@ import { core, internals, primordials } from "ext:core/mod.js";
const {
BadResourcePrototype,
InterruptedPrototype,
Interrupted,
internalRidSymbol,
} = core;
import {
@ -37,6 +38,7 @@ const {
TypeError,
TypedArrayPrototypeGetSymbolToStringTag,
Uint8Array,
Promise,
} = primordials;
import { InnerBody } from "ext:deno_fetch/22_body.js";
@ -132,14 +134,31 @@ class InnerRequest {
#body;
#upgraded;
#urlValue;
#completed;
#abortController;
constructor(external, context) {
constructor(external, context, abortController) {
this.#external = external;
this.#context = context;
this.#upgraded = false;
this.#completed = undefined;
this.#abortController = abortController;
}
close() {
close(success = true) {
// The completion signal fires only if someone cares
if (this.#completed) {
if (success) {
this.#completed.resolve(undefined);
} else {
this.#completed.reject(
new Interrupted("HTTP response was not sent successfully"),
);
}
}
// Unconditionally abort the request signal. Note that we don't use
// an error here.
this.#abortController.abort();
this.#external = null;
}
@ -271,6 +290,19 @@ class InnerRequest {
path;
}
get completed() {
if (!this.#completed) {
// NOTE: this is faster than Promise.withResolvers()
let resolve, reject;
const promise = new Promise((r1, r2) => {
resolve = r1;
reject = r2;
});
this.#completed = { promise, resolve, reject };
}
return this.#completed.promise;
}
get remoteAddr() {
const transport = this.#context.listener?.addr.transport;
if (transport === "unix" || transport === "unixpacket") {
@ -375,16 +407,24 @@ class CallbackContext {
}
class ServeHandlerInfo {
#inner = null;
constructor(inner) {
#inner: InnerRequest;
constructor(inner: InnerRequest) {
this.#inner = inner;
}
get remoteAddr() {
return this.#inner.remoteAddr;
}
get completed() {
return this.#inner.completed;
}
}
function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
function fastSyncResponseOrStream(
req,
respBody,
status,
innerRequest: InnerRequest,
) {
if (respBody === null || respBody === undefined) {
// Don't set the body
innerRequest?.close();
@ -428,8 +468,8 @@ function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
autoClose,
status,
),
() => {
innerRequest?.close();
(success) => {
innerRequest?.close(success);
op_http_close_after_finish(req);
},
);
@ -443,15 +483,16 @@ function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
const signal = context.abortController.signal;
return async function (req) {
const abortController = new AbortController();
const signal = abortController.signal;
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
// 500 error.
let innerRequest;
let response;
try {
innerRequest = new InnerRequest(req, context);
innerRequest = new InnerRequest(req, context, abortController);
response = await callback(
fromInnerRequest(innerRequest, signal, "immutable"),
new ServeHandlerInfo(innerRequest),
@ -509,9 +550,27 @@ function mapToCallback(context, callback, onError) {
};
}
type RawHandler = (
request: Request,
info: ServeHandlerInfo,
) => Response | Promise<Response>;
type RawServeOptions = {
port?: number;
hostname?: string;
signal?: AbortSignal;
reusePort?: boolean;
key?: string;
cert?: string;
onError?: (error: unknown) => Response | Promise<Response>;
onListen?: (params: { hostname: string; port: number }) => void;
handler?: RawHandler;
};
function serve(arg1, arg2) {
let options = undefined;
let handler = undefined;
let options: RawServeOptions | undefined;
let handler: RawHandler | undefined;
if (typeof arg1 === "function") {
handler = arg1;
} else if (typeof arg2 === "function") {

View file

@ -683,7 +683,7 @@ pub async fn op_http_set_response_body_resource(
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
) -> Result<bool, AnyError> {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
@ -716,8 +716,7 @@ pub async fn op_http_set_response_body_resource(
},
);
http.response_body_finished().await;
Ok(())
Ok(http.response_body_finished().await)
}
#[op2(fast)]

View file

@ -131,7 +131,7 @@ deno_core::extension!(
http_next::op_http_close,
http_next::op_http_cancel,
],
esm = ["00_serve.js", "01_http.js", "02_websocket.ts"],
esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
);
pub enum HttpSocketAddr {

View file

@ -16,7 +16,6 @@ use deno_core::Resource;
use flate2::write::GzEncoder;
use hyper::body::Frame;
use hyper::body::SizeHint;
use hyper::header::HeaderMap;
use pin_project::pin_project;
/// Simplification for nested types we use for our streams. We provide a way to convert from
@ -30,10 +29,6 @@ pub enum ResponseStreamResult {
/// not register a waker and should be called again at the lowest level of this code. Generally this
/// will only be returned from compression streams that require additional buffering.
NoData,
/// Stream provided trailers.
// TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc.
#[allow(unused)]
Trailers(HeaderMap),
/// Stream failed.
Error(AnyError),
}
@ -44,7 +39,6 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
ResponseStreamResult::EndOfStream => None,
ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))),
ResponseStreamResult::Error(err) => Some(Err(err)),
ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))),
// This result should be handled by retrying
ResponseStreamResult::NoData => unimplemented!(),
}
@ -198,6 +192,11 @@ impl ResponseBytesInner {
_ => Self::Bytes(BufView::from(vec)),
}
}
/// Did we complete this response successfully?
pub fn is_complete(&self) -> bool {
matches!(self, ResponseBytesInner::Done | ResponseBytesInner::Empty)
}
}
pub struct ResourceBodyAdapter {
@ -387,9 +386,7 @@ impl PollFrame for GZipResponseStream {
let start_out = stm.total_out();
let res = match frame {
// Short-circuit these and just return
x @ (ResponseStreamResult::NoData
| ResponseStreamResult::Error(..)
| ResponseStreamResult::Trailers(..)) => {
x @ (ResponseStreamResult::NoData | ResponseStreamResult::Error(..)) => {
return std::task::Poll::Ready(x)
}
ResponseStreamResult::EndOfStream => {

View file

@ -482,12 +482,13 @@ impl HttpRecord {
HttpRecordReady(self)
}
/// Resolves when response body has finished streaming.
pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ {
/// Resolves when response body has finished streaming. Returns true if the
/// response completed.
pub fn response_body_finished(&self) -> impl Future<Output = bool> + '_ {
struct HttpRecordFinished<'a>(&'a HttpRecord);
impl<'a> Future for HttpRecordFinished<'a> {
type Output = ();
type Output = bool;
fn poll(
self: Pin<&mut Self>,
@ -495,7 +496,10 @@ impl HttpRecord {
) -> Poll<Self::Output> {
let mut mut_self = self.0.self_mut();
if mut_self.response_body_finished {
return Poll::Ready(());
// If we sent the response body and the trailers, this body completed successfully
return Poll::Ready(
mut_self.response_body.is_complete() && mut_self.trailers.is_none(),
);
}
mut_self.response_body_waker = Some(cx.waker().clone());
Poll::Pending

View file

@ -59,7 +59,7 @@ import {
ERR_UNESCAPED_CHARACTERS,
} from "ext:deno_node/internal/errors.ts";
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js";
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
import { createHttpClient } from "ext:deno_fetch/22_http_client.js";
import { headersEntries } from "ext:deno_fetch/20_headers.js";
import { timerId } from "ext:deno_web/03_abort_signal.js";

View file

@ -36,7 +36,7 @@ import {
} from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "node:fs/promises";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { serveHttpOnConnection } from "ext:deno_http/00_serve.js";
import { serveHttpOnConnection } from "ext:deno_http/00_serve.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { Duplex } from "node:stream";

View file

@ -13,7 +13,7 @@ import * as console from "ext:deno_console/01_console.js";
import * as ffi from "ext:deno_ffi/00_ffi.js";
import * as net from "ext:deno_net/01_net.js";
import * as tls from "ext:deno_net/02_tls.js";
import * as serve from "ext:deno_http/00_serve.js";
import * as serve from "ext:deno_http/00_serve.ts";
import * as http from "ext:deno_http/01_http.js";
import * as websocket from "ext:deno_http/02_websocket.ts";
import * as errors from "ext:runtime/01_errors.js";

View file

@ -2843,7 +2843,20 @@ Deno.test(
async function httpServerCancelFetch() {
const request2 = Promise.withResolvers<void>();
const request2Aborted = Promise.withResolvers<string>();
const { finished, abort } = await makeServer(async (req) => {
let completed = 0;
let aborted = 0;
const { finished, abort } = await makeServer(async (req, context) => {
context.completed.then(() => {
console.log("completed");
completed++;
}).catch(() => {
console.log("completed (error)");
completed++;
});
req.signal.onabort = () => {
console.log("aborted", req.url);
aborted++;
};
if (req.url.endsWith("/1")) {
const fetchRecursive = await fetch(`http://localhost:${servePort}/2`);
return new Response(fetchRecursive.body);
@ -2871,6 +2884,8 @@ Deno.test(
abort();
await finished;
assertEquals(completed, 2);
assertEquals(aborted, 2);
},
);

View file

@ -15,7 +15,7 @@
"ext:deno_fetch/26_fetch.js": "../ext/fetch/26_fetch.js",
"ext:deno_ffi/00_ffi.js": "../ext/ffi/00_ffi.js",
"ext:deno_fs/30_fs.js": "../ext/fs/30_fs.js",
"ext:deno_http/00_serve.js": "../ext/http/00_serve.js",
"ext:deno_http/00_serve.ts": "../ext/http/00_serve.ts",
"ext:deno_http/01_http.js": "../ext/http/01_http.js",
"ext:deno_io/12_io.js": "../ext/io/12_io.js",
"ext:deno_kv/01_db.ts": "../ext/kv/01_db.ts",