1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

fix(ext/http): ensure aborted bodies throw (#20503)

Fixes #20502 -- ensure that Hyper errors make it through to JS.
This commit is contained in:
Matt Mastracci 2023-09-15 08:08:21 -06:00 committed by GitHub
parent 06ece5645c
commit 71af3c375c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 13 deletions

View file

@ -1,6 +1,9 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertMatch } from "../../../test_util/std/testing/asserts.ts"; import {
assertMatch,
assertRejects,
} from "../../../test_util/std/testing/asserts.ts";
import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts"; import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts";
import { TextProtoReader } from "../testdata/run/textproto.ts"; import { TextProtoReader } from "../testdata/run/textproto.ts";
import { import {
@ -879,6 +882,43 @@ Deno.test(
}, },
); );
Deno.test(
{ permissions: { net: true } },
async function httpServerAbortedRequestBody() {
const promise = deferred();
const ac = new AbortController();
const listeningPromise = deferred();
const server = Deno.serve({
handler: async (request) => {
await assertRejects(async () => {
await request.text();
});
promise.resolve();
// Not actually used
return new Response();
},
port: servePort,
signal: ac.signal,
onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac),
});
await listeningPromise;
const conn = await Deno.connect({ port: servePort });
// Send POST request with a body + content-length, but don't send it all
const encoder = new TextEncoder();
const body =
`POST / HTTP/1.1\r\nHost: 127.0.0.1:${servePort}\r\nContent-Length: 10\r\n\r\n12345`;
const writeResult = await conn.write(encoder.encode(body));
assertEquals(body.length, writeResult);
conn.close();
await promise;
ac.abort();
await server.finished;
},
);
function createStreamTest(count: number, delay: number, action: string) { function createStreamTest(count: number, delay: number, action: string) {
function doAction(controller: ReadableStreamDefaultController, i: number) { function doAction(controller: ReadableStreamDefaultController, i: number) {
if (i == count) { if (i == count) {

View file

@ -15,6 +15,8 @@ use hyper1::body::SizeHint;
use std::borrow::Cow; use std::borrow::Cow;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::ready;
use std::task::Poll;
/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8. /// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8.
struct ReadFuture(Incoming); struct ReadFuture(Incoming);
@ -25,21 +27,26 @@ impl Stream for ReadFuture {
fn poll_next( fn poll_next(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
let res = Pin::new(&mut self.get_mut().0).poll_frame(cx); // Loop until we receive a non-empty frame from Hyper
match res { let this = self.get_mut();
std::task::Poll::Ready(Some(Ok(frame))) => { loop {
let res = ready!(Pin::new(&mut this.0).poll_frame(cx));
break match res {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() { if let Ok(data) = frame.into_data() {
// Ensure that we never yield an empty frame // Ensure that we never yield an empty frame
if !data.is_empty() { if !data.is_empty() {
return std::task::Poll::Ready(Some(Ok(data))); break Poll::Ready(Some(Ok::<_, AnyError>(data)));
} }
} }
// Loop again so we don't lose the waker
continue;
} }
std::task::Poll::Ready(None) => return std::task::Poll::Ready(None), Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
_ => {} None => Poll::Ready(None),
};
} }
std::task::Poll::Pending
} }
} }