0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-10-29 08:58:01 -04:00

perf(ext/flash): optimize path response streams (#16284)

Regression caused by https://github.com/denoland/deno/pull/15591
This commit is contained in:
Divy Srivastava 2022-10-19 16:41:47 +05:30 committed by GitHub
parent 36307c45e9
commit 743fcc0668
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 17 deletions

View file

@ -365,6 +365,8 @@
}
} else {
const reader = respBody.getReader();
const { value, done } = await reader.read();
// Best case: sends headers + first chunk in a single go.
writeFixedResponse(
serverId,
i,
@ -379,14 +381,21 @@
false,
respondFast,
);
while (true) {
const { value, done } = await reader.read();
await respondChunked(
i,
value,
done,
);
if (done) break;
await respondChunked(
i,
value,
done,
);
if (!done) {
while (true) {
const chunk = await reader.read();
await respondChunked(
i,
chunk.value,
chunk.done,
);
if (chunk.done) break;
}
}
}
}
@ -591,13 +600,22 @@
});
function respondChunked(token, chunk, shutdown) {
return core.opAsync(
"op_flash_respond_chuncked",
const nwritten = core.ops.op_try_flash_respond_chuncked(
serverId,
token,
chunk,
chunk ?? new Uint8Array(),
shutdown,
);
if (nwritten > 0) {
return core.opAsync(
"op_flash_respond_chuncked",
serverId,
token,
chunk,
shutdown,
nwritten,
);
}
}
const fastOp = prepareFastCalls();

View file

@ -150,6 +150,34 @@ async fn op_flash_respond_async(
Ok(())
}
#[op(fast)]
fn op_try_flash_respond_chuncked(
op_state: &mut OpState,
server_id: u32,
token: u32,
response: &[u8],
shutdown: bool,
) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tx = ctx.requests.get(&token).unwrap();
let sock = tx.socket();
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
let h = format!("{:x}\r\n", response.len());
let concat = [h.as_bytes(), response, b"\r\n"].concat();
let expected = sock.try_write(&concat);
if expected != concat.len() {
return expected as u32;
}
if shutdown {
// Best case: We've written everything and the stream is done too.
let _ = ctx.requests.remove(&token).unwrap();
}
0
}
#[op]
async fn op_flash_respond_chuncked(
op_state: Rc<RefCell<OpState>>,
@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
macro_rules! write_whats_not_written {
($e:expr) => {
let e = $e;
let n = nwritten as usize;
if n < e.len() {
stream.write_all(&e[n..]).await?;
}
};
}
if let Some(response) = response {
stream
.write_all(format!("{:x}\r\n", response.len()).as_bytes())
.await?;
stream.write_all(&response).await?;
stream.write_all(b"\r\n").await?;
let h = format!("{:x}\r\n", response.len());
write_whats_not_written!(h.as_bytes());
write_whats_not_written!(&response);
write_whats_not_written!(b"\r\n");
}
// The last chunk
if shutdown {
stream.write_all(b"0\r\n\r\n").await?;
write_whats_not_written!(b"0\r\n\r\n");
}
Ok(())
@ -1451,6 +1490,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_respond::decl(),
op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
op_try_flash_respond_chuncked::decl(),
op_flash_method::decl(),
op_flash_path::decl(),
op_flash_headers::decl(),