mirror of
https://github.com/denoland/deno.git
synced 2025-01-12 00:54:02 -05:00
fix(ext/http): ensure request body resource lives as long as response is alive (#20206)
Deno.serve's fast streaming implementation was not keeping the request
body resource ID alive. We were taking the `Rc<Resource>` from the
resource table during the response, so a hairpin duplex response that
fed back the request body would work.
However, if any JS code attempted to read from the request body (which
requires the resource ID to be valid), the response would fail with a
difficult-to-diagnose "EOF" error.
This was affecting more complex duplex uses of `Deno.fetch` (though as
far as I can tell was unreported).
Simple test:
```ts
const reader = request.body.getReader();
return new Response(
new ReadableStream({
async pull(controller) {
const { done, value } = await reader.read();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
}),
```
And then attempt to use the stream in duplex mode:
```ts
async function testDuplex(
reader: ReadableStreamDefaultReader<Uint8Array>,
writable: WritableStreamDefaultWriter<Uint8Array>,
) {
await writable.write(new Uint8Array([1]));
const chunk1 = await reader.read();
assert(!chunk1.done);
assertEquals(chunk1.value, new Uint8Array([1]));
await writable.write(new Uint8Array([2]));
const chunk2 = await reader.read();
assert(!chunk2.done);
assertEquals(chunk2.value, new Uint8Array([2]));
await writable.close();
const chunk3 = await reader.read();
assert(chunk3.done);
}
```
In older versions of Deno, this would just lock up. I believe after
23ff0e722e
, it started throwing a more
explicit error:
```
httpServerStreamDuplexJavascript => ./cli/tests/unit/serve_test.ts:1339:6
error: TypeError: request or response body error: error reading a body from connection: Connection reset by peer (os error 54)
at async Object.pull (ext:deno_web/06_streams.js:810:27)
```
This commit is contained in:
parent
efdf0bbd9b
commit
576d0db372
5 changed files with 172 additions and 53 deletions
|
@ -721,7 +721,7 @@ function createStreamTest(count: number, delay: number, action: string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const count of [0, 1, 2, 3]) {
|
for (const count of [0, 1, 2, 3]) {
|
||||||
for (const delay of [0, 1, 1000]) {
|
for (const delay of [0, 1, 25]) {
|
||||||
// Creating a stream that errors in start will throw
|
// Creating a stream that errors in start will throw
|
||||||
if (delay > 0) {
|
if (delay > 0) {
|
||||||
createStreamTest(count, delay, "Throw");
|
createStreamTest(count, delay, "Throw");
|
||||||
|
@ -1288,33 +1288,10 @@ Deno.test(
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
Deno.test(
|
async function testDuplex(
|
||||||
{ permissions: { net: true } },
|
reader: ReadableStreamDefaultReader<Uint8Array>,
|
||||||
async function httpServerStreamDuplex() {
|
writable: WritableStreamDefaultWriter<Uint8Array>,
|
||||||
const promise = deferred();
|
) {
|
||||||
const ac = new AbortController();
|
|
||||||
|
|
||||||
const server = Deno.serve(
|
|
||||||
{ port: servePort, signal: ac.signal },
|
|
||||||
(request) => {
|
|
||||||
assert(request.body);
|
|
||||||
|
|
||||||
promise.resolve();
|
|
||||||
return new Response(request.body);
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
const ts = new TransformStream();
|
|
||||||
const writable = ts.writable.getWriter();
|
|
||||||
|
|
||||||
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
|
|
||||||
method: "POST",
|
|
||||||
body: ts.readable,
|
|
||||||
});
|
|
||||||
|
|
||||||
await promise;
|
|
||||||
assert(resp.body);
|
|
||||||
const reader = resp.body.getReader();
|
|
||||||
await writable.write(new Uint8Array([1]));
|
await writable.write(new Uint8Array([1]));
|
||||||
const chunk1 = await reader.read();
|
const chunk1 = await reader.read();
|
||||||
assert(!chunk1.done);
|
assert(!chunk1.done);
|
||||||
|
@ -1326,7 +1303,76 @@ Deno.test(
|
||||||
await writable.close();
|
await writable.close();
|
||||||
const chunk3 = await reader.read();
|
const chunk3 = await reader.read();
|
||||||
assert(chunk3.done);
|
assert(chunk3.done);
|
||||||
|
}
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { net: true } },
|
||||||
|
async function httpServerStreamDuplexDirect() {
|
||||||
|
const promise = deferred();
|
||||||
|
const ac = new AbortController();
|
||||||
|
|
||||||
|
const server = Deno.serve(
|
||||||
|
{ port: servePort, signal: ac.signal },
|
||||||
|
(request: Request) => {
|
||||||
|
assert(request.body);
|
||||||
|
promise.resolve();
|
||||||
|
return new Response(request.body);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const { readable, writable } = new TransformStream();
|
||||||
|
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
|
||||||
|
method: "POST",
|
||||||
|
body: readable,
|
||||||
|
});
|
||||||
|
|
||||||
|
await promise;
|
||||||
|
assert(resp.body);
|
||||||
|
await testDuplex(resp.body.getReader(), writable.getWriter());
|
||||||
|
ac.abort();
|
||||||
|
await server.finished;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// Test that a duplex stream passing through JavaScript also works (ie: that the request body resource
|
||||||
|
// is still alive). https://github.com/denoland/deno/pull/20206
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { net: true } },
|
||||||
|
async function httpServerStreamDuplexJavascript() {
|
||||||
|
const promise = deferred();
|
||||||
|
const ac = new AbortController();
|
||||||
|
|
||||||
|
const server = Deno.serve(
|
||||||
|
{ port: servePort, signal: ac.signal },
|
||||||
|
(request: Request) => {
|
||||||
|
assert(request.body);
|
||||||
|
promise.resolve();
|
||||||
|
const reader = request.body.getReader();
|
||||||
|
return new Response(
|
||||||
|
new ReadableStream({
|
||||||
|
async pull(controller) {
|
||||||
|
await new Promise((r) => setTimeout(r, 100));
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) {
|
||||||
|
controller.close();
|
||||||
|
} else {
|
||||||
|
controller.enqueue(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const { readable, writable } = new TransformStream();
|
||||||
|
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
|
||||||
|
method: "POST",
|
||||||
|
body: readable,
|
||||||
|
});
|
||||||
|
|
||||||
|
await promise;
|
||||||
|
assert(resp.body);
|
||||||
|
await testDuplex(resp.body.getReader(), writable.getWriter());
|
||||||
ac.abort();
|
ac.abort();
|
||||||
await server.finished;
|
await server.finished;
|
||||||
},
|
},
|
||||||
|
|
|
@ -133,10 +133,6 @@ class InnerRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
if (this.#streamRid !== undefined) {
|
|
||||||
core.close(this.#streamRid);
|
|
||||||
this.#streamRid = undefined;
|
|
||||||
}
|
|
||||||
this.#slabId = undefined;
|
this.#slabId = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ use crate::slab::slab_drop;
|
||||||
use crate::slab::slab_get;
|
use crate::slab::slab_get;
|
||||||
use crate::slab::slab_init;
|
use crate::slab::slab_init;
|
||||||
use crate::slab::slab_insert;
|
use crate::slab::slab_insert;
|
||||||
|
use crate::slab::HttpRequestBodyAutocloser;
|
||||||
use crate::slab::SlabId;
|
use crate::slab::SlabId;
|
||||||
use crate::websocket_upgrade::WebSocketUpgrade;
|
use crate::websocket_upgrade::WebSocketUpgrade;
|
||||||
use crate::LocalExecutor;
|
use crate::LocalExecutor;
|
||||||
|
@ -376,13 +377,20 @@ pub fn op_http_get_request_headers<'scope>(
|
||||||
|
|
||||||
#[op(fast)]
|
#[op(fast)]
|
||||||
pub fn op_http_read_request_body(
|
pub fn op_http_read_request_body(
|
||||||
state: &mut OpState,
|
state: Rc<RefCell<OpState>>,
|
||||||
slab_id: SlabId,
|
slab_id: SlabId,
|
||||||
) -> ResourceId {
|
) -> ResourceId {
|
||||||
let mut http = slab_get(slab_id);
|
let mut http = slab_get(slab_id);
|
||||||
let incoming = http.take_body();
|
let rid = if let Some(incoming) = http.take_body() {
|
||||||
let body_resource = Rc::new(HttpRequestBody::new(incoming));
|
let body_resource = Rc::new(HttpRequestBody::new(incoming));
|
||||||
state.resource_table.add_rc(body_resource)
|
state.borrow_mut().resource_table.add_rc(body_resource)
|
||||||
|
} else {
|
||||||
|
// This should not be possible, but rather than panicking we'll return an invalid
|
||||||
|
// resource value to JavaScript.
|
||||||
|
ResourceId::MAX
|
||||||
|
};
|
||||||
|
http.put_resource(HttpRequestBodyAutocloser::new(rid, state.clone()));
|
||||||
|
rid
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op2(fast)]
|
#[op2(fast)]
|
||||||
|
@ -577,6 +585,7 @@ fn set_response(
|
||||||
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
|
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
|
||||||
) {
|
) {
|
||||||
let mut http = slab_get(slab_id);
|
let mut http = slab_get(slab_id);
|
||||||
|
let resource = http.take_resource();
|
||||||
let compression = is_request_compressible(&http.request_parts().headers);
|
let compression = is_request_compressible(&http.request_parts().headers);
|
||||||
let response = http.response();
|
let response = http.response();
|
||||||
let compression = modify_compressibility_from_response(
|
let compression = modify_compressibility_from_response(
|
||||||
|
@ -584,7 +593,9 @@ fn set_response(
|
||||||
length,
|
length,
|
||||||
response.headers_mut(),
|
response.headers_mut(),
|
||||||
);
|
);
|
||||||
response.body_mut().initialize(response_fn(compression));
|
response
|
||||||
|
.body_mut()
|
||||||
|
.initialize(response_fn(compression), resource);
|
||||||
|
|
||||||
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
|
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
|
||||||
// will quitely ignore invalid values.
|
// will quitely ignore invalid values.
|
||||||
|
|
|
@ -23,6 +23,8 @@ use hyper1::body::Frame;
|
||||||
use hyper1::body::SizeHint;
|
use hyper1::body::SizeHint;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
|
||||||
|
use crate::slab::HttpRequestBodyAutocloser;
|
||||||
|
|
||||||
/// Simplification for nested types we use for our streams. We provide a way to convert from
|
/// Simplification for nested types we use for our streams. We provide a way to convert from
|
||||||
/// this type into Hyper's body [`Frame`].
|
/// this type into Hyper's body [`Frame`].
|
||||||
enum ResponseStreamResult {
|
enum ResponseStreamResult {
|
||||||
|
@ -156,34 +158,40 @@ impl std::fmt::Debug for ResponseBytesInner {
|
||||||
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
|
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
|
||||||
/// required by hyper. As the API requires information about request completion (including a success/fail
|
/// required by hyper. As the API requires information about request completion (including a success/fail
|
||||||
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
|
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
|
||||||
#[derive(Debug, Default)]
|
#[derive(Default)]
|
||||||
pub struct ResponseBytes(
|
pub struct ResponseBytes {
|
||||||
ResponseBytesInner,
|
inner: ResponseBytesInner,
|
||||||
CompletionHandle,
|
completion_handle: CompletionHandle,
|
||||||
Rc<RefCell<Option<HeaderMap>>>,
|
headers: Rc<RefCell<Option<HeaderMap>>>,
|
||||||
);
|
res: Option<HttpRequestBodyAutocloser>,
|
||||||
|
}
|
||||||
|
|
||||||
impl ResponseBytes {
|
impl ResponseBytes {
|
||||||
pub fn initialize(&mut self, inner: ResponseBytesInner) {
|
pub fn initialize(
|
||||||
debug_assert!(matches!(self.0, ResponseBytesInner::Empty));
|
&mut self,
|
||||||
self.0 = inner;
|
inner: ResponseBytesInner,
|
||||||
|
req_body_resource: Option<HttpRequestBodyAutocloser>,
|
||||||
|
) {
|
||||||
|
debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
|
||||||
|
self.inner = inner;
|
||||||
|
self.res = req_body_resource;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn completion_handle(&self) -> CompletionHandle {
|
pub fn completion_handle(&self) -> CompletionHandle {
|
||||||
self.1.clone()
|
self.completion_handle.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
|
pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
|
||||||
self.2.clone()
|
self.headers.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn complete(&mut self, success: bool) -> ResponseBytesInner {
|
fn complete(&mut self, success: bool) -> ResponseBytesInner {
|
||||||
if matches!(self.0, ResponseBytesInner::Done) {
|
if matches!(self.inner, ResponseBytesInner::Done) {
|
||||||
return ResponseBytesInner::Done;
|
return ResponseBytesInner::Done;
|
||||||
}
|
}
|
||||||
|
|
||||||
let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done);
|
let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
|
||||||
self.1.complete(success);
|
self.completion_handle.complete(success);
|
||||||
current
|
current
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -274,9 +282,9 @@ impl Body for ResponseBytes {
|
||||||
cx: &mut std::task::Context<'_>,
|
cx: &mut std::task::Context<'_>,
|
||||||
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
||||||
let res = loop {
|
let res = loop {
|
||||||
let res = match &mut self.0 {
|
let res = match &mut self.inner {
|
||||||
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
|
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
|
||||||
if let Some(trailers) = self.2.borrow_mut().take() {
|
if let Some(trailers) = self.headers.borrow_mut().take() {
|
||||||
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
|
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
|
||||||
}
|
}
|
||||||
unreachable!()
|
unreachable!()
|
||||||
|
@ -303,7 +311,7 @@ impl Body for ResponseBytes {
|
||||||
};
|
};
|
||||||
|
|
||||||
if matches!(res, ResponseStreamResult::EndOfStream) {
|
if matches!(res, ResponseStreamResult::EndOfStream) {
|
||||||
if let Some(trailers) = self.2.borrow_mut().take() {
|
if let Some(trailers) = self.headers.borrow_mut().take() {
|
||||||
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
|
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
|
||||||
}
|
}
|
||||||
self.complete(true);
|
self.complete(true);
|
||||||
|
@ -312,21 +320,23 @@ impl Body for ResponseBytes {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_end_stream(&self) -> bool {
|
fn is_end_stream(&self) -> bool {
|
||||||
matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
|
matches!(
|
||||||
&& self.2.borrow_mut().is_none()
|
self.inner,
|
||||||
|
ResponseBytesInner::Done | ResponseBytesInner::Empty
|
||||||
|
) && self.headers.borrow_mut().is_none()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn size_hint(&self) -> SizeHint {
|
fn size_hint(&self) -> SizeHint {
|
||||||
// The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
|
// The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
|
||||||
// anyways just in case hyper needs it.
|
// anyways just in case hyper needs it.
|
||||||
self.0.size_hint()
|
self.inner.size_hint()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for ResponseBytes {
|
impl Drop for ResponseBytes {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// We won't actually poll_frame for Empty responses so this is where we return success
|
// We won't actually poll_frame for Empty responses so this is where we return success
|
||||||
self.complete(matches!(self.0, ResponseBytesInner::Empty));
|
self.complete(matches!(self.inner, ResponseBytesInner::Empty));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,8 @@ use crate::request_properties::HttpConnectionProperties;
|
||||||
use crate::response_body::CompletionHandle;
|
use crate::response_body::CompletionHandle;
|
||||||
use crate::response_body::ResponseBytes;
|
use crate::response_body::ResponseBytes;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::OpState;
|
||||||
|
use deno_core::ResourceId;
|
||||||
use http::request::Parts;
|
use http::request::Parts;
|
||||||
use http::HeaderMap;
|
use http::HeaderMap;
|
||||||
use hyper1::body::Incoming;
|
use hyper1::body::Incoming;
|
||||||
|
@ -18,10 +20,36 @@ pub type Request = hyper1::Request<Incoming>;
|
||||||
pub type Response = hyper1::Response<ResponseBytes>;
|
pub type Response = hyper1::Response<ResponseBytes>;
|
||||||
pub type SlabId = u32;
|
pub type SlabId = u32;
|
||||||
|
|
||||||
|
enum RequestBodyState {
|
||||||
|
Incoming(Incoming),
|
||||||
|
Resource(HttpRequestBodyAutocloser),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Incoming> for RequestBodyState {
|
||||||
|
fn from(value: Incoming) -> Self {
|
||||||
|
RequestBodyState::Incoming(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensures that the request body closes itself when no longer needed.
|
||||||
|
pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
|
||||||
|
|
||||||
|
impl HttpRequestBodyAutocloser {
|
||||||
|
pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
|
||||||
|
Self(res, op_state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for HttpRequestBodyAutocloser {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
_ = self.1.borrow_mut().resource_table.close(self.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct HttpSlabRecord {
|
pub struct HttpSlabRecord {
|
||||||
request_info: HttpConnectionProperties,
|
request_info: HttpConnectionProperties,
|
||||||
request_parts: Parts,
|
request_parts: Parts,
|
||||||
request_body: Option<Incoming>,
|
request_body: Option<RequestBodyState>,
|
||||||
// The response may get taken before we tear this down
|
// The response may get taken before we tear this down
|
||||||
response: Option<Response>,
|
response: Option<Response>,
|
||||||
promise: CompletionHandle,
|
promise: CompletionHandle,
|
||||||
|
@ -98,6 +126,7 @@ fn slab_insert_raw(
|
||||||
let mut slab = slab.borrow_mut();
|
let mut slab = slab.borrow_mut();
|
||||||
let body = ResponseBytes::default();
|
let body = ResponseBytes::default();
|
||||||
let trailers = body.trailers();
|
let trailers = body.trailers();
|
||||||
|
let request_body = request_body.map(|r| r.into());
|
||||||
slab.insert(HttpSlabRecord {
|
slab.insert(HttpSlabRecord {
|
||||||
request_info,
|
request_info,
|
||||||
request_parts,
|
request_parts,
|
||||||
|
@ -174,8 +203,35 @@ impl SlabEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Take the Hyper body from this entry.
|
/// Take the Hyper body from this entry.
|
||||||
pub fn take_body(&mut self) -> Incoming {
|
pub fn take_body(&mut self) -> Option<Incoming> {
|
||||||
self.self_mut().request_body.take().unwrap()
|
let body_holder = &mut self.self_mut().request_body;
|
||||||
|
let body = body_holder.take();
|
||||||
|
match body {
|
||||||
|
Some(RequestBodyState::Incoming(body)) => Some(body),
|
||||||
|
x => {
|
||||||
|
*body_holder = x;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn take_resource(&mut self) -> Option<HttpRequestBodyAutocloser> {
|
||||||
|
let body_holder = &mut self.self_mut().request_body;
|
||||||
|
let body = body_holder.take();
|
||||||
|
match body {
|
||||||
|
Some(RequestBodyState::Resource(res)) => Some(res),
|
||||||
|
x => {
|
||||||
|
*body_holder = x;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replace the request body with a resource ID and the OpState we'll need to shut it down.
|
||||||
|
/// We cannot keep just the resource itself, as JS code might be reading from the resource ID
|
||||||
|
/// to generate the response data (requiring us to keep it in the resource table).
|
||||||
|
pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) {
|
||||||
|
self.self_mut().request_body = Some(RequestBodyState::Resource(res));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Complete this entry, potentially expunging it if it is complete.
|
/// Complete this entry, potentially expunging it if it is complete.
|
||||||
|
|
Loading…
Reference in a new issue