1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-24 16:19:12 -05:00

refactor: use resourceForReadableStream for fetch (#20217)

Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.

Two additional changes made to `resourceForReadableStream` were
required:

- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
 - Fix a bug where writing to a closed stream that was full would panic
This commit is contained in:
Matt Mastracci 2023-12-01 08:56:10 -07:00 committed by GitHub
parent 687ae870d1
commit e6e708e46c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 311 additions and 358 deletions

1
Cargo.lock generated
View file

@ -1187,6 +1187,7 @@ dependencies = [
"deno_tls",
"dyn-clone",
"http",
"pin-project",
"reqwest",
"serde",
"tokio",

View file

@ -3,6 +3,7 @@ import {
assert,
assertEquals,
assertRejects,
assertThrows,
delay,
fail,
unimplemented,
@ -523,7 +524,7 @@ Deno.test(
);
Deno.test({ permissions: { net: true } }, async function fetchInitBlobBody() {
const data = "const a = 1";
const data = "const a = 1 🦕";
const blob = new Blob([data], {
type: "text/javascript",
});
@ -555,7 +556,11 @@ Deno.test(
async function fetchInitFormDataBlobFilenameBody() {
const form = new FormData();
form.append("field", "value");
form.append("file", new Blob([new TextEncoder().encode("deno")]));
form.append(
"file",
new Blob([new TextEncoder().encode("deno")]),
"file name",
);
const response = await fetch("http://localhost:4545/echo_server", {
method: "POST",
body: form,
@ -564,7 +569,28 @@ Deno.test(
assertEquals(form.get("field"), resultForm.get("field"));
const file = resultForm.get("file");
assert(file instanceof File);
assertEquals(file.name, "blob");
assertEquals(file.name, "file name");
},
);
Deno.test(
{ permissions: { net: true } },
async function fetchInitFormDataFileFilenameBody() {
const form = new FormData();
form.append("field", "value");
form.append(
"file",
new File([new Blob([new TextEncoder().encode("deno")])], "file name"),
);
const response = await fetch("http://localhost:4545/echo_server", {
method: "POST",
body: form,
});
const resultForm = await response.formData();
assertEquals(form.get("field"), resultForm.get("field"));
const file = resultForm.get("file");
assert(file instanceof File);
assertEquals(file.name, "file name");
},
);
@ -1193,10 +1219,8 @@ Deno.test(
"accept-encoding: gzip, br\r\n",
`host: ${addr}\r\n`,
`transfer-encoding: chunked\r\n\r\n`,
"6\r\n",
"hello \r\n",
"5\r\n",
"world\r\n",
"B\r\n",
"hello world\r\n",
"0\r\n\r\n",
].join("");
assertEquals(actual, expected);
@ -1259,13 +1283,19 @@ Deno.test(
Deno.test(
{ permissions: { net: true } },
async function fetchNoServerReadableStreamBody() {
const { promise, resolve } = Promise.withResolvers<void>();
const completed = Promise.withResolvers<void>();
const failed = Promise.withResolvers<void>();
const body = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1]));
setTimeout(() => {
controller.enqueue(new Uint8Array([2]));
resolve();
setTimeout(async () => {
// This is technically a race. If the fetch has failed by this point, the enqueue will
// throw. If not, it will succeed. Windows appears to take a while to time out the fetch,
// so we will just wait for that here before we attempt to enqueue so it's consistent
// across platforms.
await failed.promise;
assertThrows(() => controller.enqueue(new Uint8Array([2])));
completed.resolve();
}, 1000);
},
});
@ -1273,7 +1303,8 @@ Deno.test(
await assertRejects(async () => {
await fetch(nonExistentHostname, { body, method: "POST" });
}, TypeError);
await promise;
failed.resolve();
await completed.promise;
},
);
@ -1853,8 +1884,9 @@ Deno.test(
async function fetchBlobUrl(): Promise<void> {
const blob = new Blob(["ok"], { type: "text/plain" });
const url = URL.createObjectURL(blob);
assert(url.startsWith("blob:"), `URL was ${url}`);
const res = await fetch(url);
assert(res.url.startsWith("blob:http://js-unit-tests/"));
assertEquals(res.url, url);
assertEquals(res.status, 200);
assertEquals(res.headers.get("content-length"), "2");
assertEquals(res.headers.get("content-type"), "text/plain");
@ -1941,9 +1973,12 @@ Deno.test(
})
);
assert(err instanceof TypeError);
assert(err.cause);
assert(err.cause instanceof Error);
assert(err instanceof TypeError, `err was not a TypeError ${err}`);
assert(err.cause, `err.cause was null ${err}`);
assert(
err.cause instanceof Error,
`err.cause was not an Error ${err.cause}`,
);
assertEquals(err.cause.message, "foo");
await server;
@ -1968,7 +2003,12 @@ Deno.test(
method: "POST",
signal: controller.signal,
});
controller.abort();
try {
controller.abort();
} catch (e) {
console.log(e);
fail("abort should not throw");
}
await promise;
},
DOMException,

View file

@ -190,44 +190,46 @@ Deno.test(async function readableStream() {
// Close the stream after reading everything
Deno.test(async function readableStreamClose() {
const { promise: cancelPromise, resolve: cancelResolve } = Promise
.withResolvers();
const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve));
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(
helloWorldStream(false, cancel.resolve),
);
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
assertEquals(await cancelPromise, "resource closed");
assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading everything
Deno.test(async function readableStreamClosePartialRead() {
const { promise: cancelPromise, resolve: cancelResolve } = Promise
.withResolvers();
const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve));
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(
helloWorldStream(false, cancel.resolve),
);
const buffer = new Uint8Array(5);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
core.ops.op_close(rid);
assertEquals(await cancelPromise, "resource closed");
assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead() {
const { promise: cancelPromise, resolve: cancelResolve } = Promise
.withResolvers();
const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve));
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(
helloWorldStream(false, cancel.resolve),
);
core.ops.op_close(rid);
assertEquals(await cancelPromise, "resource closed");
assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead2() {
const { promise: cancelPromise, resolve: cancelResolve } = Promise
.withResolvers();
const rid = resourceForReadableStream(longAsyncStream(cancelResolve));
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(longAsyncStream(cancel.resolve));
core.ops.op_close(rid);
assertEquals(await cancelPromise, "resource closed");
assertEquals(await cancel.promise, "resource closed");
});
Deno.test(async function readableStreamPartial() {
@ -439,32 +441,38 @@ function createStreamTest(
});
}
Deno.test(async function readableStreamWithAggressiveResourceClose() {
let first = true;
const { promise: reasonPromise, resolve: reasonResolve } = Promise
.withResolvers();
const rid = resourceForReadableStream(
new ReadableStream({
pull(controller) {
if (first) {
// We queue this up and then immediately close the resource (not the reader)
controller.enqueue(new Uint8Array(1));
core.close(rid);
// This doesn't throw, even though the resource is closed
controller.enqueue(new Uint8Array(1));
first = false;
}
},
cancel(reason) {
reasonResolve(reason);
},
}),
);
try {
await core.ops.op_read(rid, new Uint8Array(1));
fail();
} catch (e) {
assertEquals(e.message, "operation canceled");
}
assertEquals(await reasonPromise, "resource closed");
});
// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully.
for (const packetCount of [1, 1024]) {
Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () {
let first = true;
const { promise, resolve } = Promise.withResolvers();
const rid = resourceForReadableStream(
new ReadableStream({
pull(controller) {
if (first) {
// We queue this up and then immediately close the resource (not the reader)
for (let i = 0; i < packetCount; i++) {
controller.enqueue(new Uint8Array(1));
}
core.close(rid);
// This doesn't throw, even though the resource is closed
controller.enqueue(new Uint8Array(1));
first = false;
}
},
cancel(reason) {
resolve(reason);
},
}),
);
try {
for (let i = 0; i < packetCount; i++) {
await core.ops.op_read(rid, new Uint8Array(1));
}
fail();
} catch (e) {
assertEquals(e.message, "operation canceled");
}
assertEquals(await promise, "resource closed");
});
}

View file

@ -14,11 +14,12 @@ const core = globalThis.Deno.core;
const ops = core.ops;
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { byteLowerCase } from "ext:deno_web/00_infra.js";
import { BlobPrototype } from "ext:deno_web/09_file.js";
import {
errorReadableStream,
getReadableStreamResourceBacking,
readableStreamForRid,
ReadableStreamPrototype,
resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { extractBody, InnerBody } from "ext:deno_fetch/22_body.js";
import { processUrlList, toInnerRequest } from "ext:deno_fetch/23_request.js";
@ -37,22 +38,17 @@ const {
ArrayPrototypeSplice,
ArrayPrototypeFilter,
ArrayPrototypeIncludes,
Error,
ObjectPrototypeIsPrototypeOf,
Promise,
PromisePrototypeThen,
PromisePrototypeCatch,
SafeArrayIterator,
SafeWeakMap,
String,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
TypeError,
Uint8Array,
Uint8ArrayPrototype,
WeakMapPrototypeDelete,
WeakMapPrototypeGet,
WeakMapPrototypeHas,
WeakMapPrototypeSet,
} = primordials;
const REQUEST_BODY_HEADER_NAMES = [
@ -62,28 +58,9 @@ const REQUEST_BODY_HEADER_NAMES = [
"content-type",
];
const requestBodyReaders = new SafeWeakMap();
/**
* @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args
* @param {Uint8Array | null} body
* @returns {{ requestRid: number, requestBodyRid: number | null, cancelHandleRid: number | null }}
*/
function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) {
return ops.op_fetch(
method,
url,
headers,
clientRid,
hasBody,
bodyLength,
body,
);
}
/**
* @param {number} rid
* @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
* @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number, error: string? }>}
*/
function opFetchSend(rid) {
return core.opAsync("op_fetch_send", rid);
@ -145,154 +122,59 @@ async function mainFetch(req, recursive, terminator) {
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let reqBody = null;
let reqRid = null;
if (req.body !== null) {
if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
req.body.streamOrStatic,
)
) {
if (
req.body.length === null ||
ObjectPrototypeIsPrototypeOf(BlobPrototype, req.body.source)
) {
reqBody = req.body.stream;
if (req.body) {
const stream = req.body.streamOrStatic;
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
reqBody = body;
} else if (typeof body === "string") {
reqBody = core.encode(body);
} else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
const resourceBacking = getReadableStreamResourceBacking(stream);
if (resourceBacking) {
reqRid = resourceBacking.rid;
} else {
const reader = req.body.stream.getReader();
WeakMapPrototypeSet(requestBodyReaders, req, reader);
const r1 = await reader.read();
if (r1.done) {
reqBody = new Uint8Array(0);
} else {
reqBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
WeakMapPrototypeDelete(requestBodyReaders, req);
reqRid = resourceForReadableStream(stream, req.body.length);
}
} else {
req.body.streamOrStatic.consumed = true;
reqBody = req.body.streamOrStatic.body;
// TODO(@AaronO): plumb support for StringOrBuffer all the way
reqBody = typeof reqBody === "string" ? core.encode(reqBody) : reqBody;
throw TypeError("invalid body");
}
}
const { requestRid, requestBodyRid, cancelHandleRid } = opFetch(
const { requestRid, cancelHandleRid } = ops.op_fetch(
req.method,
req.currentUrl(),
req.headerList,
req.clientRid,
reqBody !== null,
req.body?.length,
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, reqBody) ? reqBody : null,
reqBody !== null || reqRid !== null,
reqBody,
reqRid,
);
function onAbort() {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
}
if (requestBodyRid !== null) {
core.tryClose(requestBodyRid);
}
}
terminator[abortSignal.add](onAbort);
let requestSendError;
let requestSendErrorSet = false;
async function propagateError(err, message) {
// TODO(lucacasonato): propagate error into response body stream
try {
await core.writeTypeError(requestBodyRid, message);
} catch (err) {
if (!requestSendErrorSet) {
requestSendErrorSet = true;
requestSendError = err;
}
}
if (!requestSendErrorSet) {
requestSendErrorSet = true;
requestSendError = err;
}
}
if (requestBodyRid !== null) {
if (
reqBody === null ||
!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, reqBody)
) {
throw new TypeError("Unreachable");
}
const reader = reqBody.getReader();
WeakMapPrototypeSet(requestBodyReaders, req, reader);
(async () => {
let done = false;
while (!done) {
let val;
try {
const res = await reader.read();
done = res.done;
val = res.value;
} catch (err) {
if (terminator.aborted) break;
await propagateError(err, "failed to read");
break;
}
if (done) break;
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) {
const error = new TypeError(
"Item in request body ReadableStream is not a Uint8Array",
);
await reader.cancel(error);
await propagateError(error, error.message);
break;
}
try {
await core.writeAll(requestBodyRid, val);
} catch (err) {
if (terminator.aborted) break;
await reader.cancel(err);
await propagateError(err, "failed to write");
break;
}
}
if (done && !terminator.aborted) {
try {
await core.shutdown(requestBodyRid);
} catch (err) {
if (!terminator.aborted) {
await propagateError(err, "failed to flush");
}
}
}
WeakMapPrototypeDelete(requestBodyReaders, req);
reader.releaseLock();
core.tryClose(requestBodyRid);
})();
}
let resp;
try {
resp = await opFetchSend(requestRid);
} catch (err) {
if (terminator.aborted) return;
if (requestSendErrorSet) {
// if the request body stream errored, we want to propagate that error
// instead of the original error from opFetchSend
throw new TypeError("Failed to fetch: request body stream errored", {
cause: requestSendError,
});
}
if (requestBodyRid !== null) {
core.tryClose(requestBodyRid);
}
throw err;
} finally {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
}
}
// Re-throw any body errors
if (resp.error) {
throw new TypeError("body failed", { cause: new Error(resp.error) });
}
if (terminator.aborted) return abortedNetworkError();
processUrlList(req.urlList, req.urlListProcessed);
@ -510,9 +392,8 @@ function fetch(input, init = {}) {
function abortFetch(request, responseObject, error) {
if (request.body !== null) {
if (WeakMapPrototypeHas(requestBodyReaders, request)) {
WeakMapPrototypeGet(requestBodyReaders, request).cancel(error);
} else {
// Cancel the body if we haven't taken it as a resource yet
if (!request.body.streamOrStatic.locked) {
request.body.cancel(error);
}
}

View file

@ -20,6 +20,7 @@ deno_core.workspace = true
deno_tls.workspace = true
dyn-clone = "1"
http.workspace = true
pin-project.workspace = true
reqwest.workspace = true
serde.workspace = true
tokio.workspace = true

View file

@ -11,6 +11,8 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
@ -21,13 +23,11 @@ use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::BufView;
use deno_core::WriteOutcome;
use deno_core::unsync::spawn;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@ -62,7 +62,6 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
// Re-export reqwest and data_url
pub use data_url;
@ -184,7 +183,6 @@ pub fn get_declaration() -> PathBuf {
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
pub request_rid: ResourceId,
pub request_body_rid: Option<ResourceId>,
pub cancel_handle_rid: Option<ResourceId>,
}
@ -216,6 +214,59 @@ pub fn get_or_create_client_from_state(
}
}
#[allow(clippy::type_complexity)]
pub struct ResourceToBodyAdapter(
Rc<dyn Resource>,
Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>,
);
impl ResourceToBodyAdapter {
pub fn new(resource: Rc<dyn Resource>) -> Self {
let future = resource.clone().read(64 * 1024);
Self(resource, Some(future))
}
}
// SAFETY: we only use this on a single-threaded executor
unsafe impl Send for ResourceToBodyAdapter {}
// SAFETY: we only use this on a single-threaded executor
unsafe impl Sync for ResourceToBodyAdapter {}
impl Stream for ResourceToBodyAdapter {
type Item = Result<BufView, Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(mut fut) = this.1.take() {
match fut.poll_unpin(cx) {
Poll::Pending => {
this.1 = Some(fut);
Poll::Pending
}
Poll::Ready(res) => match res {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
Ok(_) => {
this.1 = Some(this.0.clone().read(64 * 1024));
Poll::Ready(Some(res))
}
_ => Poll::Ready(Some(res)),
},
}
} else {
Poll::Ready(None)
}
}
}
impl Drop for ResourceToBodyAdapter {
fn drop(&mut self) {
self.0.clone().close()
}
}
#[op2]
#[serde]
#[allow(clippy::too_many_arguments)]
@ -226,8 +277,8 @@ pub fn op_fetch<FP>(
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
has_body: bool,
#[number] body_length: Option<u64>,
#[buffer] data: Option<JsBuffer>,
#[smi] resource: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
FP: FetchPermissions + 'static,
@ -244,7 +295,7 @@ where
// Check scheme before asking for net permission
let scheme = url.scheme();
let (request_rid, request_body_rid, cancel_handle_rid) = match scheme {
let (request_rid, cancel_handle_rid) = match scheme {
"file" => {
let path = url.to_file_path().map_err(|_| {
type_error("NetworkError when attempting to fetch resource.")
@ -268,7 +319,7 @@ where
let maybe_cancel_handle_rid = maybe_cancel_handle
.map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
(request_rid, None, maybe_cancel_handle_rid)
(request_rid, maybe_cancel_handle_rid)
}
"http" | "https" => {
let permissions = state.borrow_mut::<FP>();
@ -282,34 +333,25 @@ where
let mut request = client.request(method.clone(), url);
let request_body_rid = if has_body {
match data {
None => {
// If no body is passed, we return a writer for streaming the body.
let (tx, stream) = tokio::sync::mpsc::channel(1);
// If the size of the body is known, we include a content-length
// header explicitly.
if let Some(body_size) = body_length {
request =
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
}
request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
body: AsyncRefCell::new(Some(tx)),
cancel: CancelHandle::default(),
});
Some(request_body_rid)
}
Some(data) => {
if has_body {
match (data, resource) {
(Some(data), _) => {
// If a body is passed, we use it, and don't return a body for streaming.
request = request.body(data.to_vec());
None
}
(_, Some(resource)) => {
let resource = state.resource_table.take_any(resource)?;
match resource.size_hint() {
(body_size, Some(n)) if body_size == n && body_size > 0 => {
request =
request.header(CONTENT_LENGTH, HeaderValue::from(body_size));
}
_ => {}
}
request = request
.body(Body::wrap_stream(ResourceToBodyAdapter::new(resource)))
}
(None, None) => unreachable!(),
}
} else {
// POST and PUT requests should always have a 0 length content-length,
@ -317,7 +359,6 @@ where
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
None
};
let mut header_map = HeaderMap::new();
@ -354,7 +395,7 @@ where
.send()
.or_cancel(cancel_handle_)
.await
.map(|res| res.map_err(|err| type_error(err.to_string())))
.map(|res| res.map_err(|err| err.into()))
};
let request_rid = state
@ -364,7 +405,7 @@ where
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
(request_rid, request_body_rid, Some(cancel_handle_rid))
(request_rid, Some(cancel_handle_rid))
}
"data" => {
let data_url = DataUrl::process(url.as_str())
@ -385,7 +426,7 @@ where
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
(request_rid, None, None)
(request_rid, None)
}
"blob" => {
// Blob URL resolution happens in the JS side of fetch. If we got here is
@ -397,12 +438,11 @@ where
Ok(FetchReturn {
request_rid,
request_body_rid,
cancel_handle_rid,
})
}
#[derive(Serialize)]
#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
pub status: u16,
@ -413,6 +453,7 @@ pub struct FetchResponse {
pub content_length: Option<u64>,
pub remote_addr_ip: Option<String>,
pub remote_addr_port: Option<u16>,
pub error: Option<String>,
}
#[op2(async)]
@ -432,7 +473,29 @@ pub async fn op_fetch_send(
let res = match request.0.await {
Ok(Ok(res)) => res,
Ok(Err(err)) => return Err(type_error(err.to_string())),
Ok(Err(err)) => {
// We're going to try and rescue the error cause from a stream and return it from this fetch.
// If any error in the chain is a reqwest body error, return that as a special result we can use to
// reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
// TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
let mut err_ref: &dyn std::error::Error = err.as_ref();
while let Some(err) = std::error::Error::source(err_ref) {
if let Some(err) = err.downcast_ref::<reqwest::Error>() {
if err.is_body() {
// Extracts the next error cause and uses that for the message
if let Some(err) = std::error::Error::source(err) {
return Ok(FetchResponse {
error: Some(err.to_string()),
..Default::default()
});
}
}
}
err_ref = err;
}
return Err(type_error(err.to_string()));
}
Err(_) => return Err(type_error("request was cancelled")),
};
@ -465,6 +528,7 @@ pub async fn op_fetch_send(
content_length,
remote_addr_ip,
remote_addr_port,
error: None,
})
}
@ -599,74 +663,6 @@ impl Resource for FetchCancelHandle {
}
}
/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
impl Stream for FetchBodyStream {
type Item = Result<bytes::Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_recv(cx)
}
}
pub struct FetchRequestBodyResource {
pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>,
pub cancel: CancelHandle,
}
impl Resource for FetchRequestBodyResource {
fn name(&self) -> Cow<str> {
"fetchRequestBody".into()
}
fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
Box::pin(async move {
let bytes: bytes::Bytes = buf.into();
let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let body = (*body).as_ref();
let cancel = RcRef::map(self, |r| &r.cancel);
let body = body.ok_or(type_error(
"request body receiver not connected (request closed)",
))?;
body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
type_error("request body receiver not connected (request closed)")
})?;
Ok(WriteOutcome::Full { nwritten })
})
}
fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> {
async move {
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let body = (*body).as_ref();
let cancel = RcRef::map(self, |r| &r.cancel);
let body = body.ok_or(type_error(
"request body receiver not connected (request closed)",
))?;
body.send(Err(error)).or_cancel(cancel).await??;
Ok(())
}
.boxed_local()
}
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
async move {
let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
body.take();
Ok(())
}
.boxed_local()
}
fn close(self: Rc<Self>) {
self.cancel.cancel();
}
}
type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;

View file

@ -4,18 +4,17 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_core::ResourceId;
use deno_fetch::get_or_create_client_from_state;
use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
use deno_fetch::ResourceToBodyAdapter;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@ -31,7 +30,7 @@ pub fn op_node_http_request<P>(
#[string] url: String,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
has_body: bool,
#[smi] body: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
P: crate::NodePermissions + 'static,
@ -63,25 +62,16 @@ where
let mut request = client.request(method.clone(), url).headers(header_map);
let request_body_rid = if has_body {
// If no body is passed, we return a writer for streaming the body.
let (tx, stream) = tokio::sync::mpsc::channel(1);
request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
body: AsyncRefCell::new(Some(tx)),
cancel: CancelHandle::default(),
});
Some(request_body_rid)
if let Some(body) = body {
request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new(
state.resource_table.take_any(body)?,
)));
} else {
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
None
};
let cancel_handle = CancelHandle::new_rc();
@ -104,7 +94,6 @@ where
Ok(FetchReturn {
request_rid,
request_body_rid,
cancel_handle_rid: Some(cancel_handle_rid),
})
}

View file

@ -4,7 +4,6 @@
// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials
const core = globalThis.__bootstrap.core;
import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs";
import assert from "ext:deno_node/internal/assert.mjs";
import EE from "node:events";
@ -544,7 +543,7 @@ export class OutgoingMessage extends Stream {
data = new Uint8Array(data.buffer);
}
if (data.buffer.byteLength > 0) {
core.writeAll(this._bodyWriteRid, data).then(() => {
this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {

View file

@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js";
import { headersEntries } from "ext:deno_fetch/20_headers.js";
import { timerId } from "ext:deno_web/03_abort_signal.js";
import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js";
import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
import { TcpConn } from "ext:deno_net/01_net.js";
enum STATUS_CODES {
@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage {
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;
if (
this.method === "POST" || this.method === "PATCH" || this.method === "PUT"
) {
const { readable, writable } = new TransformStream({
cancel: (e) => {
this._requestSendError = e;
},
});
this._bodyWritable = writable;
this._bodyWriter = writable.getWriter();
this._bodyWriteRid = resourceForReadableStream(readable);
}
this._req = core.ops.op_node_http_request(
this.method,
url,
headers,
client.rid,
(this.method === "POST" || this.method === "PATCH" ||
this.method === "PUT") && this._contentLength !== 0,
this._bodyWriteRid,
);
this._bodyWriteRid = this._req.requestBodyRid;
}
_implicitHeader() {
@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage {
this._implicitHeader();
this._send("", "latin1");
}
this._bodyWriter?.close();
(async () => {
try {
const [res, _] = await Promise.all([
core.opAsync("op_fetch_send", this._req.requestRid),
(async () => {
if (this._bodyWriteRid) {
try {
await core.shutdown(this._bodyWriteRid);
} catch (err) {
this._requestSendError = err;
}
core.tryClose(this._bodyWriteRid);
}
})(),
]);
const res = await core.opAsync("op_fetch_send", this._req.requestRid);
try {
cb?.();
} catch (_) {

View file

@ -12,6 +12,7 @@ const {
op_arraybuffer_was_detached,
op_transfer_arraybuffer,
op_readable_stream_resource_allocate,
op_readable_stream_resource_allocate_sized,
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
@ -863,13 +864,16 @@ function readableStreamReadFn(reader, sink) {
* read operations, and those read operations will be fed by the output of the
* ReadableStream source.
* @param {ReadableStream<Uint8Array>} stream
* @param {number | undefined} length
* @returns {number}
*/
function resourceForReadableStream(stream) {
function resourceForReadableStream(stream, length) {
const reader = acquireReadableStreamDefaultReader(stream);
// Allocate the resource
const rid = op_readable_stream_resource_allocate();
const rid = typeof length == "number"
? op_readable_stream_resource_allocate_sized(length)
: op_readable_stream_resource_allocate();
// Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
PromisePrototypeCatch(

View file

@ -91,6 +91,7 @@ deno_core::extension!(deno_web,
op_sleep,
op_transfer_arraybuffer,
stream_resource::op_readable_stream_resource_allocate,
stream_resource::op_readable_stream_resource_allocate_sized,
stream_resource::op_readable_stream_resource_get_sink,
stream_resource::op_readable_stream_resource_write_error,
stream_resource::op_readable_stream_resource_write_buf,

View file

@ -197,7 +197,14 @@ impl BoundedBufferChannelInner {
pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> {
let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE;
if next_producer_index == self.ring_consumer {
return Err(buffer);
// Note that we may have been allowed to write because of a close/error condition, but the
// underlying channel is actually closed. If this is the case, we return `Ok(())`` and just
// drop the bytes on the floor.
return if self.closed || self.error.is_some() {
Ok(())
} else {
Err(buffer)
};
}
self.current_size += buffer.len();
@ -336,6 +343,7 @@ struct ReadableStreamResource {
channel: BoundedBufferChannel,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
size_hint: (u64, Option<u64>),
}
impl ReadableStreamResource {
@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource {
fn close(self: Rc<Self>) {
self.close_channel();
}
fn size_hint(&self) -> (u64, Option<u64>) {
self.size_hint
}
}
impl Drop for ReadableStreamResource {
@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
size_hint: (0, None),
};
state.resource_table.add(resource)
}
/// Allocate a resource that wraps a ReadableStream, with a size hint.
#[op2(fast)]
#[smi]
pub fn op_readable_stream_resource_allocate_sized(
state: &mut OpState,
#[number] length: u64,
) -> ResourceId {
let completion = CompletionHandle::default();
let resource = ReadableStreamResource {
read_queue: Default::default(),
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
size_hint: (length, Some(length)),
};
state.resource_table.add(resource)
}