1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

feat(ext/web): resourceForReadableStream (#20180)

Extracted from fast streams work.

This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.

Performance with a ReadableStream response yields ~18% improvement:

```
  return new Response(new ReadableStream({
    start(controller) {
      controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
      controller.close();
    }
  })
```

This patch:

```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    99.96us  100.03us   6.65ms   98.84%
    Req/Sec    47.73k     2.43k   51.02k    89.11%
  959308 requests in 10.10s, 117.10MB read
Requests/sec:  94978.71
Transfer/sec:     11.59MB
```

main:

```
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   163.03us  685.51us  19.73ms   99.27%
    Req/Sec    39.50k     3.98k   66.11k    95.52%
  789582 requests in 10.10s, 82.83MB read
Requests/sec:  78182.65
Transfer/sec:      8.20MB
```
This commit is contained in:
Matt Mastracci 2023-08-17 07:52:37 -06:00 committed by Divy Srivastava
parent 15f57a8535
commit 32947e5ea5
13 changed files with 743 additions and 250 deletions

2
Cargo.lock generated
View file

@ -1505,6 +1505,7 @@ version = "0.146.0"
dependencies = [
"async-trait",
"base64-simd",
"bytes",
"deno_bench_util",
"deno_console",
"deno_core",
@ -1512,6 +1513,7 @@ dependencies = [
"deno_webidl",
"encoding_rs",
"flate2",
"futures",
"serde",
"tokio",
"uuid",

View file

@ -78,6 +78,7 @@ util::unit_test_factory!(
signal_test,
stat_test,
stdio_test,
streams_test,
structured_clone_test,
symlink_test,
sync_test,

View file

@ -693,24 +693,30 @@ function createStreamTest(count: number, delay: number, action: string) {
onError: createOnErrorCb(ac),
});
try {
await listeningPromise;
const resp = await fetch(`http://127.0.0.1:${servePort}/`);
if (action == "Throw") {
try {
await resp.text();
fail();
} catch (_) {
// expected
}
} else {
const text = await resp.text();
ac.abort();
await server.finished;
let expected = "";
if (action == "Throw" && count < 2 && delay < 1000) {
// NOTE: This is specific to the current implementation. In some cases where a stream errors, we
// don't send the first packet.
expected = "";
} else {
for (let i = 0; i < count; i++) {
expected += `a${i}`;
}
}
assertEquals(text, expected);
}
} finally {
ac.abort();
await server.finished;
}
});
}

View file

@ -0,0 +1,299 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { fail } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
import { assertEquals, Deferred, deferred } from "./test_util.ts";
const {
core,
resourceForReadableStream,
// @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
} = Deno[Deno.internal];
const LOREM =
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";
// Hello world, with optional close
// deno-lint-ignore no-explicit-any
function helloWorldStream(close?: boolean, completion?: Deferred<any>) {
return new ReadableStream({
start(controller) {
controller.enqueue("hello, world");
if (close == true) {
controller.close();
}
},
cancel(reason) {
completion?.resolve(reason);
},
}).pipeThrough(new TextEncoderStream());
}
// Hello world, with optional close
function errorStream(type: "string" | "controller" | "TypeError") {
return new ReadableStream({
start(controller) {
controller.enqueue("hello, world");
},
pull(controller) {
if (type == "string") {
throw "Uh oh (string)!";
}
if (type == "TypeError") {
throw TypeError("Uh oh (TypeError)!");
}
controller.error("Uh oh (controller)!");
},
}).pipeThrough(new TextEncoderStream());
}
// Long stream with Lorem Ipsum text.
function longStream() {
return new ReadableStream({
start(controller) {
for (let i = 0; i < 4; i++) {
setTimeout(() => {
controller.enqueue(LOREM);
if (i == 3) {
controller.close();
}
}, i * 100);
}
},
}).pipeThrough(new TextEncoderStream());
}
// Empty stream, closes either immediately or on a call to pull.
function emptyStream(onPull: boolean) {
return new ReadableStream({
start(controller) {
if (!onPull) {
controller.close();
}
},
pull(controller) {
if (onPull) {
controller.close();
}
},
}).pipeThrough(new TextEncoderStream());
}
// Include an empty chunk
function emptyChunkStream() {
return new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1]));
controller.enqueue(new Uint8Array([]));
controller.enqueue(new Uint8Array([2]));
controller.close();
},
});
}
// Creates a stream with the given number of packets, a configurable delay between packets, and a final
// action (either "Throw" or "Close").
function makeStreamWithCount(
count: number,
delay: number,
action: "Throw" | "Close",
): ReadableStream {
function doAction(controller: ReadableStreamDefaultController, i: number) {
if (i == count) {
if (action == "Throw") {
controller.error(new Error("Expected error!"));
} else {
controller.close();
}
} else {
controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i));
if (delay == 0) {
doAction(controller, i + 1);
} else {
setTimeout(() => doAction(controller, i + 1), delay);
}
}
}
return new ReadableStream({
start(controller) {
if (delay == 0) {
doAction(controller, 0);
} else {
setTimeout(() => doAction(controller, 0), delay);
}
},
}).pipeThrough(new TextEncoderStream());
}
// Normal stream operation
Deno.test(async function readableStream() {
const rid = resourceForReadableStream(helloWorldStream());
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
});
// Close the stream after reading everything
Deno.test(async function readableStreamClose() {
const cancel = deferred();
const rid = resourceForReadableStream(helloWorldStream(false, cancel));
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
assertEquals(await cancel, undefined);
});
// Close the stream without reading everything
Deno.test(async function readableStreamClosePartialRead() {
const cancel = deferred();
const rid = resourceForReadableStream(helloWorldStream(false, cancel));
const buffer = new Uint8Array(5);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
core.ops.op_close(rid);
assertEquals(await cancel, undefined);
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead() {
const cancel = deferred();
const rid = resourceForReadableStream(helloWorldStream(false, cancel));
core.ops.op_close(rid);
assertEquals(await cancel, undefined);
});
Deno.test(async function readableStreamPartial() {
const rid = resourceForReadableStream(helloWorldStream());
const buffer = new Uint8Array(5);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
const buffer2 = new Uint8Array(1024);
const nread2 = await core.ops.op_read(rid, buffer2);
assertEquals(nread2, 7);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLongReadAll() {
const rid = resourceForReadableStream(longStream());
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer.length, LOREM.length * 4);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLongByPiece() {
const rid = resourceForReadableStream(longStream());
let total = 0;
for (let i = 0; i < 100; i++) {
const length = await core.ops.op_read(rid, new Uint8Array(16));
total += length;
if (length == 0) {
break;
}
}
assertEquals(total, LOREM.length * 4);
core.ops.op_close(rid);
});
for (
const type of [
"string",
"TypeError",
"controller",
] as ("string" | "TypeError" | "controller")[]
) {
Deno.test(`readableStreamError_${type}`, async function () {
const rid = resourceForReadableStream(errorStream(type));
assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16)));
try {
await core.ops.op_read(rid, new Uint8Array(1));
fail();
} catch (e) {
assertEquals(e.message, `Uh oh (${type})!`);
}
core.ops.op_close(rid);
});
}
Deno.test(async function readableStreamEmptyOnStart() {
const rid = resourceForReadableStream(emptyStream(true));
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 0);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamEmptyOnPull() {
const rid = resourceForReadableStream(emptyStream(false));
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 0);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamEmptyReadAll() {
const rid = resourceForReadableStream(emptyStream(false));
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer.length, 0);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamWithEmptyChunk() {
const rid = resourceForReadableStream(emptyChunkStream());
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer, new Uint8Array([1, 2]));
core.ops.op_close(rid);
});
Deno.test(async function readableStreamWithEmptyChunkOneByOne() {
const rid = resourceForReadableStream(emptyChunkStream());
assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1)));
assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1)));
assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1)));
core.ops.op_close(rid);
});
for (const count of [0, 1, 2, 3]) {
for (const delay of [0, 1, 10]) {
// Creating a stream that errors in start will throw
if (delay > 0) {
createStreamTest(count, delay, "Throw");
}
createStreamTest(count, delay, "Close");
}
}
function createStreamTest(
count: number,
delay: number,
action: "Throw" | "Close",
) {
Deno.test(`streamCount${count}Delay${delay}${action}`, async () => {
let rid;
try {
rid = resourceForReadableStream(
makeStreamWithCount(count, delay, action),
);
for (let i = 0; i < count; i++) {
const buffer = new Uint8Array(1);
await core.ops.op_read(rid, buffer);
}
if (action == "Throw") {
try {
const buffer = new Uint8Array(1);
assertEquals(1, await core.ops.op_read(rid, buffer));
fail();
} catch (e) {
// We expect this to be thrown
assertEquals(e.message, "Expected error!");
}
} else {
const buffer = new Uint8Array(1);
assertEquals(0, await core.ops.op_read(rid, buffer));
}
} finally {
core.ops.op_close(rid);
}
});
}

View file

@ -141,8 +141,7 @@ Deno.test("[node/http] chunked response", async () => {
}
});
// TODO(kt3k): This test case exercises the workaround for https://github.com/denoland/deno/issues/17194
// This should be removed when #17194 is resolved.
// Test empty chunks: https://github.com/denoland/deno/issues/17194
Deno.test("[node/http] empty chunk in the middle of response", async () => {
const promise = deferred<void>();

View file

@ -30,9 +30,9 @@ import {
import {
Deferred,
getReadableStreamResourceBacking,
readableStreamClose,
readableStreamForRid,
ReadableStreamPrototype,
resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { listen, TcpConn } from "ext:deno_net/01_net.js";
import { listenTls } from "ext:deno_net/02_tls.js";
@ -41,10 +41,6 @@ const {
Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
SafeSet,
SafeSetIterator,
SetPrototypeAdd,
SetPrototypeDelete,
Symbol,
SymbolFor,
TypeError,
@ -61,7 +57,6 @@ const {
op_http_set_promise_complete,
op_http_set_response_body_bytes,
op_http_set_response_body_resource,
op_http_set_response_body_stream,
op_http_set_response_body_text,
op_http_set_response_header,
op_http_set_response_headers,
@ -339,7 +334,6 @@ class InnerRequest {
class CallbackContext {
abortController;
responseBodies;
scheme;
fallbackHost;
serverRid;
@ -352,7 +346,6 @@ class CallbackContext {
{ once: true },
);
this.abortController = new AbortController();
this.responseBodies = new SafeSet();
this.serverRid = args[0];
this.scheme = args[1];
this.fallbackHost = args[2];
@ -379,23 +372,24 @@ class ServeHandlerInfo {
}
}
function fastSyncResponseOrStream(req, respBody) {
function fastSyncResponseOrStream(req, respBody, status) {
if (respBody === null || respBody === undefined) {
// Don't set the body
return null;
op_http_set_promise_complete(req, status);
return;
}
const stream = respBody.streamOrStatic;
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
op_http_set_response_body_bytes(req, body);
return null;
op_http_set_response_body_bytes(req, body, status);
return;
}
if (typeof body === "string") {
op_http_set_response_body_text(req, body);
return null;
op_http_set_response_body_text(req, body, status);
return;
}
// At this point in the response it needs to be a stream
@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) {
req,
resourceBacking.rid,
resourceBacking.autoClose,
status,
);
return null;
}
return stream;
}
async function asyncResponse(responseBodies, req, status, stream) {
const reader = stream.getReader();
let responseRid;
let closed = false;
let timeout;
try {
// IMPORTANT: We get a performance boost from this optimization, but V8 is very
// sensitive to the order and structure. Benchmark any changes to this code.
// Optimize for streams that are done in zero or one packets. We will not
// have to allocate a resource in this case.
const { value: value1, done: done1 } = await reader.read();
if (done1) {
closed = true;
// Exit 1: no response body at all, extreme fast path
// Reader will be closed by finally block
return;
}
// The second value cannot block indefinitely, as someone may be waiting on a response
// of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
// and we race it.
let timeoutPromise;
timeout = setTimeout(() => {
responseRid = op_http_set_response_body_stream(req);
SetPrototypeAdd(responseBodies, responseRid);
op_http_set_promise_complete(req, status);
// TODO(mmastrac): if this promise fails before we get to the await below, it crashes
// the process with an error:
//
// 'Uncaught (in promise) BadResource: failed to write'.
//
// To avoid this, we're going to swallow errors here and allow the code later in the
// file to re-throw them in a way that doesn't appear to be an uncaught promise rejection.
timeoutPromise = PromisePrototypeCatch(
core.writeAll(responseRid, value1),
() => null,
} else {
const rid = resourceForReadableStream(stream);
op_http_set_response_body_resource(
req,
rid,
true,
status,
);
}, 250);
const { value: value2, done: done2 } = await reader.read();
if (timeoutPromise) {
await timeoutPromise;
if (done2) {
closed = true;
// Exit 2(a): read 2 is EOS, and timeout resolved.
// Reader will be closed by finally block
// Response stream will be closed by finally block.
return;
}
// Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
} else {
clearTimeout(timeout);
timeout = undefined;
if (done2) {
// Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
// Reader will be closed by finally block
// No response stream
closed = true;
op_http_set_response_body_bytes(req, value1);
return;
}
responseRid = op_http_set_response_body_stream(req);
SetPrototypeAdd(responseBodies, responseRid);
op_http_set_promise_complete(req, status);
// Write our first packet
await core.writeAll(responseRid, value1);
}
await core.writeAll(responseRid, value2);
while (true) {
const { value, done } = await reader.read();
if (done) {
closed = true;
break;
}
await core.writeAll(responseRid, value);
}
} catch (error) {
closed = true;
try {
await reader.cancel(error);
} catch {
// Pass
}
} finally {
if (!closed) {
readableStreamClose(reader);
}
if (timeout !== undefined) {
clearTimeout(timeout);
}
if (responseRid) {
core.tryClose(responseRid);
SetPrototypeDelete(responseBodies, responseRid);
} else {
op_http_set_promise_complete(req, status);
}
}
}
@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) {
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
const responseBodies = context.responseBodies;
const signal = context.abortController.signal;
const hasCallback = callback.length > 0;
const hasOneCallback = callback.length === 1;
@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) {
}
}
// Attempt to respond quickly to this request, otherwise extract the stream
const stream = fastSyncResponseOrStream(req, inner.body);
if (stream !== null) {
// Handle the stream asynchronously
await asyncResponse(responseBodies, req, status, stream);
} else {
op_http_set_promise_complete(req, status);
}
fastSyncResponseOrStream(req, inner.body, status);
innerRequest?.close();
};
}
@ -755,10 +641,6 @@ function serveHttpOn(context, callback) {
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
for (const streamRid of new SafeSetIterator(context.responseBodies)) {
core.tryClose(streamRid);
}
})();
return {

View file

@ -10,7 +10,6 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::response_body::V8StreamHttpResponseBody;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
@ -30,6 +29,7 @@ use deno_core::task::JoinHandle;
use deno_core::v8;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@ -573,6 +573,7 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
fn set_response(
slab_id: SlabId,
length: Option<usize>,
status: u16,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
let mut http = slab_get(slab_id);
@ -583,7 +584,14 @@ fn set_response(
length,
response.headers_mut(),
);
response.body_mut().initialize(response_fn(compression))
response.body_mut().initialize(response_fn(compression));
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
*response.status_mut() = code;
}
http.complete();
}
#[op2(fast)]
@ -592,6 +600,7 @@ pub fn op_http_set_response_body_resource(
#[smi] slab_id: SlabId,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
let resource = if auto_close {
@ -603,6 +612,7 @@ pub fn op_http_set_response_body_resource(
set_response(
slab_id,
resource.size_hint().1.map(|s| s as usize),
status,
move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close)
},
@ -611,43 +621,35 @@ pub fn op_http_set_response_body_resource(
Ok(())
}
#[op2(fast)]
#[smi]
pub fn op_http_set_response_body_stream(
state: &mut OpState,
#[smi] slab_id: SlabId,
) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1);
set_response(slab_id, None, |compression| {
ResponseBytesInner::from_v8(compression, rx)
});
Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
}
#[op2(fast)]
pub fn op_http_set_response_body_text(
#[smi] slab_id: SlabId,
#[string] text: String,
status: u16,
) {
if !text.is_empty() {
set_response(slab_id, Some(text.len()), |compression| {
set_response(slab_id, Some(text.len()), status, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
op_http_set_promise_complete::call(slab_id, status);
}
}
#[op2(fast)]
// Skipping `fast` because we prefer an owned buffer here.
#[op2]
pub fn op_http_set_response_body_bytes(
#[smi] slab_id: SlabId,
#[buffer] buffer: &[u8],
#[buffer] buffer: JsBuffer,
status: u16,
) {
if !buffer.is_empty() {
set_response(slab_id, Some(buffer.len()), |compression| {
ResponseBytesInner::from_slice(compression, buffer)
set_response(slab_id, Some(buffer.len()), status, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
};
} else {
op_http_set_promise_complete::call(slab_id, status);
}
}
#[op2(async)]

View file

@ -115,7 +115,6 @@ deno_core::extension!(
http_next::op_http_set_promise_complete,
http_next::op_http_set_response_body_bytes,
http_next::op_http_set_response_body_resource,
http_next::op_http_set_response_body_stream,
http_next::op_http_set_response_body_text,
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,

View file

@ -1,5 +1,4 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
use bytes::Bytes;
use bytes::BytesMut;
use deno_core::error::bad_resource;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::futures::FutureExt;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::WriteOutcome;
use flate2::write::GzEncoder;
use http::HeaderMap;
use hyper1::body::Body;
@ -126,8 +119,8 @@ pub enum Compression {
pub enum ResponseStream {
/// A resource stream, piped in fast mode.
Resource(ResourceBodyAdapter),
/// A JS-backed stream, written in JS and transported via pipe.
V8Stream(tokio::sync::mpsc::Receiver<BufView>),
#[cfg(test)]
TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
#[derive(Default)]
@ -217,13 +210,6 @@ impl ResponseBytesInner {
}
}
pub fn from_v8(
compression: Compression,
rx: tokio::sync::mpsc::Receiver<BufView>,
) -> Self {
Self::from_stream(compression, ResponseStream::V8Stream(rx))
}
pub fn from_resource(
compression: Compression,
stm: Rc<dyn Resource>,
@ -235,12 +221,12 @@ impl ResponseBytesInner {
)
}
pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
pub fn from_bufview(compression: Compression, buf: BufView) -> Self {
match compression {
Compression::GZip => {
let mut writer =
GzEncoder::new(Vec::new(), flate2::Compression::fast());
writer.write_all(bytes).unwrap();
writer.write_all(&buf).unwrap();
Self::Bytes(BufView::from(writer.finish().unwrap()))
}
Compression::Brotli => {
@ -251,11 +237,11 @@ impl ResponseBytesInner {
// (~4MB)
let mut writer =
brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22);
writer.write_all(bytes).unwrap();
writer.write_all(&buf).unwrap();
writer.flush().unwrap();
Self::Bytes(BufView::from(writer.into_inner()))
}
_ => Self::Bytes(BufView::from(bytes.to_vec())),
_ => Self::Bytes(buf),
}
}
@ -368,14 +354,16 @@ impl PollFrame for ResponseStream {
) -> std::task::Poll<ResponseStreamResult> {
match &mut *self {
ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
#[cfg(test)]
ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx),
}
}
fn size_hint(&self) -> SizeHint {
match self {
ResponseStream::Resource(res) => res.size_hint(),
ResponseStream::V8Stream(res) => res.size_hint(),
#[cfg(test)]
ResponseStream::TestChannel(_) => SizeHint::default(),
}
}
}
@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter {
}
}
#[cfg(test)]
impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
fn poll_frame(
mut self: Pin<&mut Self>,
@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream {
}
}
/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
/// feed's hyper's HTTP response.
pub struct V8StreamHttpResponseBody(
AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
CancelHandle,
);
impl V8StreamHttpResponseBody {
pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
}
}
impl Resource for V8StreamHttpResponseBody {
fn name(&self) -> Cow<str> {
"responseBody".into()
}
fn write(
self: Rc<Self>,
buf: BufView,
) -> AsyncResult<deno_core::WriteOutcome> {
let cancel_handle = RcRef::map(&self, |this| &this.1);
Box::pin(
async move {
let nwritten = buf.len();
let res = RcRef::map(self, |this| &this.0).borrow().await;
if let Some(tx) = res.as_ref() {
tx.send(buf)
.await
.map_err(|_| bad_resource("failed to write"))?;
Ok(WriteOutcome::Full { nwritten })
} else {
Err(bad_resource("failed to write"))
}
}
.try_or_cancel(cancel_handle),
)
}
fn close(self: Rc<Self>) {
self.1.cancel();
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -892,7 +835,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
let underlying = ResponseStream::V8Stream(rx);
let underlying = ResponseStream::TestChannel(rx);
let mut resp = GZipResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {
@ -934,7 +877,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
let underlying = ResponseStream::V8Stream(rx);
let underlying = ResponseStream::TestChannel(rx);
let mut resp = BrotliResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {

View file

@ -1,4 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// deno-lint-ignore-file camelcase
// @ts-check
/// <reference path="../webidl/internal.d.ts" />
@ -7,7 +8,17 @@
/// <reference lib="esnext" />
const core = globalThis.Deno.core;
const ops = core.ops;
const internals = globalThis.__bootstrap.internals;
const {
op_arraybuffer_was_detached,
op_transfer_arraybuffer,
op_readable_stream_resource_allocate,
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
op_readable_stream_resource_close,
op_readable_stream_resource_await_close,
} = core.ensureFastOps();
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { structuredClone } from "ext:deno_web/02_structured_clone.js";
import {
@ -61,6 +72,7 @@ const {
SafeWeakMap,
// TODO(lucacasonato): add SharedArrayBuffer to primordials
// SharedArrayBufferPrototype,
String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
@ -218,7 +230,7 @@ function isDetachedBuffer(O) {
return false;
}
return ArrayBufferPrototypeGetByteLength(O) === 0 &&
ops.op_arraybuffer_was_detached(O);
op_arraybuffer_was_detached(O);
}
/**
@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) {
* @returns {ArrayBufferLike}
*/
function transferArrayBuffer(O) {
return ops.op_transfer_arraybuffer(O);
return op_transfer_arraybuffer(O);
}
/**
@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) {
return stream[_disturbed];
}
/**
* Create a new resource that wraps a ReadableStream. The resource will support
* read operations, and those read operations will be fed by the output of the
* ReadableStream source.
* @param {ReadableStream<Uint8Array>} stream
* @returns {number}
*/
function resourceForReadableStream(stream) {
const reader = acquireReadableStreamDefaultReader(stream);
// Allocate the resource
const rid = op_readable_stream_resource_allocate();
// Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
PromisePrototypeCatch(
PromisePrototypeThen(
op_readable_stream_resource_await_close(rid),
() => reader.cancel(),
),
() => {},
);
// The ops here look like op_write_all/op_close, but we're not actually writing to a
// real resource.
(async () => {
try {
// This allocation is freed in the finally block below, guaranteeing it won't leak
const sink = op_readable_stream_resource_get_sink(rid);
try {
while (true) {
let value;
try {
const read = await reader.read();
value = read.value;
if (read.done) {
break;
}
} catch (err) {
const message = err.message;
if (message) {
await op_readable_stream_resource_write_error(sink, err.message);
} else {
await op_readable_stream_resource_write_error(sink, String(err));
}
break;
}
// If the chunk has non-zero length, write it
if (value.length > 0) {
await op_readable_stream_resource_write_buf(sink, value);
}
}
} finally {
op_readable_stream_resource_close(sink);
}
} catch (err) {
// Something went terribly wrong with this stream -- log and continue
console.error("Unexpected internal error on stream", err);
}
})();
return rid;
}
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
// A finalization registry to clean up underlying resources that are GC'ed.
@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);
internals.resourceForReadableStream = resourceForReadableStream;
export {
// Non-Public
_state,
@ -6482,6 +6558,7 @@ export {
ReadableStreamPrototype,
readableStreamTee,
readableStreamThrowIfErrored,
resourceForReadableStream,
TransformStream,
TransformStreamDefaultController,
WritableStream,

View file

@ -16,9 +16,11 @@ path = "lib.rs"
[dependencies]
async-trait.workspace = true
base64-simd = "0.8"
bytes.workspace = true
deno_core.workspace = true
encoding_rs.workspace = true
flate2.workspace = true
futures.workspace = true
serde = "1.0.149"
tokio.workspace = true
uuid = { workspace = true, features = ["serde"] }

View file

@ -4,6 +4,7 @@ mod blob;
mod compression;
mod hr_timer_lock;
mod message_port;
mod stream_resource;
mod timers;
use deno_core::error::range_error;
@ -90,6 +91,12 @@ deno_core::extension!(deno_web,
op_cancel_handle,
op_sleep,
op_transfer_arraybuffer,
stream_resource::op_readable_stream_resource_allocate,
stream_resource::op_readable_stream_resource_get_sink,
stream_resource::op_readable_stream_resource_write_error,
stream_resource::op_readable_stream_resource_write_buf,
stream_resource::op_readable_stream_resource_close,
stream_resource::op_readable_stream_resource_await_close,
],
esm = [
"00_infra.js",

274
ext/web/stream_resource.rs Normal file
View file

@ -0,0 +1,274 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcLike;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use futures::stream::Peekable;
use futures::Stream;
use futures::StreamExt;
use std::borrow::Cow;
use std::cell::RefCell;
use std::ffi::c_void;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>;
// This indirection allows us to more easily integrate the fast streams work at a later date
#[repr(transparent)]
struct ChannelStreamAdapter<C>(C);
impl<C> Stream for ChannelStreamAdapter<C>
where
C: ChannelBytesRead,
{
type Item = Result<BufView, AnyError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.0.poll_recv(cx)
}
}
pub trait ChannelBytesRead: Unpin + 'static {
fn poll_recv(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<BufView, AnyError>>>;
}
impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> {
fn poll_recv(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<BufView, AnyError>>> {
self.poll_recv(cx)
}
}
#[allow(clippy::type_complexity)]
struct ReadableStreamResource {
reader: AsyncRefCell<
Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>,
>,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
}
impl ReadableStreamResource {
pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> {
RcRef::map(self, |s| &s.cancel_handle).clone()
}
async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
let cancel_handle = self.cancel_handle();
let peekable = RcRef::map(self, |this| &this.reader);
let mut peekable = peekable.borrow_mut().await;
match Pin::new(&mut *peekable)
.peek_mut()
.or_cancel(cancel_handle)
.await?
{
None => Ok(BufView::empty()),
// Take the actual error since we only have a reference to it
Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
Some(Ok(bytes)) => {
if bytes.len() <= limit {
// We can safely take the next item since we peeked it
return peekable.next().await.unwrap();
}
// The remainder of the bytes after we split it is still left in the peek buffer
let ret = bytes.split_to(limit);
Ok(ret)
}
}
}
}
impl Resource for ReadableStreamResource {
fn name(&self) -> Cow<str> {
Cow::Borrowed("readableStream")
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(ReadableStreamResource::read(self, limit))
}
}
// TODO(mmastrac): Move this to deno_core
#[derive(Clone, Debug, Default)]
pub struct CompletionHandle {
inner: Rc<RefCell<CompletionHandleInner>>,
}
#[derive(Debug, Default)]
struct CompletionHandleInner {
complete: bool,
success: bool,
waker: Option<Waker>,
}
impl CompletionHandle {
pub fn complete(&self, success: bool) {
let mut mut_self = self.inner.borrow_mut();
mut_self.complete = true;
mut_self.success = success;
if let Some(waker) = mut_self.waker.take() {
drop(mut_self);
waker.wake();
}
}
}
impl Future for CompletionHandle {
type Output = bool;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut mut_self = self.inner.borrow_mut();
if mut_self.complete {
return std::task::Poll::Ready(mut_self.success);
}
mut_self.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
fn sender_closed() -> Error {
type_error("sender closed")
}
/// Allocate a resource that wraps a ReadableStream.
#[op2(fast)]
#[smi]
pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let tx = RefCell::new(Some(tx));
let completion = CompletionHandle::default();
let tx = Box::new(tx);
let resource = ReadableStreamResource {
cancel_handle: Default::default(),
reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()),
data: ReadableStreamResourceData {
tx: Box::into_raw(tx),
completion,
},
};
state.resource_table.add(resource)
}
#[op2(fast)]
pub fn op_readable_stream_resource_get_sink(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> *const c_void {
let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid) else {
return std::ptr::null();
};
resource.data.tx as _
}
fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> {
// SAFETY: We know this is a valid v8::External
unsafe {
(sender as *const SenderCell)
.as_ref()
.and_then(|r| r.borrow_mut().as_ref().cloned())
}
}
fn drop_sender(sender: *const c_void) {
// SAFETY: We know this is a valid v8::External
unsafe {
assert!(!sender.is_null());
_ = Box::from_raw(sender as *mut SenderCell);
}
}
#[op2(async)]
pub fn op_readable_stream_resource_write_buf(
sender: *const c_void,
#[buffer] buffer: JsBuffer,
) -> impl Future<Output = Result<(), Error>> {
let sender = get_sender(sender);
async move {
let sender = sender.ok_or_else(sender_closed)?;
sender
.send(Ok(buffer.into()))
.await
.map_err(|_| sender_closed())?;
Ok(())
}
}
#[op2(async)]
pub fn op_readable_stream_resource_write_error(
sender: *const c_void,
#[string] error: String,
) -> impl Future<Output = Result<(), Error>> {
let sender = get_sender(sender);
async move {
let sender = sender.ok_or_else(sender_closed)?;
sender
.send(Err(type_error(Cow::Owned(error))))
.await
.map_err(|_| sender_closed())?;
Ok(())
}
}
#[op2(fast)]
#[smi]
pub fn op_readable_stream_resource_close(sender: *const c_void) {
drop_sender(sender);
}
#[op2(async)]
pub fn op_readable_stream_resource_await_close(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> impl Future<Output = ()> {
let completion = state
.resource_table
.get::<ReadableStreamResource>(rid)
.ok()
.map(|r| r.data.completion.clone());
async move {
if let Some(completion) = completion {
completion.await;
}
}
}
struct ReadableStreamResourceData {
tx: *const SenderCell,
completion: CompletionHandle,
}
impl Drop for ReadableStreamResourceData {
fn drop(&mut self) {
self.completion.complete(true);
}
}