diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 9560394f04..df599c6f4f 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -339,3 +339,37 @@ unitTest( await promise; }, ); + +unitTest( + { perms: { net: true } }, + async function httpServerNextRequestResolvesOnClose() { + const delay = (n: number) => + new Promise((resolve) => setTimeout(resolve, n)); + const httpConnList: Deno.HttpConn[] = []; + + async function serve(l: Deno.Listener) { + for await (const conn of l) { + (async () => { + const c = Deno.serveHttp(conn); + httpConnList.push(c); + for await (const { respondWith } of c) { + respondWith(new Response("hello")); + } + })(); + } + } + + const l = Deno.listen({ port: 4500 }); + serve(l); + + await delay(300); + const res = await fetch("http://localhost:4500/"); + const _text = await res.text(); + + // Close connection and listener. + httpConnList.forEach((conn) => conn.close()); + l.close(); + + await delay(300); + }, +); diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index e4ba2db2a5..fedcb404f5 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -14,7 +14,6 @@ use deno_core::futures::StreamExt; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; -use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -107,6 +106,7 @@ struct ConnResource { hyper_connection: ConnType, deno_service: Service, addr: SocketAddr, + cancel: CancelHandle, } impl ConnResource { @@ -124,6 +124,10 @@ impl Resource for ConnResource { fn name(&self) -> Cow { "httpConnection".into() } + + fn close(self: Rc) { + self.cancel.cancel() + } } // We use a tuple instead of struct to avoid serialization overhead of the keys. @@ -153,6 +157,8 @@ async fn op_http_request_next( .get::(conn_rid) .ok_or_else(bad_resource_id)?; + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + poll_fn(|cx| { let connection_closed = match conn_resource.poll(cx) { Poll::Pending => false, @@ -257,6 +263,7 @@ async fn op_http_request_next( Poll::Pending } }) + .try_or_cancel(cancel) .await .map_err(AnyError::from) } @@ -298,6 +305,7 @@ fn op_http_start( hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), deno_service, addr, + cancel: CancelHandle::default(), }; let rid = state.resource_table.add(conn_resource); return Ok(rid); @@ -320,6 +328,7 @@ fn op_http_start( hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), deno_service, addr, + cancel: CancelHandle::default(), }; let rid = state.resource_table.add(conn_resource); return Ok(rid); @@ -381,7 +390,6 @@ async fn op_http_response( let response_body_rid = state.borrow_mut().resource_table.add(ResponseBodyResource { body: AsyncRefCell::new(sender), - cancel: CancelHandle::default(), conn_rid: response_sender.conn_rid, }); @@ -484,12 +492,8 @@ async fn op_http_response_write( .ok_or_else(bad_resource_id)?; let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut send_data_fut = body - .send_data(Vec::from(&*buf).into()) - .or_cancel(cancel) - .boxed_local(); + let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); poll_fn(|cx| { if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { @@ -501,8 +505,7 @@ async fn op_http_response_write( send_data_fut.poll_unpin(cx).map_err(AnyError::from) }) - .await? - .unwrap(); // panic on send_data error + .await?; Ok(()) } @@ -520,6 +523,10 @@ impl Resource for RequestBodyResource { fn name(&self) -> Cow { "requestBody".into() } + + fn close(self: Rc) { + self.cancel.cancel() + } } struct ResponseSenderResource { @@ -535,7 +542,6 @@ impl Resource for ResponseSenderResource { struct ResponseBodyResource { body: AsyncRefCell, - cancel: CancelHandle, conn_rid: ResourceId, }