
fix(ext/flash): concurrent response streams (#15493)

Divy Srivastava 2022-08-19 10:14:56 +05:30 committed by GitHub
parent cd21cff299
commit 8bdcec1c84
5 changed files with 272 additions and 191 deletions
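
Editor's note: the fix has two parts, both visible in the hunks below. Request tokens are now allocated from a server-wide `next_token` counter (mirrored by an `offset` in the JS serve loop) instead of restarting at 0 on every poll, and per-request response handling is moved into a detached async task so a still-streaming response no longer blocks the accept loop. A rough, standalone TypeScript sketch of the token accounting (illustrative only; names such as `TokenTable` and `acceptBatch` are invented stand-ins, not code from this commit):

// Illustrative only -- a simulation of the token bookkeeping the patch
// introduces: tokens come from a monotonically increasing counter instead of
// restarting at 0 per batch, so a token held by a still-streaming response is
// never reused for a new request.
type Token = number;

class TokenTable<T> {
  private requests = new Map<Token, T>();
  private nextToken: Token = 0;

  // Mirrors next_request_sync(): insert a batch, return how many were added.
  acceptBatch(batch: T[]): number {
    const offset = this.nextToken;
    for (const req of batch) {
      this.requests.set(this.nextToken++, req);
    }
    return this.nextToken - offset;
  }

  take(token: Token): T | undefined {
    const req = this.requests.get(token);
    this.requests.delete(token);
    return req;
  }
}

// Consumer loop, mirroring the JS serve() loop: `offset` advances past every
// batch so indices never collide with tokens from earlier batches.
const table = new TokenTable<string>();
let offset = 0;
for (const batch of [["a", "b"], ["c"]]) {
  const tokens = table.acceptBatch(batch);
  for (let i = offset; i < offset + tokens; i++) {
    console.log(i, table.take(i)); // logs: 0 a, 1 b, 2 c
  }
  offset += tokens;
}

Run through the two batches above, the tokens come out as 0, 1, 2; under the old per-batch numbering the second batch would have reused token 0 while a streaming response from the first batch could still be holding it.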

View file

@@ -25,13 +25,15 @@ function respond(token, response) {
 const response = encode(
   "HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World",
 );
+let offset = 0;
 while (true) {
   let token = nextRequest();
   if (token === 0) token = await opAsync("op_flash_next_async", serverId);
-  for (let i = 0; i < token; i++) {
+  for (let i = offset; i < offset + token; i++) {
     respond(
       i,
       response,
     );
   }
+  offset += token;
 }

View file

@@ -1848,6 +1848,73 @@ Deno.test(
   },
 );
+Deno.test(
+  { permissions: { net: true } },
+  async function httpServerConcurrentRequests() {
+    const ac = new AbortController();
+    const listeningPromise = deferred();
+    let reqCount = -1;
+    let timerId: number | undefined;
+    const server = Deno.serve(async (req) => {
+      reqCount++;
+      if (reqCount === 0) {
+        const msg = new TextEncoder().encode("data: hello\r\n\r\n");
+        // SSE
+        const body = new ReadableStream({
+          start(controller) {
+            timerId = setInterval(() => {
+              controller.enqueue(msg);
+            }, 1000);
+          },
+          cancel() {
+            if (typeof timerId === "number") {
+              clearInterval(timerId);
+            }
+          },
+        });
+        return new Response(body, {
+          headers: {
+            "Content-Type": "text/event-stream",
+          },
+        });
+      }
+      return new Response(`hello ${reqCount}`);
+    }, {
+      port: 4503,
+      signal: ac.signal,
+      onListen: onListen(listeningPromise),
+      onError: createOnErrorCb(ac),
+    });
+    const sseRequest = await fetch(`http://localhost:4503/`);
+    const decoder = new TextDecoder();
+    const stream = sseRequest.body!.getReader();
+    {
+      const { done, value } = await stream.read();
+      assert(!done);
+      assertEquals(decoder.decode(value), "data: hello\r\n\r\n");
+    }
+    const helloRequest = await fetch(`http://localhost:4503/`);
+    assertEquals(helloRequest.status, 200);
+    assertEquals(await helloRequest.text(), "hello 1");
+    {
+      const { done, value } = await stream.read();
+      assert(!done);
+      assertEquals(decoder.decode(value), "data: hello\r\n\r\n");
+    }
+    await stream.cancel();
+    clearInterval(timerId);
+    ac.abort();
+    await server;
+  },
+);
 function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
   // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
   const tp = new TextProtoReader(r);

View file

@@ -237,20 +237,21 @@
      await server.finished;
    },
    async serve() {
+      let offset = 0;
      while (true) {
        if (server.closed) {
          break;
        }
-        let token = nextRequestSync();
-        if (token === 0) {
-          token = await core.opAsync("op_flash_next_async", serverId);
+        let tokens = nextRequestSync();
+        if (tokens === 0) {
+          tokens = await core.opAsync("op_flash_next_async", serverId);
          if (server.closed) {
            break;
          }
        }
-        for (let i = 0; i < token; i++) {
+        for (let i = offset; i < offset + tokens; i++) {
          let body = null;
          // There might be a body, but we don't expose it for GET/HEAD requests.
          // It will be closed automatically once the request has been handled and
@@ -290,17 +291,6 @@
          if (resp === undefined) {
            continue;
          }
-          const ws = resp[_ws];
-          if (!ws) {
-            if (hasBody && body[_state] !== "closed") {
-              // TODO(@littledivy): Optimize by draining in a single op.
-              try {
-                await req.arrayBuffer();
-              } catch { /* pass */ }
-            }
-          }
          const innerResp = toInnerResponse(resp);
          // If response body length is known, it will be sent synchronously in a
@@ -360,74 +350,8 @@
            respBody = new Uint8Array(0);
          }
-          if (isStreamingResponseBody === true) {
-            const resourceRid = getReadableStreamRid(respBody);
-            if (resourceRid) {
-              if (respBody.locked) {
-                throw new TypeError("ReadableStream is locked.");
-              }
-              const reader = respBody.getReader(); // Aquire JS lock.
-              try {
-                core.opAsync(
-                  "op_flash_write_resource",
-                  http1Response(
-                    method,
-                    innerResp.status ?? 200,
-                    innerResp.headerList,
-                    null,
-                    true,
-                  ),
-                  serverId,
-                  i,
-                  resourceRid,
-                ).then(() => {
-                  // Release JS lock.
-                  readableStreamClose(respBody);
-                });
-              } catch (error) {
-                await reader.cancel(error);
-                throw error;
-              }
-            } else {
-              const reader = respBody.getReader();
-              let first = true;
-              a:
-              while (true) {
-                const { value, done } = await reader.read();
-                if (first) {
-                  first = false;
-                  core.ops.op_flash_respond(
-                    serverId,
-                    i,
-                    http1Response(
-                      method,
-                      innerResp.status ?? 200,
-                      innerResp.headerList,
-                      null,
-                    ),
-                    value ?? new Uint8Array(),
-                    false,
-                  );
-                } else {
-                  if (value === undefined) {
-                    core.ops.op_flash_respond_chuncked(
-                      serverId,
-                      i,
-                      undefined,
-                      done,
-                    );
-                  } else {
-                    respondChunked(
-                      i,
-                      value,
-                      done,
-                    );
-                  }
-                }
-                if (done) break a;
-              }
-            }
-          } else {
+          const ws = resp[_ws];
+          if (isStreamingResponseBody === false) {
            const responseStr = http1Response(
              method,
              innerResp.status ?? 200,
@@ -456,29 +380,111 @@
            }
          }
-          if (ws) {
-            const wsRid = await core.opAsync(
-              "op_flash_upgrade_websocket",
-              serverId,
-              i,
-            );
-            ws[_rid] = wsRid;
-            ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-            ws[_readyState] = WebSocket.OPEN;
-            const event = new Event("open");
-            ws.dispatchEvent(event);
-            ws[_eventLoop]();
-            if (ws[_idleTimeoutDuration]) {
-              ws.addEventListener(
-                "close",
-                () => clearTimeout(ws[_idleTimeoutTimeout]),
-              );
-            }
-            ws[_serverHandleIdleTimeout]();
-          }
+          (async () => {
+            if (!ws) {
+              if (hasBody && body[_state] !== "closed") {
+                // TODO(@littledivy): Optimize by draining in a single op.
+                try {
+                  await req.arrayBuffer();
+                } catch { /* pass */ }
+              }
+            }
+            if (isStreamingResponseBody === true) {
+              const resourceRid = getReadableStreamRid(respBody);
+              if (resourceRid) {
+                if (respBody.locked) {
+                  throw new TypeError("ReadableStream is locked.");
+                }
+                const reader = respBody.getReader(); // Aquire JS lock.
+                try {
+                  core.opAsync(
+                    "op_flash_write_resource",
+                    http1Response(
+                      method,
+                      innerResp.status ?? 200,
+                      innerResp.headerList,
+                      null,
+                      true,
+                    ),
+                    serverId,
+                    i,
+                    resourceRid,
+                  ).then(() => {
+                    // Release JS lock.
+                    readableStreamClose(respBody);
+                  });
+                } catch (error) {
+                  await reader.cancel(error);
+                  throw error;
+                }
+              } else {
+                const reader = respBody.getReader();
+                let first = true;
+                a:
+                while (true) {
+                  const { value, done } = await reader.read();
+                  if (first) {
+                    first = false;
+                    core.ops.op_flash_respond(
+                      serverId,
+                      i,
+                      http1Response(
+                        method,
+                        innerResp.status ?? 200,
+                        innerResp.headerList,
+                        null,
+                      ),
+                      value ?? new Uint8Array(),
+                      false,
+                    );
+                  } else {
+                    if (value === undefined) {
+                      core.ops.op_flash_respond_chuncked(
+                        serverId,
+                        i,
+                        undefined,
+                        done,
+                      );
+                    } else {
+                      respondChunked(
+                        i,
+                        value,
+                        done,
+                      );
+                    }
+                  }
+                  if (done) break a;
+                }
+              }
+            }
+            if (ws) {
+              const wsRid = await core.opAsync(
+                "op_flash_upgrade_websocket",
+                serverId,
+                i,
+              );
+              ws[_rid] = wsRid;
+              ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+              ws[_readyState] = WebSocket.OPEN;
+              const event = new Event("open");
+              ws.dispatchEvent(event);
+              ws[_eventLoop]();
+              if (ws[_idleTimeoutDuration]) {
+                ws.addEventListener(
+                  "close",
+                  () => clearTimeout(ws[_idleTimeoutTimeout]),
+                );
+              }
+              ws[_serverHandleIdleTimeout]();
+            }
+          })().catch(onError);
        }
+        offset += tokens;
      }
      await server.finished;
    },
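
Editor's note: the largest hunk above also changes the shape of the serve loop. Draining unused bodies, writing streaming responses, and performing websocket upgrades now run inside a detached `(async () => { ... })().catch(onError)` task per request, so the loop can move on to the next token while earlier responses are still in flight. A minimal TypeScript sketch of that dispatch pattern (the `nextBatch`, `handleOne`, and `onError` names are invented stand-ins, not APIs from this patch):

// Illustrative only. Long-lived responses (streaming bodies, websocket
// sessions) no longer block the accept loop: each one is handled in a
// detached async task instead of being awaited inline.
async function serveLoop(
  nextBatch: () => Promise<number[]>,
  handleOne: (token: number) => Promise<void>,
  onError: (err: unknown) => void,
) {
  while (true) {
    const tokens = await nextBatch();
    if (tokens.length === 0) break;
    for (const token of tokens) {
      // Fire and forget: the loop does not await the handler, and errors are
      // routed to onError instead of rejecting the loop itself.
      (async () => {
        await handleOne(token);
      })().catch(onError);
    }
  }
}

// Example: one batch of three tokens, then an empty batch to stop the loop.
let polls = 0;
serveLoop(
  async () => (polls++ === 0 ? [0, 1, 2] : []),
  async (token) => console.log("handled token", token),
  (err) => console.error(err),
);

The trailing `.catch(onError)` matters: since the task is never awaited, an error would otherwise surface as an unhandled rejection instead of going through the server's error callback.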

View file

@@ -25,7 +25,6 @@ use http::header::CONNECTION;
 use http::header::CONTENT_LENGTH;
 use http::header::EXPECT;
 use http::header::TRANSFER_ENCODING;
-use http::header::UPGRADE;
 use http::HeaderValue;
 use log::trace;
 use mio::net::TcpListener;
@@ -58,10 +57,13 @@ use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 mod chunked;
+mod request;
 #[cfg(unix)]
 mod sendfile;
+use request::InnerRequest;
+use request::Request;
 pub struct FlashContext {
   next_server_id: u32,
   join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>,
@@ -70,22 +72,15 @@ pub struct FlashContext {
 pub struct ServerContext {
   _addr: SocketAddr,
-  tx: mpsc::Sender<NextRequest>,
-  rx: mpsc::Receiver<NextRequest>,
-  response: HashMap<u32, NextRequest>,
+  tx: mpsc::Sender<Request>,
+  rx: mpsc::Receiver<Request>,
+  requests: HashMap<u32, Request>,
+  next_token: u32,
   listening_rx: Option<mpsc::Receiver<()>>,
   close_tx: mpsc::Sender<()>,
   cancel_handle: Rc<CancelHandle>,
 }
-struct InnerRequest {
-  _headers: Vec<httparse::Header<'static>>,
-  req: httparse::Request<'static, 'static>,
-  body_offset: usize,
-  body_len: usize,
-  buffer: Pin<Box<[u8]>>,
-}
 #[derive(Debug, PartialEq)]
 enum ParseStatus {
   None,
@@ -99,7 +94,7 @@ enum InnerStream {
   Tls(Box<TlsTcpStream>),
 }
-struct Stream {
+pub struct Stream {
   inner: InnerStream,
   detached: bool,
   read_rx: Option<mpsc::Receiver<()>>,
@@ -147,38 +142,6 @@ impl Read for Stream {
   }
 }
-struct NextRequest {
-  // Pointer to stream owned by the server loop thread.
-  //
-  // Why not Arc<Mutex<Stream>>? Performance. The stream
-  // is never written to by the server loop thread.
-  //
-  // Dereferencing is safe until server thread finishes and
-  // op_flash_serve resolves or websocket upgrade is performed.
-  socket: *mut Stream,
-  inner: InnerRequest,
-  keep_alive: bool,
-  #[allow(dead_code)]
-  upgrade: bool,
-  content_read: usize,
-  content_length: Option<u64>,
-  remaining_chunk_size: Option<usize>,
-  te_chunked: bool,
-  expect_continue: bool,
-}
-// SAFETY: Sent from server thread to JS thread.
-// See comment above for `socket`.
-unsafe impl Send for NextRequest {}
-impl NextRequest {
-  #[inline(always)]
-  pub fn socket<'a>(&self) -> &'a mut Stream {
-    // SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
-    unsafe { &mut *self.socket }
-  }
-}
 #[op]
 fn op_flash_respond(
   op_state: &mut OpState,
@@ -194,13 +157,13 @@ fn op_flash_respond(
   let mut close = false;
   let sock = match shutdown {
     true => {
-      let tx = ctx.response.remove(&token).unwrap();
+      let tx = ctx.requests.remove(&token).unwrap();
      close = !tx.keep_alive;
      tx.socket()
    }
    // In case of a websocket upgrade or streaming response.
    false => {
-      let tx = ctx.response.get(&token).unwrap();
+      let tx = ctx.requests.get(&token).unwrap();
      tx.socket()
    }
  };
@@ -263,7 +226,7 @@ async fn op_flash_write_resource(
    let op_state = &mut op_state.borrow_mut();
    let flash_ctx = op_state.borrow_mut::<FlashContext>();
    let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
-    ctx.response.remove(&token).unwrap().socket()
+    ctx.requests.remove(&token).unwrap().socket()
  };
  drop(op_state);
@@ -344,13 +307,13 @@ fn flash_respond(
  let mut close = false;
  let sock = match shutdown {
    true => {
-      let tx = ctx.response.remove(&token).unwrap();
+      let tx = ctx.requests.remove(&token).unwrap();
      close = !tx.keep_alive;
      tx.socket()
    }
    // In case of a websocket upgrade or streaming response.
    false => {
-      let tx = ctx.response.get(&token).unwrap();
+      let tx = ctx.requests.get(&token).unwrap();
      tx.socket()
    }
  };
@@ -438,12 +401,12 @@ fn respond_chunked(
 ) {
  let sock = match shutdown {
    true => {
-      let tx = ctx.response.remove(&token).unwrap();
+      let tx = ctx.requests.remove(&token).unwrap();
      tx.socket()
    }
    // In case of a websocket upgrade or streaming response.
    false => {
-      let tx = ctx.response.get(&token).unwrap();
+      let tx = ctx.requests.get(&token).unwrap();
      tx.socket()
    }
  };
@@ -469,7 +432,7 @@ macro_rules! get_request {
  ($op_state: ident, $server_id: expr, $token: ident) => {{
    let flash_ctx = $op_state.borrow_mut::<FlashContext>();
    let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap();
-    ctx.response.get_mut(&$token).unwrap()
+    ctx.requests.get_mut(&$token).unwrap()
  }};
 }
@@ -487,8 +450,8 @@ pub enum Method {
 }
 #[inline]
-fn get_method(req: &mut NextRequest) -> u32 {
-  let method = match req.inner.req.method.unwrap() {
+fn get_method(req: &mut Request) -> u32 {
+  let method = match req.method() {
    "GET" => Method::GET,
    "POST" => Method::POST,
    "PUT" => Method::PUT,
@@ -531,7 +494,7 @@ fn op_flash_path(
  let flash_ctx = op_state.borrow_mut::<FlashContext>();
  let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
  ctx
-    .response
+    .requests
    .get(&token)
    .unwrap()
    .inner
@@ -543,12 +506,14 @@ fn op_flash_path(
 #[inline]
 fn next_request_sync(ctx: &mut ServerContext) -> u32 {
-  let mut tokens = 0;
+  let offset = ctx.next_token;
  while let Ok(token) = ctx.rx.try_recv() {
-    ctx.response.insert(tokens, token);
-    tokens += 1;
+    ctx.requests.insert(ctx.next_token, token);
+    ctx.next_token += 1;
  }
-  tokens
+  ctx.next_token - offset
 }
 pub struct NextRequestFast;
@@ -597,7 +562,7 @@ unsafe fn op_flash_get_method_fast(
  let ptr =
    recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
  let ctx = &mut *(ptr as *mut ServerContext);
-  let req = ctx.response.get_mut(&token).unwrap();
+  let req = ctx.requests.get_mut(&token).unwrap();
  get_method(req)
 }
@@ -651,7 +616,7 @@ fn op_flash_make_request<'scope>(
      // SAFETY: This external is guaranteed to be a pointer to a ServerContext
      let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
      let token = args.get(0).uint32_value(scope).unwrap();
-      let req = ctx.response.get_mut(&token).unwrap();
+      let req = ctx.requests.get_mut(&token).unwrap();
      rv.set_uint32(get_method(req));
    },
  )
@@ -747,7 +712,7 @@ fn op_flash_make_request<'scope>(
 }
 #[inline]
-fn has_body_stream(req: &NextRequest) -> bool {
+fn has_body_stream(req: &Request) -> bool {
  let sock = req.socket();
  sock.read_rx.is_some()
 }
@@ -775,7 +740,7 @@ fn op_flash_headers(
    .get_mut(&server_id)
    .ok_or_else(|| type_error("server closed"))?;
  let inner_req = &ctx
-    .response
+    .requests
    .get(&token)
    .ok_or_else(|| type_error("request closed"))?
    .inner
@@ -876,7 +841,7 @@ async fn op_flash_read_body(
      .as_mut()
      .unwrap()
  };
-  let tx = ctx.response.get_mut(&token).unwrap();
+  let tx = ctx.requests.get_mut(&token).unwrap();
  if tx.te_chunked {
    let mut decoder =
@@ -974,7 +939,7 @@ pub struct ListenOpts {
 }
 fn run_server(
-  tx: mpsc::Sender<NextRequest>,
+  tx: mpsc::Sender<Request>,
  listening_tx: mpsc::Sender<()>,
  mut close_rx: mpsc::Receiver<()>,
  addr: SocketAddr,
@@ -1178,7 +1143,6 @@ fn run_server(
      // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177
      // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127
      let mut keep_alive = inner_req.req.version.unwrap() == 1;
-      let mut upgrade = false;
      let mut expect_continue = false;
      let mut te = false;
      let mut te_chunked = false;
@@ -1198,9 +1162,6 @@ fn run_server(
            keep_alive = connection_has(&value, "keep-alive");
          }
        }
-        Ok(UPGRADE) => {
-          upgrade = inner_req.req.version.unwrap() == 1;
-        }
        Ok(TRANSFER_ENCODING) => {
          // https://tools.ietf.org/html/rfc7230#section-3.3.3
          debug_assert!(inner_req.req.version.unwrap() == 1);
@@ -1250,12 +1211,11 @@ fn run_server(
          continue 'events;
        }
-        tx.blocking_send(NextRequest {
+        tx.blocking_send(Request {
          socket: sock_ptr,
          // SAFETY: headers backing buffer outlives the mio event loop ('static)
          inner: inner_req,
          keep_alive,
-          upgrade,
          te_chunked,
          remaining_chunk_size: None,
          content_read: 0,
@@ -1295,7 +1255,8 @@ where
    _addr: addr,
    tx,
    rx,
-    response: HashMap::with_capacity(1000),
+    requests: HashMap::with_capacity(1000),
+    next_token: 0,
    close_tx,
    listening_rx: Some(listening_rx),
    cancel_handle: CancelHandle::new_rc(),
@@ -1371,18 +1332,14 @@ async fn op_flash_next_async(
  // is responsible for ensuring this is not called concurrently.
  let ctx = unsafe { &mut *ctx };
  let cancel_handle = &ctx.cancel_handle;
-  let mut tokens = 0;
-  while let Ok(token) = ctx.rx.try_recv() {
-    ctx.response.insert(tokens, token);
-    tokens += 1;
+  if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
+    ctx.requests.insert(ctx.next_token, req);
+    ctx.next_token += 1;
+    return 1;
  }
-  if tokens == 0 {
-    if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
-      ctx.response.insert(tokens, req);
-      tokens += 1;
-    }
-  }
-  tokens
+  0
 }
 // Syncrhonous version of op_flash_next_async. Under heavy load,
@@ -1455,7 +1412,7 @@ pub fn detach_socket(
  // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
  //   use raw fds.
  let tx = ctx
-    .response
+    .requests
    .remove(&token)
    .ok_or_else(|| type_error("request closed"))?;
  let stream = tx.socket();

ext/flash/request.rs (new file, 49 lines)
View file

@@ -0,0 +1,49 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+use crate::Stream;
+use std::pin::Pin;
+
+#[derive(Debug)]
+pub struct InnerRequest {
+  /// Backing buffer for the request.
+  pub buffer: Pin<Box<[u8]>>,
+  /// Owned headers, we have to keep it around since its referenced in `req`.
+  pub _headers: Vec<httparse::Header<'static>>,
+  /// Fully parsed request.
+  pub req: httparse::Request<'static, 'static>,
+  pub body_offset: usize,
+  pub body_len: usize,
+}
+
+#[derive(Debug)]
+pub struct Request {
+  pub inner: InnerRequest,
+  // Pointer to stream owned by the server loop thread.
+  //
+  // Dereferencing is safe until server thread finishes and
+  // op_flash_serve resolves or websocket upgrade is performed.
+  pub socket: *mut Stream,
+  pub keep_alive: bool,
+  pub content_read: usize,
+  pub content_length: Option<u64>,
+  pub remaining_chunk_size: Option<usize>,
+  pub te_chunked: bool,
+  pub expect_continue: bool,
+}
+
+// SAFETY: Sent from server thread to JS thread.
+// See comment above for `socket`.
+unsafe impl Send for Request {}
+
+impl Request {
+  #[inline(always)]
+  pub fn socket<'a>(&self) -> &'a mut Stream {
+    // SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
+    unsafe { &mut *self.socket }
+  }
+
+  #[inline(always)]
+  pub fn method(&self) -> &str {
+    self.inner.req.method.unwrap()
+  }
+}