1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-09 07:39:15 -05:00

Reland "perf(ext/flash): optimize response streaming" (#16660)

This commit is contained in:
Divy Srivastava 2022-11-22 02:53:58 -08:00 committed by GitHub
parent 1ec357faf3
commit ef82211377
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 14 deletions

View file

@ -243,6 +243,7 @@
i, i,
respondFast, respondFast,
respondChunked, respondChunked,
tryRespondChunked,
) { ) {
// there might've been an HTTP upgrade. // there might've been an HTTP upgrade.
if (resp === undefined) { if (resp === undefined) {
@ -371,6 +372,9 @@
} }
} else { } else {
const reader = respBody.getReader(); const reader = respBody.getReader();
// Best case: sends headers + first chunk in a single go.
const { value, done } = await reader.read();
writeFixedResponse( writeFixedResponse(
serverId, serverId,
i, i,
@ -385,14 +389,23 @@
false, false,
respondFast, respondFast,
); );
while (true) {
const { value, done } = await reader.read(); await tryRespondChunked(
await respondChunked( i,
i, value,
value, done,
done, );
);
if (done) break; if (!done) {
while (true) {
const chunk = await reader.read();
await respondChunked(
i,
chunk.value,
chunk.done,
);
if (chunk.done) break;
}
} }
} }
} }
@ -572,6 +585,7 @@
i, i,
respondFast, respondFast,
respondChunked, respondChunked,
tryRespondChunked,
), ),
), ),
onError, onError,
@ -589,6 +603,7 @@
i, i,
respondFast, respondFast,
respondChunked, respondChunked,
tryRespondChunked,
) )
).catch(onError); ).catch(onError);
continue; continue;
@ -607,6 +622,7 @@
i, i,
respondFast, respondFast,
respondChunked, respondChunked,
tryRespondChunked,
); );
} }
@ -623,6 +639,25 @@
once: true, once: true,
}); });
function tryRespondChunked(token, chunk, shutdown) {
const nwritten = core.ops.op_try_flash_respond_chuncked(
serverId,
token,
chunk ?? new Uint8Array(),
shutdown,
);
if (nwritten > 0) {
return core.opAsync(
"op_flash_respond_chuncked",
serverId,
token,
chunk,
shutdown,
nwritten,
);
}
}
function respondChunked(token, chunk, shutdown) { function respondChunked(token, chunk, shutdown) {
return core.opAsync( return core.opAsync(
"op_flash_respond_chuncked", "op_flash_respond_chuncked",

View file

@ -106,6 +106,39 @@ fn op_flash_respond(
flash_respond(ctx, token, shutdown, &response) flash_respond(ctx, token, shutdown, &response)
} }
#[op(fast)]
fn op_try_flash_respond_chuncked(
op_state: &mut OpState,
server_id: u32,
token: u32,
response: &[u8],
shutdown: bool,
) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tx = ctx.requests.get(&token).unwrap();
let sock = tx.socket();
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
let h = format!("{:x}\r\n", response.len());
let concat = [h.as_bytes(), response, b"\r\n"].concat();
let expected = sock.try_write(&concat);
if expected != concat.len() {
if expected > 2 {
return expected as u32;
}
return expected as u32;
}
if shutdown {
// Best case: We've written everything and the stream is done too.
let _ = ctx.requests.remove(&token).unwrap();
}
0
}
#[op] #[op]
async fn op_flash_respond_async( async fn op_flash_respond_async(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
@ -157,6 +190,7 @@ async fn op_flash_respond_chuncked(
token: u32, token: u32,
response: Option<ZeroCopyBuf>, response: Option<ZeroCopyBuf>,
shutdown: bool, shutdown: bool,
nwritten: u32,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut(); let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
@ -178,17 +212,27 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| { .with_async_stream(|stream| {
Box::pin(async move { Box::pin(async move {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
// TODO(@littledivy): Use writev when `UnixIoSlice` lands.
// https://github.com/denoland/deno/pull/15629
macro_rules! write_whats_not_written {
($e:expr) => {
let e = $e;
let n = nwritten as usize;
if n < e.len() {
stream.write_all(&e[n..]).await?;
}
};
}
if let Some(response) = response { if let Some(response) = response {
stream let h = format!("{:x}\r\n", response.len());
.write_all(format!("{:x}\r\n", response.len()).as_bytes()) write_whats_not_written!(h.as_bytes());
.await?; write_whats_not_written!(&response);
stream.write_all(&response).await?; write_whats_not_written!(b"\r\n");
stream.write_all(b"\r\n").await?;
} }
// The last chunk // The last chunk
if shutdown { if shutdown {
stream.write_all(b"0\r\n\r\n").await?; write_whats_not_written!(b"0\r\n\r\n");
} }
Ok(()) Ok(())
@ -1485,6 +1529,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_close_server::decl(), op_flash_close_server::decl(),
op_flash_make_request::decl(), op_flash_make_request::decl(),
op_flash_write_resource::decl(), op_flash_write_resource::decl(),
op_try_flash_respond_chuncked::decl(),
]) ])
.state(move |op_state| { .state(move |op_state| {
op_state.put(Unstable(unstable)); op_state.put(Unstable(unstable));