diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 2e7836b6a9..830817146a 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -2699,12 +2699,14 @@ Deno.test( for (const url of ["text", "file", "stream"]) { // Ensure that we don't panic when the incoming TCP request was dropped - // https://github.com/denoland/deno/issues/20315 + // https://github.com/denoland/deno/issues/20315 and that we correctly + // close/cancel the response Deno.test({ permissions: { read: true, write: true, net: true }, name: `httpServerTcpCancellation_${url}`, fn: async function () { const ac = new AbortController(); + const streamCancelled = url == "stream" ? deferred() : undefined; const listeningPromise = deferred(); const waitForAbort = deferred(); const waitForRequest = deferred(); @@ -2727,7 +2729,9 @@ for (const url of ["text", "file", "stream"]) { start(controller) { _body = null; controller.enqueue(new Uint8Array([1])); - controller.close(); + }, + cancel(reason) { + streamCancelled!.resolve(reason); }, }), ); @@ -2753,14 +2757,56 @@ for (const url of ["text", "file", "stream"]) { // Give it a few milliseconds for the serve machinery to work await new Promise((r) => setTimeout(r, 10)); + // Wait for cancellation before we shut the server down + if (streamCancelled !== undefined) { + await streamCancelled; + } + // Since the handler has a chance of creating resources or running async ops, we need to use a // graceful shutdown here to ensure they have fully drained. await server.shutdown(); + await server.finished; }, }); } +Deno.test( + { permissions: { net: true } }, + async function httpServerCancelFetch() { + const request2 = deferred(); + const request2Aborted = deferred(); + const { finished, abort } = await makeServer(async (req) => { + if (req.url.endsWith("/1")) { + const fetchRecursive = await fetch(`http://localhost:${servePort}/2`); + return new Response(fetchRecursive.body); + } else if (req.url.endsWith("/2")) { + request2.resolve(); + return new Response( + new ReadableStream({ + start(_controller) {/* just hang */}, + cancel(reason) { + request2Aborted.resolve(reason); + }, + }), + ); + } + fail(); + }); + const fetchAbort = new AbortController(); + const fetchPromise = await fetch(`http://localhost:${servePort}/1`, { + signal: fetchAbort.signal, + }); + await fetchPromise; + await request2; + fetchAbort.abort(); + assertEquals("resource closed", await request2Aborted); + + abort(); + await finished; + }, +); + Deno.test( { permissions: { read: true, net: true } }, async function httpServerWithTls() { diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 21e138f860..7ccd9ec816 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -577,10 +577,13 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { ); } +/// Sets the appropriate response body. Use `force_instantiate_body` if you need +/// to ensure that the response is cleaned up correctly (eg: for resources). fn set_response( slab_id: SlabId, length: Option, status: u16, + force_instantiate_body: bool, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { let mut http = slab_get(slab_id); @@ -602,7 +605,10 @@ fn set_response( if let Ok(code) = StatusCode::from_u16(status) { *response.status_mut() = code; } + } else if force_instantiate_body { + response_fn(Compression::None).abort(); } + http.complete(); } @@ -634,6 +640,7 @@ pub fn op_http_set_response_body_resource( slab_id, resource.size_hint().1.map(|s| s as usize), status, + true, move |compression| { ResponseBytesInner::from_resource(compression, resource, auto_close) }, @@ -649,7 +656,7 @@ pub fn op_http_set_response_body_text( status: u16, ) { if !text.is_empty() { - set_response(slab_id, Some(text.len()), status, |compression| { + set_response(slab_id, Some(text.len()), status, false, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } else { @@ -665,7 +672,7 @@ pub fn op_http_set_response_body_bytes( status: u16, ) { if !buffer.is_empty() { - set_response(slab_id, Some(buffer.len()), status, |compression| { + set_response(slab_id, Some(buffer.len()), status, false, |compression| { ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); } else { diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 5c946a4d3c..4f7e3b0a5d 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -125,6 +125,16 @@ pub enum ResponseStream { TestChannel(tokio::sync::mpsc::Receiver), } +impl ResponseStream { + pub fn abort(self) { + match self { + ResponseStream::Resource(resource) => resource.stm.close(), + #[cfg(test)] + ResponseStream::TestChannel(..) => {} + } + } +} + #[derive(Default)] pub enum ResponseBytesInner { /// An empty stream. @@ -192,11 +202,25 @@ impl ResponseBytes { let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); self.completion_handle.complete(success); - current + if success { + current + } else { + current.abort(); + ResponseBytesInner::Done + } } } impl ResponseBytesInner { + pub fn abort(self) { + match self { + Self::Done | Self::Empty | Self::Bytes(..) => {} + Self::BrotliStream(stm) => stm.abort(), + Self::GZipStream(stm) => stm.abort(), + Self::UncompressedStream(stm) => stm.abort(), + } + } + pub fn size_hint(&self) -> SizeHint { match self { Self::Done => SizeHint::with_exact(0), @@ -463,6 +487,10 @@ impl GZipResponseStream { underlying, } } + + pub fn abort(self) { + self.underlying.abort() + } } /// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide @@ -645,6 +673,10 @@ impl BrotliResponseStream { underlying, } } + + pub fn abort(self) { + self.underlying.abort() + } } fn max_compressed_size(input_size: usize) -> usize { diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index e19954fdca..b35d4c302d 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -364,6 +364,15 @@ impl ReadableStreamResource { .read(limit) .map(|buf| buf.unwrap_or_else(BufView::empty)) } + + fn close_channel(&self) { + // Trigger the promise in JS to cancel the stream if necessarily + self.data.completion.complete(true); + // Cancel any outstanding read requests + self.cancel_handle.cancel(); + // Close the channel to wake up anyone waiting + self.channel.close(); + } } impl Resource for ReadableStreamResource { @@ -376,8 +385,13 @@ impl Resource for ReadableStreamResource { } fn close(self: Rc) { - self.cancel_handle.cancel(); - self.channel.close(); + self.close_channel(); + } +} + +impl Drop for ReadableStreamResource { + fn drop(&mut self) { + self.close_channel(); } }