1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

feat(ext/web): use readableStreamDefaultReaderRead in resourceForReadableStream (#20622)

We can go one level down in abstraction and avoid using the public
`ReadableStream` APIs.

This patch gives a ~5% perf boost on small ReadableStream:

```
Running 10s test @ http://localhost:8080/
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   148.32us  108.95us   3.88ms   95.71%
    Req/Sec    33.24k     2.68k   37.94k    73.76%
  668188 requests in 10.10s, 77.74MB read
Requests/sec:  66162.91
Transfer/sec:      7.70MB
```

main:

```
Running 10s test @ http://localhost:8080/
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   150.23us   67.61us   4.39ms   94.80%
    Req/Sec    31.81k     1.55k   35.56k    83.17%
  639078 requests in 10.10s, 74.36MB read
Requests/sec:  63273.72
Transfer/sec:      7.36MB
```
This commit is contained in:
Matt Mastracci 2023-09-23 08:55:28 -06:00 committed by GitHub
parent b1ca67ac01
commit 06297d952d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 212 additions and 53 deletions

View file

@ -60,6 +60,28 @@ function longStream() {
}).pipeThrough(new TextEncoderStream());
}
// Long stream with Lorem Ipsum text.
// deno-lint-ignore no-explicit-any
function longAsyncStream(completion?: Deferred<any>) {
  // Tracks the in-flight setTimeout so cancel() can clear it.
  let pendingTimer: number | undefined = undefined;
  return new ReadableStream({
    async start(controller) {
      // Enqueue 100 LOREM chunks, yielding to a 1ms timer between each.
      let remaining = 100;
      while (remaining > 0) {
        await new Promise((resolve) => pendingTimer = setTimeout(resolve, 1));
        pendingTimer = undefined;
        controller.enqueue(LOREM);
        remaining--;
      }
      controller.close();
    },
    cancel(reason) {
      // Surface the cancellation reason to the test and stop any pending timer.
      completion?.resolve(reason);
      if (pendingTimer !== undefined) {
        clearTimeout(pendingTimer);
      }
    },
  }).pipeThrough(new TextEncoderStream());
}
// Empty stream, closes either immediately or on a call to pull.
function emptyStream(onPull: boolean) {
return new ReadableStream({
@ -104,6 +126,20 @@ function emptyChunkStream() {
});
}
// Try to blow up any recursive reads. Note that because of the use of Array.shift in
// ReadableStream, this might not actually be able to complete with larger values of
// length.
// Builds a stream of `length` one-byte packets, all enqueued eagerly in
// start(), then closed.
function veryLongTinyPacketStream(length: number) {
  return new ReadableStream({
    start(controller) {
      let remaining = length;
      while (remaining > 0) {
        controller.enqueue(new Uint8Array([1]));
        remaining--;
      }
      controller.close();
    },
  });
}
// Creates a stream with the given number of packets, a configurable delay between packets, and a final
// action (either "Throw" or "Close").
function makeStreamWithCount(
@ -179,6 +215,14 @@ Deno.test(async function readableStreamCloseWithoutRead() {
assertEquals(await cancel, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead2() {
  // Closing the resource before any read should cancel the async stream.
  const cancellation = deferred();
  const resourceId = resourceForReadableStream(longAsyncStream(cancellation));
  core.ops.op_close(resourceId);
  assertEquals(await cancellation, "resource closed");
});
Deno.test(async function readableStreamPartial() {
const rid = resourceForReadableStream(helloWorldStream());
const buffer = new Uint8Array(5);
@ -197,6 +241,20 @@ Deno.test(async function readableStreamLongReadAll() {
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLongAsyncReadAll() {
  // Reading the whole async stream should yield exactly 100 LOREM chunks.
  const resourceId = resourceForReadableStream(longAsyncStream());
  const bytes = await core.ops.op_read_all(resourceId);
  assertEquals(bytes.length, 100 * LOREM.length);
  core.ops.op_close(resourceId);
});
Deno.test(async function readableStreamVeryLongReadAll() {
  // 10000 one-byte packets should read back as exactly 10000 bytes.
  const resourceId = resourceForReadableStream(veryLongTinyPacketStream(10000));
  const bytes = await core.ops.op_read_all(resourceId);
  assertEquals(bytes.length, 10000);
  core.ops.op_close(resourceId);
});
Deno.test(async function readableStreamLongByPiece() {
const rid = resourceForReadableStream(longStream());
let total = 0;

View file

@ -15,6 +15,7 @@ const {
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
op_readable_stream_resource_write_sync,
op_readable_stream_resource_close,
op_readable_stream_resource_await_close,
} = core.ensureFastOps();
@ -704,6 +705,121 @@ function isReadableStreamDisturbed(stream) {
return stream[_disturbed];
}
/**
 * Normalize an error into a plain string message. Strings pass through
 * unchanged; otherwise the error's `message` property is used, coerced to a
 * string when it is not one already.
 * @param {Error | string | undefined} error
 * @returns {string}
 */
function extractStringErrorFromError(error) {
  if (typeof error === "string") {
    return error;
  }
  const message = error?.message;
  return typeof message === "string" ? message : String(message);
}
// We don't want to leak resources associated with our sink, even if something bad happens
// and close() is never called: when a sink object is garbage-collected, the
// registry's finalizer closes the underlying channel via its external pointer.
const READABLE_STREAM_SOURCE_REGISTRY = new SafeFinalizationRegistry(
  (external) => {
    op_readable_stream_resource_close(external);
  },
);
/**
 * Wraps the external sink pointer for a stream resource and guarantees the
 * underlying channel is closed exactly once: either explicitly via close(),
 * or by the finalization registry if this object is garbage-collected first.
 */
class ResourceStreamResourceSink {
  external;
  constructor(external) {
    this.external = external;
    READABLE_STREAM_SOURCE_REGISTRY.register(this, external, this);
  }
  close() {
    const external = this.external;
    if (external !== undefined) {
      // Closing explicitly, so the finalizer no longer needs to run.
      READABLE_STREAM_SOURCE_REGISTRY.unregister(this);
      op_readable_stream_resource_close(external);
      this.external = undefined;
    }
  }
}
/**
 * Push one chunk from the stream into the resource's sink, then continue the
 * read loop. Dispatches on the three synchronous-write outcomes: channel
 * closed, chunk written, or channel full (fall back to an async write).
 * @param {ReadableStreamDefaultReader<Uint8Array>} reader
 * @param {any} sink
 * @param {Uint8Array} chunk
 */
function readableStreamWriteChunkFn(reader, sink, chunk) {
  if (chunk.length == 0) {
    // Nothing to write; just continue reading.
    readableStreamReadFn(reader, sink);
    return;
  }
  switch (op_readable_stream_resource_write_sync(sink.external, chunk)) {
    case 0:
      // Channel closed: cancel the stream and release the sink.
      reader.cancel("resource closed");
      sink.close();
      break;
    case 1:
      // Written synchronously: continue the read loop.
      readableStreamReadFn(reader, sink);
      break;
    case 2:
      // Channel full: await an async buffered write, then return to the
      // synchronous loop (or tear down if the channel closed meanwhile).
      (async () => {
        const written = await op_readable_stream_resource_write_buf(
          sink.external,
          chunk,
        );
        if (written) {
          readableStreamReadFn(reader, sink);
        } else {
          reader.cancel("resource closed");
          sink.close();
        }
      })();
      break;
  }
}
/**
 * Reads one chunk from `reader` via the internal
 * readableStreamDefaultReaderRead hook (bypassing the public reader API) and
 * forwards it to the resource sink, continuing the loop through
 * readableStreamWriteChunkFn.
 * @param {ReadableStreamDefaultReader<Uint8Array>} reader
 * @param {any} sink
 */
function readableStreamReadFn(reader, sink) {
  // The ops here look like op_write_all/op_close, but we're not actually writing to a
  // real resource.
  // `readableStreamDefaultReaderRead` may invoke `chunkSteps` synchronously
  // (re-entrantly) when data is already queued. A chunk delivered that way is
  // stashed in `gotChunk` and processed only after the read call returns, so
  // chunk handling happens outside the reader's internal call frame.
  let reentrant = true;
  let gotChunk = undefined;
  readableStreamDefaultReaderRead(reader, {
    chunkSteps(chunk) {
      // If the chunk has non-zero length, write it
      if (reentrant) {
        gotChunk = chunk;
      } else {
        readableStreamWriteChunkFn(reader, sink, chunk);
      }
    },
    closeSteps() {
      // Stream finished cleanly: release the sink (and its finalizer entry).
      sink.close();
    },
    errorSteps(error) {
      const success = op_readable_stream_resource_write_error(
        sink.external,
        extractStringErrorFromError(error),
      );
      // We don't cancel the reader if there was an error reading. We'll let the downstream
      // consumer close the resource after it receives the error.
      if (!success) {
        reader.cancel("resource closed");
      }
      sink.close();
    },
  });
  reentrant = false;
  if (gotChunk) {
    readableStreamWriteChunkFn(reader, sink, gotChunk);
  }
}
/**
* Create a new resource that wraps a ReadableStream. The resource will support
* read operations, and those read operations will be fed by the output of the
@ -726,51 +842,14 @@ function resourceForReadableStream(stream) {
() => {},
);
// The ops here look like op_write_all/op_close, but we're not actually writing to a
// real resource.
(async () => {
try {
// This allocation is freed in the finally block below, guaranteeing it won't leak
const sink = op_readable_stream_resource_get_sink(rid);
try {
while (true) {
let value;
try {
const read = await reader.read();
value = read.value;
if (read.done) {
break;
}
} catch (err) {
const message = err?.message;
const success = (message && (typeof message == "string"))
? await op_readable_stream_resource_write_error(sink, message)
: await op_readable_stream_resource_write_error(
sink,
String(err),
// This allocation is freed when readableStreamReadFn is completed
const sink = new ResourceStreamResourceSink(
op_readable_stream_resource_get_sink(rid),
);
// We don't cancel the reader if there was an error reading. We'll let the downstream
// consumer close the resource after it receives the error.
if (!success) {
reader.cancel("resource closed");
}
break;
}
// If the chunk has non-zero length, write it
if (value.length > 0) {
if (!await op_readable_stream_resource_write_buf(sink, value)) {
reader.cancel("resource closed");
}
}
}
} finally {
op_readable_stream_resource_close(sink);
}
} catch (err) {
// Something went terribly wrong with this stream -- log and continue
console.error("Unexpected internal error on stream", err);
}
})();
// Trigger the first read
readableStreamReadFn(reader, sink);
return rid;
}

View file

@ -95,6 +95,7 @@ deno_core::extension!(deno_web,
stream_resource::op_readable_stream_resource_get_sink,
stream_resource::op_readable_stream_resource_write_error,
stream_resource::op_readable_stream_resource_write_buf,
stream_resource::op_readable_stream_resource_write_sync,
stream_resource::op_readable_stream_resource_close,
stream_resource::op_readable_stream_resource_await_close,
],

View file

@ -313,6 +313,10 @@ impl BoundedBufferChannel {
self.inner().write_error(error)
}
/// Returns whether the channel currently has capacity for a synchronous
/// write (delegates to the inner buffer's `can_write`).
pub fn can_write(&self) -> bool {
  self.inner().can_write()
}
/// Polls until data is available to read, delegating to the inner buffer's
/// `poll_read_ready` (which registers `cx`'s waker when pending).
pub fn poll_read_ready(&self, cx: &mut Context) -> Poll<()> {
  self.inner().poll_read_ready(cx)
}
@ -471,18 +475,35 @@ pub fn op_readable_stream_resource_write_buf(
}
}
#[op2(async)]
/// Write to the channel synchronously, returning 0 if the channel was closed, 1 if we wrote
/// successfully, 2 if the channel was full and we need to block.
#[op2]
pub fn op_readable_stream_resource_write_sync(
  sender: *const c_void,
  #[buffer] buffer: JsBuffer,
) -> u32 {
  let sender = get_sender(sender);
  if !sender.can_write() {
    // Channel is at capacity; the caller must fall back to the async path.
    return 2;
  }
  if sender.closed() {
    return 0;
  }
  // `can_write` reported capacity, so this synchronous write succeeds.
  sender.write(buffer.into_parts()).unwrap();
  1
}
#[op2(fast)]
pub fn op_readable_stream_resource_write_error(
sender: *const c_void,
#[string] error: String,
) -> impl Future<Output = bool> {
) -> bool {
let sender = get_sender(sender);
async move {
// We can always write an error, no polling required
// TODO(mmastrac): we can remove async from this method
sender.write_error(type_error(Cow::Owned(error)));
!sender.closed()
}
}
#[op2(fast)]