mirror of
https://github.com/denoland/deno.git
synced 2024-12-23 15:49:44 -05:00
perf(ext/http): Add a sync phase to http serving (#19321)
Under heavy load, we often have requests queued up that don't need an async call to retrieve. We can use a fast path sync op to drain this set of ready requests, and then fall back to the async op once we run out of work. This is a .5-1% bump in req/s on an M2 mac. About 90% of the handlers go through this sync phase (based on a simple instrumentation that is not included in this PR) and skip the async machinery entirely.
This commit is contained in:
parent
e1391e2054
commit
fae10bf3ae
3 changed files with 40 additions and 1 deletions
|
@ -64,6 +64,7 @@ const {
|
|||
op_http_set_response_trailers,
|
||||
op_http_upgrade_raw,
|
||||
op_http_upgrade_websocket_next,
|
||||
op_http_try_wait,
|
||||
op_http_wait,
|
||||
} = core.generateAsyncOpHandler(
|
||||
"op_http_get_request_headers",
|
||||
|
@ -80,6 +81,7 @@ const {
|
|||
"op_http_set_response_trailers",
|
||||
"op_http_upgrade_raw",
|
||||
"op_http_upgrade_websocket_next",
|
||||
"op_http_try_wait",
|
||||
"op_http_wait",
|
||||
);
|
||||
const _upgraded = Symbol("_upgraded");
|
||||
|
@ -558,7 +560,7 @@ function mapToCallback(responseBodies, context, signal, callback, onError) {
|
|||
}
|
||||
}
|
||||
|
||||
// Attempt to response quickly to this request, otherwise extract the stream
|
||||
// Attempt to respond quickly to this request, otherwise extract the stream
|
||||
const stream = fastSyncResponseOrStream(req, inner.body);
|
||||
if (stream !== null) {
|
||||
// Handle the stream asynchronously
|
||||
|
@ -671,6 +673,18 @@ function serve(arg1, arg2) {
|
|||
const rid = context.serverRid;
|
||||
let req;
|
||||
try {
|
||||
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
|
||||
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
|
||||
while ((req = op_http_try_wait(rid)) !== 0xffffffff) {
|
||||
PromisePrototypeCatch(callback(req), (error) => {
|
||||
// Abnormal exit
|
||||
console.error(
|
||||
"Terminating Deno.serve loop due to unexpected error",
|
||||
error,
|
||||
);
|
||||
context.close();
|
||||
});
|
||||
}
|
||||
currentPromise = op_http_wait(rid);
|
||||
if (!ref) {
|
||||
core.unrefOp(currentPromise[promiseIdSymbol]);
|
||||
|
|
|
@ -810,6 +810,30 @@ where
|
|||
))
|
||||
}
|
||||
|
||||
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
|
||||
/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
|
||||
#[op(fast)]
|
||||
pub fn op_http_try_wait(state: &mut OpState, rid: ResourceId) -> SlabId {
|
||||
// The resource needs to exist.
|
||||
let Ok(join_handle) = state
|
||||
.resource_table
|
||||
.get::<HttpJoinHandle>(rid) else {
|
||||
return SlabId::MAX;
|
||||
};
|
||||
|
||||
// If join handle is somehow locked, just abort.
|
||||
let Some(mut handle) = RcRef::map(&join_handle, |this| &this.2).try_borrow_mut() else {
|
||||
return SlabId::MAX;
|
||||
};
|
||||
|
||||
// See if there are any requests waiting on this channel. If not, return.
|
||||
let Ok(id) = handle.try_recv() else {
|
||||
return SlabId::MAX;
|
||||
};
|
||||
|
||||
id
|
||||
}
|
||||
|
||||
#[op]
|
||||
pub async fn op_http_wait(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
|
|
|
@ -120,6 +120,7 @@ deno_core::extension!(
|
|||
http_next::op_http_track,
|
||||
http_next::op_http_upgrade_websocket_next,
|
||||
http_next::op_http_upgrade_raw,
|
||||
http_next::op_http_try_wait,
|
||||
http_next::op_http_wait,
|
||||
],
|
||||
esm = ["00_serve.js", "01_http.js"],
|
||||
|
|
Loading…
Reference in a new issue