diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 5262c5919d..f8de633830 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -751,3 +751,67 @@ unitTest({ perms: { net: true } }, async function httpServerPanic() { client.close(); listener.close(); }); + +// https://github.com/denoland/deno/issues/11595 +unitTest( + { perms: { net: true } }, + async function httpServerIncompleteMessage() { + const listener = Deno.listen({ port: 4501 }); + const def1 = deferred(); + const def2 = deferred(); + + const client = await Deno.connect({ port: 4501 }); + await client.write(new TextEncoder().encode( + `GET / HTTP/1.0\r\n\r\n`, + )); + + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const ev = await httpConn.nextRequest(); + const { respondWith } = ev!; + + const { readable, writable } = new TransformStream(); + const writer = writable.getWriter(); + + async function writeResponse() { + await writer.write( + new TextEncoder().encode( + "written to the writable side of a TransformStream", + ), + ); + await writer.close(); + } + + const errors: Error[] = []; + + writeResponse() + .catch((error: Error) => { + errors.push(error); + }) + .then(() => def1.resolve()); + + const res = new Response(readable); + + respondWith(res) + .catch((error: Error) => errors.push(error)) + .then(() => def2.resolve()); + + client.close(); + + await Promise.all([ + def1, + def2, + ]); + + listener.close(); + + assertEquals(errors.length, 2); + for (const error of errors) { + assertEquals(error.name, "Http"); + assertEquals( + error.message, + "connection closed before message completed", + ); + } + }, +); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 8e91423457..8221a6b0d8 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -436,17 +436,33 @@ async fn op_http_response( // The only failure mode is the receiver already having dropped its end // of the channel. if response_sender.sender.send(res).is_err() { + if let Some(rid) = maybe_response_body_rid { + let _ = state + .borrow_mut() + .resource_table + .take::(rid); + } return Err(type_error("internal communication error")); } - poll_fn(|cx| match conn_resource.poll(cx) { + let result = poll_fn(|cx| match conn_resource.poll(cx) { Poll::Ready(x) => { state.borrow_mut().resource_table.close(conn_rid).ok(); Poll::Ready(x) } Poll::Pending => Poll::Ready(Ok(())), }) - .await?; + .await; + + if let Err(e) = result { + if let Some(rid) = maybe_response_body_rid { + let _ = state + .borrow_mut() + .resource_table + .take::(rid); + } + return Err(e); + } if maybe_response_body_rid.is_none() { conn_resource.deno_service.waker.wake();