1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-05 13:59:01 -05:00

Reland "perf(ext/flash): optimize response streaming" (#16660)

This commit is contained in:
Divy Srivastava 2022-11-22 02:53:58 -08:00 committed by Bartek Iwańczuk
parent cdb180fd66
commit 0f305c866b
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
2 changed files with 94 additions and 14 deletions

View file

@ -243,6 +243,7 @@
i,
respondFast,
respondChunked,
tryRespondChunked,
) {
// there might've been an HTTP upgrade.
if (resp === undefined) {
@ -371,6 +372,9 @@
}
} else {
const reader = respBody.getReader();
// Best case: sends headers + first chunk in a single go.
const { value, done } = await reader.read();
writeFixedResponse(
serverId,
i,
@ -385,14 +389,23 @@
false,
respondFast,
);
while (true) {
const { value, done } = await reader.read();
await respondChunked(
i,
value,
done,
);
if (done) break;
await tryRespondChunked(
i,
value,
done,
);
if (!done) {
while (true) {
const chunk = await reader.read();
await respondChunked(
i,
chunk.value,
chunk.done,
);
if (chunk.done) break;
}
}
}
}
@ -572,6 +585,7 @@
i,
respondFast,
respondChunked,
tryRespondChunked,
),
),
onError,
@ -589,6 +603,7 @@
i,
respondFast,
respondChunked,
tryRespondChunked,
)
).catch(onError);
continue;
@ -607,6 +622,7 @@
i,
respondFast,
respondChunked,
tryRespondChunked,
);
}
@ -623,6 +639,25 @@
once: true,
});
function tryRespondChunked(token, chunk, shutdown) {
const nwritten = core.ops.op_try_flash_respond_chuncked(
serverId,
token,
chunk ?? new Uint8Array(),
shutdown,
);
if (nwritten > 0) {
return core.opAsync(
"op_flash_respond_chuncked",
serverId,
token,
chunk,
shutdown,
nwritten,
);
}
}
function respondChunked(token, chunk, shutdown) {
return core.opAsync(
"op_flash_respond_chuncked",

View file

@ -106,6 +106,39 @@ fn op_flash_respond(
flash_respond(ctx, token, shutdown, &response)
}
#[op(fast)]
fn op_try_flash_respond_chuncked(
op_state: &mut OpState,
server_id: u32,
token: u32,
response: &[u8],
shutdown: bool,
) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tx = ctx.requests.get(&token).unwrap();
let sock = tx.socket();
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
let h = format!("{:x}\r\n", response.len());
let concat = [h.as_bytes(), response, b"\r\n"].concat();
let expected = sock.try_write(&concat);
if expected != concat.len() {
if expected > 2 {
return expected as u32;
}
return expected as u32;
}
if shutdown {
// Best case: We've written everything and the stream is done too.
let _ = ctx.requests.remove(&token).unwrap();
}
0
}
#[op]
async fn op_flash_respond_async(
state: Rc<RefCell<OpState>>,
@ -157,6 +190,7 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@ -178,17 +212,27 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
macro_rules! write_whats_not_written {
($e:expr) => {
let e = $e;
let n = nwritten as usize;
if n < e.len() {
stream.write_all(&e[n..]).await?;
}
};
}
if let Some(response) = response {
stream
.write_all(format!("{:x}\r\n", response.len()).as_bytes())
.await?;
stream.write_all(&response).await?;
stream.write_all(b"\r\n").await?;
let h = format!("{:x}\r\n", response.len());
write_whats_not_written!(h.as_bytes());
write_whats_not_written!(&response);
write_whats_not_written!(b"\r\n");
}
// The last chunk
if shutdown {
stream.write_all(b"0\r\n\r\n").await?;
write_whats_not_written!(b"0\r\n\r\n");
}
Ok(())
@ -1485,6 +1529,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_close_server::decl(),
op_flash_make_request::decl(),
op_flash_write_resource::decl(),
op_try_flash_respond_chuncked::decl(),
])
.state(move |op_state| {
op_state.put(Unstable(unstable));