From 8bdcec1c84636aa00bf7444539e68b49d79b1fbf Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 19 Aug 2022 10:14:56 +0530 Subject: [PATCH] fix(ext/flash): concurrent response streams (#15493) --- cli/bench/http/deno_http_flash_ops.js | 4 +- cli/tests/unit/flash_test.ts | 67 ++++++++ ext/flash/01_http.js | 214 +++++++++++++------------- ext/flash/lib.rs | 129 ++++++---------- ext/flash/request.rs | 49 ++++++ 5 files changed, 272 insertions(+), 191 deletions(-) create mode 100644 ext/flash/request.rs diff --git a/cli/bench/http/deno_http_flash_ops.js b/cli/bench/http/deno_http_flash_ops.js index 1b833e7f7f..40ca25ff18 100644 --- a/cli/bench/http/deno_http_flash_ops.js +++ b/cli/bench/http/deno_http_flash_ops.js @@ -25,13 +25,15 @@ function respond(token, response) { const response = encode( "HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World", ); +let offset = 0; while (true) { let token = nextRequest(); if (token === 0) token = await opAsync("op_flash_next_async", serverId); - for (let i = 0; i < token; i++) { + for (let i = offset; i < offset + token; i++) { respond( i, response, ); } + offset += token; } diff --git a/cli/tests/unit/flash_test.ts b/cli/tests/unit/flash_test.ts index 57138b14f9..51534c79b3 100644 --- a/cli/tests/unit/flash_test.ts +++ b/cli/tests/unit/flash_test.ts @@ -1848,6 +1848,73 @@ Deno.test( }, ); +Deno.test( + { permissions: { net: true } }, + async function httpServerConcurrentRequests() { + const ac = new AbortController(); + const listeningPromise = deferred(); + + let reqCount = -1; + let timerId: number | undefined; + const server = Deno.serve(async (req) => { + reqCount++; + if (reqCount === 0) { + const msg = new TextEncoder().encode("data: hello\r\n\r\n"); + // SSE + const body = new ReadableStream({ + start(controller) { + timerId = setInterval(() => { + controller.enqueue(msg); + }, 1000); + }, + cancel() { + if (typeof timerId === "number") { + clearInterval(timerId); + } + }, + }); + return new Response(body, { + headers: { + "Content-Type": "text/event-stream", + }, + }); + } + + return new Response(`hello ${reqCount}`); + }, { + port: 4503, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + const sseRequest = await fetch(`http://localhost:4503/`); + + const decoder = new TextDecoder(); + const stream = sseRequest.body!.getReader(); + { + const { done, value } = await stream.read(); + assert(!done); + assertEquals(decoder.decode(value), "data: hello\r\n\r\n"); + } + + const helloRequest = await fetch(`http://localhost:4503/`); + assertEquals(helloRequest.status, 200); + assertEquals(await helloRequest.text(), "hello 1"); + + { + const { done, value } = await stream.read(); + assert(!done); + assertEquals(decoder.decode(value), "data: hello\r\n\r\n"); + } + + await stream.cancel(); + clearInterval(timerId); + ac.abort(); + await server; + }, +); + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index fd817219ed..19920da58c 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -237,20 +237,21 @@ await server.finished; }, async serve() { + let offset = 0; while (true) { if (server.closed) { break; } - let token = nextRequestSync(); - if (token === 0) { - token = await core.opAsync("op_flash_next_async", serverId); + let tokens = nextRequestSync(); + if (tokens === 0) { + tokens = await core.opAsync("op_flash_next_async", serverId); if (server.closed) { break; } } - for (let i = 0; i < token; i++) { + for (let i = offset; i < offset + tokens; i++) { let body = null; // There might be a body, but we don't expose it for GET/HEAD requests. // It will be closed automatically once the request has been handled and @@ -290,17 +291,6 @@ if (resp === undefined) { continue; } - - const ws = resp[_ws]; - if (!ws) { - if (hasBody && body[_state] !== "closed") { - // TODO(@littledivy): Optimize by draining in a single op. - try { - await req.arrayBuffer(); - } catch { /* pass */ } - } - } - const innerResp = toInnerResponse(resp); // If response body length is known, it will be sent synchronously in a @@ -360,74 +350,8 @@ respBody = new Uint8Array(0); } - if (isStreamingResponseBody === true) { - const resourceRid = getReadableStreamRid(respBody); - if (resourceRid) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); - } - const reader = respBody.getReader(); // Aquire JS lock. - try { - core.opAsync( - "op_flash_write_resource", - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - true, - ), - serverId, - i, - resourceRid, - ).then(() => { - // Release JS lock. - readableStreamClose(respBody); - }); - } catch (error) { - await reader.cancel(error); - throw error; - } - } else { - const reader = respBody.getReader(); - let first = true; - a: - while (true) { - const { value, done } = await reader.read(); - if (first) { - first = false; - core.ops.op_flash_respond( - serverId, - i, - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - ), - value ?? new Uint8Array(), - false, - ); - } else { - if (value === undefined) { - core.ops.op_flash_respond_chuncked( - serverId, - i, - undefined, - done, - ); - } else { - respondChunked( - i, - value, - done, - ); - } - } - if (done) break a; - } - } - } else { + const ws = resp[_ws]; + if (isStreamingResponseBody === false) { const responseStr = http1Response( method, innerResp.status ?? 200, @@ -456,29 +380,111 @@ } } - if (ws) { - const wsRid = await core.opAsync( - "op_flash_upgrade_websocket", - serverId, - i, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - if (ws[_idleTimeoutDuration]) { - ws.addEventListener( - "close", - () => clearTimeout(ws[_idleTimeoutTimeout]), - ); + (async () => { + if (!ws) { + if (hasBody && body[_state] !== "closed") { + // TODO(@littledivy): Optimize by draining in a single op. + try { + await req.arrayBuffer(); + } catch { /* pass */ } + } } - ws[_serverHandleIdleTimeout](); - } + + if (isStreamingResponseBody === true) { + const resourceRid = getReadableStreamRid(respBody); + if (resourceRid) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); + } + const reader = respBody.getReader(); // Aquire JS lock. + try { + core.opAsync( + "op_flash_write_resource", + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + null, + true, + ), + serverId, + i, + resourceRid, + ).then(() => { + // Release JS lock. + readableStreamClose(respBody); + }); + } catch (error) { + await reader.cancel(error); + throw error; + } + } else { + const reader = respBody.getReader(); + let first = true; + a: + while (true) { + const { value, done } = await reader.read(); + if (first) { + first = false; + core.ops.op_flash_respond( + serverId, + i, + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + null, + ), + value ?? new Uint8Array(), + false, + ); + } else { + if (value === undefined) { + core.ops.op_flash_respond_chuncked( + serverId, + i, + undefined, + done, + ); + } else { + respondChunked( + i, + value, + done, + ); + } + } + if (done) break a; + } + } + } + + if (ws) { + const wsRid = await core.opAsync( + "op_flash_upgrade_websocket", + serverId, + i, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); + } + })().catch(onError); } + + offset += tokens; } await server.finished; }, diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index 2c0cc548a6..8f3cb341a2 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -25,7 +25,6 @@ use http::header::CONNECTION; use http::header::CONTENT_LENGTH; use http::header::EXPECT; use http::header::TRANSFER_ENCODING; -use http::header::UPGRADE; use http::HeaderValue; use log::trace; use mio::net::TcpListener; @@ -58,10 +57,13 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; mod chunked; - +mod request; #[cfg(unix)] mod sendfile; +use request::InnerRequest; +use request::Request; + pub struct FlashContext { next_server_id: u32, join_handles: HashMap>>, @@ -70,22 +72,15 @@ pub struct FlashContext { pub struct ServerContext { _addr: SocketAddr, - tx: mpsc::Sender, - rx: mpsc::Receiver, - response: HashMap, + tx: mpsc::Sender, + rx: mpsc::Receiver, + requests: HashMap, + next_token: u32, listening_rx: Option>, close_tx: mpsc::Sender<()>, cancel_handle: Rc, } -struct InnerRequest { - _headers: Vec>, - req: httparse::Request<'static, 'static>, - body_offset: usize, - body_len: usize, - buffer: Pin>, -} - #[derive(Debug, PartialEq)] enum ParseStatus { None, @@ -99,7 +94,7 @@ enum InnerStream { Tls(Box), } -struct Stream { +pub struct Stream { inner: InnerStream, detached: bool, read_rx: Option>, @@ -147,38 +142,6 @@ impl Read for Stream { } } -struct NextRequest { - // Pointer to stream owned by the server loop thread. - // - // Why not Arc>? Performance. The stream - // is never written to by the server loop thread. - // - // Dereferencing is safe until server thread finishes and - // op_flash_serve resolves or websocket upgrade is performed. - socket: *mut Stream, - inner: InnerRequest, - keep_alive: bool, - #[allow(dead_code)] - upgrade: bool, - content_read: usize, - content_length: Option, - remaining_chunk_size: Option, - te_chunked: bool, - expect_continue: bool, -} - -// SAFETY: Sent from server thread to JS thread. -// See comment above for `socket`. -unsafe impl Send for NextRequest {} - -impl NextRequest { - #[inline(always)] - pub fn socket<'a>(&self) -> &'a mut Stream { - // SAFETY: Dereferencing is safe until server thread detaches socket or finishes. - unsafe { &mut *self.socket } - } -} - #[op] fn op_flash_respond( op_state: &mut OpState, @@ -194,13 +157,13 @@ fn op_flash_respond( let mut close = false; let sock = match shutdown { true => { - let tx = ctx.response.remove(&token).unwrap(); + let tx = ctx.requests.remove(&token).unwrap(); close = !tx.keep_alive; tx.socket() } // In case of a websocket upgrade or streaming response. false => { - let tx = ctx.response.get(&token).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); tx.socket() } }; @@ -263,7 +226,7 @@ async fn op_flash_write_resource( let op_state = &mut op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - ctx.response.remove(&token).unwrap().socket() + ctx.requests.remove(&token).unwrap().socket() }; drop(op_state); @@ -344,13 +307,13 @@ fn flash_respond( let mut close = false; let sock = match shutdown { true => { - let tx = ctx.response.remove(&token).unwrap(); + let tx = ctx.requests.remove(&token).unwrap(); close = !tx.keep_alive; tx.socket() } // In case of a websocket upgrade or streaming response. false => { - let tx = ctx.response.get(&token).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); tx.socket() } }; @@ -438,12 +401,12 @@ fn respond_chunked( ) { let sock = match shutdown { true => { - let tx = ctx.response.remove(&token).unwrap(); + let tx = ctx.requests.remove(&token).unwrap(); tx.socket() } // In case of a websocket upgrade or streaming response. false => { - let tx = ctx.response.get(&token).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); tx.socket() } }; @@ -469,7 +432,7 @@ macro_rules! get_request { ($op_state: ident, $server_id: expr, $token: ident) => {{ let flash_ctx = $op_state.borrow_mut::(); let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap(); - ctx.response.get_mut(&$token).unwrap() + ctx.requests.get_mut(&$token).unwrap() }}; } @@ -487,8 +450,8 @@ pub enum Method { } #[inline] -fn get_method(req: &mut NextRequest) -> u32 { - let method = match req.inner.req.method.unwrap() { +fn get_method(req: &mut Request) -> u32 { + let method = match req.method() { "GET" => Method::GET, "POST" => Method::POST, "PUT" => Method::PUT, @@ -531,7 +494,7 @@ fn op_flash_path( let flash_ctx = op_state.borrow_mut::(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); ctx - .response + .requests .get(&token) .unwrap() .inner @@ -543,12 +506,14 @@ fn op_flash_path( #[inline] fn next_request_sync(ctx: &mut ServerContext) -> u32 { - let mut tokens = 0; + let offset = ctx.next_token; + while let Ok(token) = ctx.rx.try_recv() { - ctx.response.insert(tokens, token); - tokens += 1; + ctx.requests.insert(ctx.next_token, token); + ctx.next_token += 1; } - tokens + + ctx.next_token - offset } pub struct NextRequestFast; @@ -597,7 +562,7 @@ unsafe fn op_flash_get_method_fast( let ptr = recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); let ctx = &mut *(ptr as *mut ServerContext); - let req = ctx.response.get_mut(&token).unwrap(); + let req = ctx.requests.get_mut(&token).unwrap(); get_method(req) } @@ -651,7 +616,7 @@ fn op_flash_make_request<'scope>( // SAFETY: This external is guaranteed to be a pointer to a ServerContext let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; let token = args.get(0).uint32_value(scope).unwrap(); - let req = ctx.response.get_mut(&token).unwrap(); + let req = ctx.requests.get_mut(&token).unwrap(); rv.set_uint32(get_method(req)); }, ) @@ -747,7 +712,7 @@ fn op_flash_make_request<'scope>( } #[inline] -fn has_body_stream(req: &NextRequest) -> bool { +fn has_body_stream(req: &Request) -> bool { let sock = req.socket(); sock.read_rx.is_some() } @@ -775,7 +740,7 @@ fn op_flash_headers( .get_mut(&server_id) .ok_or_else(|| type_error("server closed"))?; let inner_req = &ctx - .response + .requests .get(&token) .ok_or_else(|| type_error("request closed"))? .inner @@ -876,7 +841,7 @@ async fn op_flash_read_body( .as_mut() .unwrap() }; - let tx = ctx.response.get_mut(&token).unwrap(); + let tx = ctx.requests.get_mut(&token).unwrap(); if tx.te_chunked { let mut decoder = @@ -974,7 +939,7 @@ pub struct ListenOpts { } fn run_server( - tx: mpsc::Sender, + tx: mpsc::Sender, listening_tx: mpsc::Sender<()>, mut close_rx: mpsc::Receiver<()>, addr: SocketAddr, @@ -1178,7 +1143,6 @@ fn run_server( // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177 // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127 let mut keep_alive = inner_req.req.version.unwrap() == 1; - let mut upgrade = false; let mut expect_continue = false; let mut te = false; let mut te_chunked = false; @@ -1198,9 +1162,6 @@ fn run_server( keep_alive = connection_has(&value, "keep-alive"); } } - Ok(UPGRADE) => { - upgrade = inner_req.req.version.unwrap() == 1; - } Ok(TRANSFER_ENCODING) => { // https://tools.ietf.org/html/rfc7230#section-3.3.3 debug_assert!(inner_req.req.version.unwrap() == 1); @@ -1250,12 +1211,11 @@ fn run_server( continue 'events; } - tx.blocking_send(NextRequest { + tx.blocking_send(Request { socket: sock_ptr, // SAFETY: headers backing buffer outlives the mio event loop ('static) inner: inner_req, keep_alive, - upgrade, te_chunked, remaining_chunk_size: None, content_read: 0, @@ -1295,7 +1255,8 @@ where _addr: addr, tx, rx, - response: HashMap::with_capacity(1000), + requests: HashMap::with_capacity(1000), + next_token: 0, close_tx, listening_rx: Some(listening_rx), cancel_handle: CancelHandle::new_rc(), @@ -1371,18 +1332,14 @@ async fn op_flash_next_async( // is responsible for ensuring this is not called concurrently. let ctx = unsafe { &mut *ctx }; let cancel_handle = &ctx.cancel_handle; - let mut tokens = 0; - while let Ok(token) = ctx.rx.try_recv() { - ctx.response.insert(tokens, token); - tokens += 1; + + if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { + ctx.requests.insert(ctx.next_token, req); + ctx.next_token += 1; + return 1; } - if tokens == 0 { - if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { - ctx.response.insert(tokens, req); - tokens += 1; - } - } - tokens + + 0 } // Syncrhonous version of op_flash_next_async. Under heavy load, @@ -1455,7 +1412,7 @@ pub fn detach_socket( // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we // use raw fds. let tx = ctx - .response + .requests .remove(&token) .ok_or_else(|| type_error("request closed"))?; let stream = tx.socket(); diff --git a/ext/flash/request.rs b/ext/flash/request.rs new file mode 100644 index 0000000000..0736b56206 --- /dev/null +++ b/ext/flash/request.rs @@ -0,0 +1,49 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use crate::Stream; +use std::pin::Pin; + +#[derive(Debug)] +pub struct InnerRequest { + /// Backing buffer for the request. + pub buffer: Pin>, + /// Owned headers, we have to keep it around since its referenced in `req`. + pub _headers: Vec>, + /// Fully parsed request. + pub req: httparse::Request<'static, 'static>, + pub body_offset: usize, + pub body_len: usize, +} + +#[derive(Debug)] +pub struct Request { + pub inner: InnerRequest, + // Pointer to stream owned by the server loop thread. + // + // Dereferencing is safe until server thread finishes and + // op_flash_serve resolves or websocket upgrade is performed. + pub socket: *mut Stream, + pub keep_alive: bool, + pub content_read: usize, + pub content_length: Option, + pub remaining_chunk_size: Option, + pub te_chunked: bool, + pub expect_continue: bool, +} + +// SAFETY: Sent from server thread to JS thread. +// See comment above for `socket`. +unsafe impl Send for Request {} + +impl Request { + #[inline(always)] + pub fn socket<'a>(&self) -> &'a mut Stream { + // SAFETY: Dereferencing is safe until server thread detaches socket or finishes. + unsafe { &mut *self.socket } + } + + #[inline(always)] + pub fn method(&self) -> &str { + self.inner.req.method.unwrap() + } +}