diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 4a362a4796..d84a4bdd28 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -7,6 +7,7 @@ import { assert, assertEquals, assertThrowsAsync, + deferred, unitTest, } from "./test_util.ts"; @@ -450,3 +451,93 @@ unitTest( } }, ); + +unitTest( + { perms: { net: true } }, + // Issue: https://github.com/denoland/deno/issues/10930 + async function httpServerStreamingResponse() { + // This test enqueues a single chunk for readable + // stream and waits for client to read that chunk and signal + // it before enqueueing subsequent chunk. Issue linked above + // presented a situation where enqueued chunks were not + // written to the HTTP connection until the next chunk was enqueued. + + let counter = 0; + + const deferreds = [ + deferred(), + deferred(), + deferred(), + ]; + + async function writeRequest(conn: Deno.Conn) { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + + const w = new BufWriter(conn); + const r = new BufReader(conn); + const body = `GET / HTTP/1.1\r\nHost: 127.0.0.1:4501\r\n\r\n`; + const writeResult = await w.write(encoder.encode(body)); + assertEquals(body.length, writeResult); + await w.flush(); + const tpr = new TextProtoReader(r); + const statusLine = await tpr.readLine(); + assert(statusLine !== null); + const headers = await tpr.readMIMEHeader(); + assert(headers !== null); + + const chunkedReader = chunkedBodyReader(headers, r); + const buf = new Uint8Array(5); + const dest = new Buffer(); + let result: number | null; + while ((result = await chunkedReader.read(buf)) !== null) { + const len = Math.min(buf.byteLength, result); + await dest.write(buf.subarray(0, len)); + // Resolve a deferred - this will make response stream to + // enqueue next chunk. + deferreds[counter - 1].resolve(); + } + return decoder.decode(dest.bytes()); + } + + function periodicStream() { + return new ReadableStream({ + start(controller) { + controller.enqueue(`${counter}\n`); + counter++; + }, + + async pull(controller) { + if (counter >= 3) { + return controller.close(); + } + + await deferreds[counter - 1]; + + controller.enqueue(`${counter}\n`); + counter++; + }, + }).pipeThrough(new TextEncoderStream()); + } + + const listener = Deno.listen({ port: 4501 }); + const finished = (async () => { + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const requestEvent = await httpConn.nextRequest(); + const { respondWith } = requestEvent!; + await respondWith(new Response(periodicStream())); + httpConn.close(); + })(); + + // start a client + const clientConn = await Deno.connect({ port: 4501 }); + + const r1 = await writeRequest(clientConn); + assertEquals(r1, "0\n1\n2\n"); + + await finished; + clientConn.close(); + listener.close(); + }, +); diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index 11e83f6c77..3e8a4ada78 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -502,6 +502,9 @@ async fn op_http_response_write( let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); poll_fn(|cx| { + let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); + + // Poll connection so the data is flushed if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { // close ConnResource // close RequestResource associated with connection @@ -509,7 +512,7 @@ async fn op_http_response_write( return Poll::Ready(Err(e)); } - send_data_fut.poll_unpin(cx).map_err(AnyError::from) + r }) .await?;