diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index f0a5b430be..f412a9071a 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -721,7 +721,7 @@ function createStreamTest(count: number, delay: number, action: string) { } for (const count of [0, 1, 2, 3]) { - for (const delay of [0, 1, 1000]) { + for (const delay of [0, 1, 25]) { // Creating a stream that errors in start will throw if (delay > 0) { createStreamTest(count, delay, "Throw"); @@ -1288,45 +1288,91 @@ Deno.test( }, ); +async function testDuplex( + reader: ReadableStreamDefaultReader, + writable: WritableStreamDefaultWriter, +) { + await writable.write(new Uint8Array([1])); + const chunk1 = await reader.read(); + assert(!chunk1.done); + assertEquals(chunk1.value, new Uint8Array([1])); + await writable.write(new Uint8Array([2])); + const chunk2 = await reader.read(); + assert(!chunk2.done); + assertEquals(chunk2.value, new Uint8Array([2])); + await writable.close(); + const chunk3 = await reader.read(); + assert(chunk3.done); +} + Deno.test( { permissions: { net: true } }, - async function httpServerStreamDuplex() { + async function httpServerStreamDuplexDirect() { const promise = deferred(); const ac = new AbortController(); const server = Deno.serve( { port: servePort, signal: ac.signal }, - (request) => { + (request: Request) => { assert(request.body); - promise.resolve(); return new Response(request.body); }, ); - const ts = new TransformStream(); - const writable = ts.writable.getWriter(); - + const { readable, writable } = new TransformStream(); const resp = await fetch(`http://127.0.0.1:${servePort}/`, { method: "POST", - body: ts.readable, + body: readable, }); await promise; assert(resp.body); - const reader = resp.body.getReader(); - await writable.write(new Uint8Array([1])); - const chunk1 = await reader.read(); - assert(!chunk1.done); - assertEquals(chunk1.value, new Uint8Array([1])); - await writable.write(new Uint8Array([2])); - const chunk2 = await reader.read(); - assert(!chunk2.done); - assertEquals(chunk2.value, new Uint8Array([2])); - await writable.close(); - const chunk3 = await reader.read(); - assert(chunk3.done); + await testDuplex(resp.body.getReader(), writable.getWriter()); + ac.abort(); + await server.finished; + }, +); +// Test that a duplex stream passing through JavaScript also works (ie: that the request body resource +// is still alive). https://github.com/denoland/deno/pull/20206 +Deno.test( + { permissions: { net: true } }, + async function httpServerStreamDuplexJavascript() { + const promise = deferred(); + const ac = new AbortController(); + + const server = Deno.serve( + { port: servePort, signal: ac.signal }, + (request: Request) => { + assert(request.body); + promise.resolve(); + const reader = request.body.getReader(); + return new Response( + new ReadableStream({ + async pull(controller) { + await new Promise((r) => setTimeout(r, 100)); + const { done, value } = await reader.read(); + if (done) { + controller.close(); + } else { + controller.enqueue(value); + } + }, + }), + ); + }, + ); + + const { readable, writable } = new TransformStream(); + const resp = await fetch(`http://127.0.0.1:${servePort}/`, { + method: "POST", + body: readable, + }); + + await promise; + assert(resp.body); + await testDuplex(resp.body.getReader(), writable.getWriter()); ac.abort(); await server.finished; }, diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 265b797066..7307ab2d80 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -133,10 +133,6 @@ class InnerRequest { } close() { - if (this.#streamRid !== undefined) { - core.close(this.#streamRid); - this.#streamRid = undefined; - } this.#slabId = undefined; } diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 60ef83b0f7..17e9befe22 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -14,6 +14,7 @@ use crate::slab::slab_drop; use crate::slab::slab_get; use crate::slab::slab_init; use crate::slab::slab_insert; +use crate::slab::HttpRequestBodyAutocloser; use crate::slab::SlabId; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; @@ -376,13 +377,20 @@ pub fn op_http_get_request_headers<'scope>( #[op(fast)] pub fn op_http_read_request_body( - state: &mut OpState, + state: Rc>, slab_id: SlabId, ) -> ResourceId { let mut http = slab_get(slab_id); - let incoming = http.take_body(); - let body_resource = Rc::new(HttpRequestBody::new(incoming)); - state.resource_table.add_rc(body_resource) + let rid = if let Some(incoming) = http.take_body() { + let body_resource = Rc::new(HttpRequestBody::new(incoming)); + state.borrow_mut().resource_table.add_rc(body_resource) + } else { + // This should not be possible, but rather than panicking we'll return an invalid + // resource value to JavaScript. + ResourceId::MAX + }; + http.put_resource(HttpRequestBodyAutocloser::new(rid, state.clone())); + rid } #[op2(fast)] @@ -577,6 +585,7 @@ fn set_response( response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { let mut http = slab_get(slab_id); + let resource = http.take_resource(); let compression = is_request_compressible(&http.request_parts().headers); let response = http.response(); let compression = modify_compressibility_from_response( @@ -584,7 +593,9 @@ fn set_response( length, response.headers_mut(), ); - response.body_mut().initialize(response_fn(compression)); + response + .body_mut() + .initialize(response_fn(compression), resource); // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index bd9d6f4332..f88f13f889 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -23,6 +23,8 @@ use hyper1::body::Frame; use hyper1::body::SizeHint; use pin_project::pin_project; +use crate::slab::HttpRequestBodyAutocloser; + /// Simplification for nested types we use for our streams. We provide a way to convert from /// this type into Hyper's body [`Frame`]. enum ResponseStreamResult { @@ -156,34 +158,40 @@ impl std::fmt::Debug for ResponseBytesInner { /// This represents the union of possible response types in Deno with the stream-style [`Body`] interface /// required by hyper. As the API requires information about request completion (including a success/fail /// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on. -#[derive(Debug, Default)] -pub struct ResponseBytes( - ResponseBytesInner, - CompletionHandle, - Rc>>, -); +#[derive(Default)] +pub struct ResponseBytes { + inner: ResponseBytesInner, + completion_handle: CompletionHandle, + headers: Rc>>, + res: Option, +} impl ResponseBytes { - pub fn initialize(&mut self, inner: ResponseBytesInner) { - debug_assert!(matches!(self.0, ResponseBytesInner::Empty)); - self.0 = inner; + pub fn initialize( + &mut self, + inner: ResponseBytesInner, + req_body_resource: Option, + ) { + debug_assert!(matches!(self.inner, ResponseBytesInner::Empty)); + self.inner = inner; + self.res = req_body_resource; } pub fn completion_handle(&self) -> CompletionHandle { - self.1.clone() + self.completion_handle.clone() } pub fn trailers(&self) -> Rc>> { - self.2.clone() + self.headers.clone() } fn complete(&mut self, success: bool) -> ResponseBytesInner { - if matches!(self.0, ResponseBytesInner::Done) { + if matches!(self.inner, ResponseBytesInner::Done) { return ResponseBytesInner::Done; } - let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done); - self.1.complete(success); + let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); + self.completion_handle.complete(success); current } } @@ -274,9 +282,9 @@ impl Body for ResponseBytes { cx: &mut std::task::Context<'_>, ) -> std::task::Poll, Self::Error>>> { let res = loop { - let res = match &mut self.0 { + let res = match &mut self.inner { ResponseBytesInner::Done | ResponseBytesInner::Empty => { - if let Some(trailers) = self.2.borrow_mut().take() { + if let Some(trailers) = self.headers.borrow_mut().take() { return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); } unreachable!() @@ -303,7 +311,7 @@ impl Body for ResponseBytes { }; if matches!(res, ResponseStreamResult::EndOfStream) { - if let Some(trailers) = self.2.borrow_mut().take() { + if let Some(trailers) = self.headers.borrow_mut().take() { return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); } self.complete(true); @@ -312,21 +320,23 @@ impl Body for ResponseBytes { } fn is_end_stream(&self) -> bool { - matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty) - && self.2.borrow_mut().is_none() + matches!( + self.inner, + ResponseBytesInner::Done | ResponseBytesInner::Empty + ) && self.headers.borrow_mut().is_none() } fn size_hint(&self) -> SizeHint { // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through // anyways just in case hyper needs it. - self.0.size_hint() + self.inner.size_hint() } } impl Drop for ResponseBytes { fn drop(&mut self) { // We won't actually poll_frame for Empty responses so this is where we return success - self.complete(matches!(self.0, ResponseBytesInner::Empty)); + self.complete(matches!(self.inner, ResponseBytesInner::Empty)); } } diff --git a/ext/http/slab.rs b/ext/http/slab.rs index dbe1a66351..8c285c8606 100644 --- a/ext/http/slab.rs +++ b/ext/http/slab.rs @@ -3,6 +3,8 @@ use crate::request_properties::HttpConnectionProperties; use crate::response_body::CompletionHandle; use crate::response_body::ResponseBytes; use deno_core::error::AnyError; +use deno_core::OpState; +use deno_core::ResourceId; use http::request::Parts; use http::HeaderMap; use hyper1::body::Incoming; @@ -18,10 +20,36 @@ pub type Request = hyper1::Request; pub type Response = hyper1::Response; pub type SlabId = u32; +enum RequestBodyState { + Incoming(Incoming), + Resource(HttpRequestBodyAutocloser), +} + +impl From for RequestBodyState { + fn from(value: Incoming) -> Self { + RequestBodyState::Incoming(value) + } +} + +/// Ensures that the request body closes itself when no longer needed. +pub struct HttpRequestBodyAutocloser(ResourceId, Rc>); + +impl HttpRequestBodyAutocloser { + pub fn new(res: ResourceId, op_state: Rc>) -> Self { + Self(res, op_state) + } +} + +impl Drop for HttpRequestBodyAutocloser { + fn drop(&mut self) { + _ = self.1.borrow_mut().resource_table.close(self.0); + } +} + pub struct HttpSlabRecord { request_info: HttpConnectionProperties, request_parts: Parts, - request_body: Option, + request_body: Option, // The response may get taken before we tear this down response: Option, promise: CompletionHandle, @@ -98,6 +126,7 @@ fn slab_insert_raw( let mut slab = slab.borrow_mut(); let body = ResponseBytes::default(); let trailers = body.trailers(); + let request_body = request_body.map(|r| r.into()); slab.insert(HttpSlabRecord { request_info, request_parts, @@ -174,8 +203,35 @@ impl SlabEntry { } /// Take the Hyper body from this entry. - pub fn take_body(&mut self) -> Incoming { - self.self_mut().request_body.take().unwrap() + pub fn take_body(&mut self) -> Option { + let body_holder = &mut self.self_mut().request_body; + let body = body_holder.take(); + match body { + Some(RequestBodyState::Incoming(body)) => Some(body), + x => { + *body_holder = x; + None + } + } + } + + pub fn take_resource(&mut self) -> Option { + let body_holder = &mut self.self_mut().request_body; + let body = body_holder.take(); + match body { + Some(RequestBodyState::Resource(res)) => Some(res), + x => { + *body_holder = x; + None + } + } + } + + /// Replace the request body with a resource ID and the OpState we'll need to shut it down. + /// We cannot keep just the resource itself, as JS code might be reading from the resource ID + /// to generate the response data (requiring us to keep it in the resource table). + pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) { + self.self_mut().request_body = Some(RequestBodyState::Resource(res)); } /// Complete this entry, potentially expunging it if it is complete.