From fae10bf3aea4c0d01a0cb58596f5670b8c577a57 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Tue, 30 May 2023 18:02:52 -0600 Subject: [PATCH] perf(ext/http): Add a sync phase to http serving (#19321) Under heavy load, we often have requests queued up that don't need an async call to retrieve. We can use a fast path sync op to drain this set of ready requests, and then fall back to the async op once we run out of work. This is a .5-1% bump in req/s on an M2 mac. About 90% of the handlers go through this sync phase (based on a simple instrumentation that is not included in this PR) and skip the async machinery entirely. --- ext/http/00_serve.js | 16 +++++++++++++++- ext/http/http_next.rs | 24 ++++++++++++++++++++++++ ext/http/lib.rs | 1 + 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 2fb36d044e..e3926280b2 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -64,6 +64,7 @@ const { op_http_set_response_trailers, op_http_upgrade_raw, op_http_upgrade_websocket_next, + op_http_try_wait, op_http_wait, } = core.generateAsyncOpHandler( "op_http_get_request_headers", @@ -80,6 +81,7 @@ const { "op_http_set_response_trailers", "op_http_upgrade_raw", "op_http_upgrade_websocket_next", + "op_http_try_wait", "op_http_wait", ); const _upgraded = Symbol("_upgraded"); @@ -558,7 +560,7 @@ function mapToCallback(responseBodies, context, signal, callback, onError) { } } - // Attempt to response quickly to this request, otherwise extract the stream + // Attempt to respond quickly to this request, otherwise extract the stream const stream = fastSyncResponseOrStream(req, inner.body); if (stream !== null) { // Handle the stream asynchronously @@ -671,6 +673,18 @@ function serve(arg1, arg2) { const rid = context.serverRid; let req; try { + // Attempt to pull as many requests out of the queue as possible before awaiting. This API is + // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong. + while ((req = op_http_try_wait(rid)) !== 0xffffffff) { + PromisePrototypeCatch(callback(req), (error) => { + // Abnormal exit + console.error( + "Terminating Deno.serve loop due to unexpected error", + error, + ); + context.close(); + }); + } currentPromise = op_http_wait(rid); if (!ref) { core.unrefOp(currentPromise[promiseIdSymbol]); diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index d479d4a912..7edffed655 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -810,6 +810,30 @@ where )) } +/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything +/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error. +#[op(fast)] +pub fn op_http_try_wait(state: &mut OpState, rid: ResourceId) -> SlabId { + // The resource needs to exist. + let Ok(join_handle) = state + .resource_table + .get::(rid) else { + return SlabId::MAX; + }; + + // If join handle is somehow locked, just abort. + let Some(mut handle) = RcRef::map(&join_handle, |this| &this.2).try_borrow_mut() else { + return SlabId::MAX; + }; + + // See if there are any requests waiting on this channel. If not, return. + let Ok(id) = handle.try_recv() else { + return SlabId::MAX; + }; + + id +} + #[op] pub async fn op_http_wait( state: Rc>, diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 7d37c53e1c..da007ba398 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -120,6 +120,7 @@ deno_core::extension!( http_next::op_http_track, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, + http_next::op_http_try_wait, http_next::op_http_wait, ], esm = ["00_serve.js", "01_http.js"],