diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index 3460e8ecd3..e496b5c4f3 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -60,6 +60,28 @@ function longStream() { }).pipeThrough(new TextEncoderStream()); } +// Long stream with Lorem Ipsum text. +// deno-lint-ignore no-explicit-any +function longAsyncStream(completion?: Deferred) { + let currentTimeout: number | undefined = undefined; + return new ReadableStream({ + async start(controller) { + for (let i = 0; i < 100; i++) { + await new Promise((r) => currentTimeout = setTimeout(r, 1)); + currentTimeout = undefined; + controller.enqueue(LOREM); + } + controller.close(); + }, + cancel(reason) { + completion?.resolve(reason); + if (currentTimeout !== undefined) { + clearTimeout(currentTimeout); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + // Empty stream, closes either immediately or on a call to pull. function emptyStream(onPull: boolean) { return new ReadableStream({ @@ -104,6 +126,20 @@ function emptyChunkStream() { }); } +// Try to blow up any recursive reads. Note that because of the use of Array.shift in +// ReadableStream, this might not actually be able to complete with larger values of +// length. +function veryLongTinyPacketStream(length: number) { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < length; i++) { + controller.enqueue(new Uint8Array([1])); + } + controller.close(); + }, + }); +} + // Creates a stream with the given number of packets, a configurable delay between packets, and a final // action (either "Throw" or "Close"). function makeStreamWithCount( @@ -179,6 +215,14 @@ Deno.test(async function readableStreamCloseWithoutRead() { assertEquals(await cancel, "resource closed"); }); +// Close the stream without reading anything +Deno.test(async function readableStreamCloseWithoutRead2() { + const cancel = deferred(); + const rid = resourceForReadableStream(longAsyncStream(cancel)); + core.ops.op_close(rid); + assertEquals(await cancel, "resource closed"); +}); + Deno.test(async function readableStreamPartial() { const rid = resourceForReadableStream(helloWorldStream()); const buffer = new Uint8Array(5); @@ -197,6 +241,20 @@ Deno.test(async function readableStreamLongReadAll() { core.ops.op_close(rid); }); +Deno.test(async function readableStreamLongAsyncReadAll() { + const rid = resourceForReadableStream(longAsyncStream()); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer.length, LOREM.length * 100); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamVeryLongReadAll() { + const rid = resourceForReadableStream(veryLongTinyPacketStream(10000)); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer.length, 10000); + core.ops.op_close(rid); +}); + Deno.test(async function readableStreamLongByPiece() { const rid = resourceForReadableStream(longStream()); let total = 0; diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 6d2a552320..7f43d3fc27 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -15,6 +15,7 @@ const { op_readable_stream_resource_get_sink, op_readable_stream_resource_write_error, op_readable_stream_resource_write_buf, + op_readable_stream_resource_write_sync, op_readable_stream_resource_close, op_readable_stream_resource_await_close, } = core.ensureFastOps(); @@ -704,6 +705,121 @@ function isReadableStreamDisturbed(stream) { return stream[_disturbed]; } +/** + * @param {Error | string | undefined} error + * @returns {string} + */ +function extractStringErrorFromError(error) { + if (typeof error == "string") { + return error; + } + const message = error?.message; + const stringMessage = typeof message == "string" ? message : String(message); + return stringMessage; +} + +// We don't want to leak resources associated with our sink, even if something bad happens +const READABLE_STREAM_SOURCE_REGISTRY = new SafeFinalizationRegistry( + (external) => { + op_readable_stream_resource_close(external); + }, +); + +class ResourceStreamResourceSink { + external; + constructor(external) { + this.external = external; + READABLE_STREAM_SOURCE_REGISTRY.register(this, external, this); + } + close() { + if (this.external === undefined) { + return; + } + READABLE_STREAM_SOURCE_REGISTRY.unregister(this); + op_readable_stream_resource_close(this.external); + this.external = undefined; + } +} + +/** + * @param {ReadableStreamDefaultReader} reader + * @param {any} sink + * @param {Uint8Array} chunk + */ +function readableStreamWriteChunkFn(reader, sink, chunk) { + // Empty chunk. Re-read. + if (chunk.length == 0) { + readableStreamReadFn(reader, sink); + return; + } + + const res = op_readable_stream_resource_write_sync(sink.external, chunk); + if (res == 0) { + // Closed + reader.cancel("resource closed"); + sink.close(); + } else if (res == 1) { + // Successfully written (synchronous). Re-read. + readableStreamReadFn(reader, sink); + } else if (res == 2) { + // Full. If the channel is full, we perform an async await until we can write, and then return + // to a synchronous loop. + (async () => { + if ( + await op_readable_stream_resource_write_buf( + sink.external, + chunk, + ) + ) { + readableStreamReadFn(reader, sink); + } else { + reader.cancel("resource closed"); + sink.close(); + } + })(); + } +} + +/** + * @param {ReadableStreamDefaultReader} reader + * @param {any} sink + */ +function readableStreamReadFn(reader, sink) { + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + let reentrant = true; + let gotChunk = undefined; + readableStreamDefaultReaderRead(reader, { + chunkSteps(chunk) { + // If the chunk has non-zero length, write it + if (reentrant) { + gotChunk = chunk; + } else { + readableStreamWriteChunkFn(reader, sink, chunk); + } + }, + closeSteps() { + sink.close(); + }, + errorSteps(error) { + const success = op_readable_stream_resource_write_error( + sink.external, + extractStringErrorFromError(error), + ); + // We don't cancel the reader if there was an error reading. We'll let the downstream + // consumer close the resource after it receives the error. + if (!success) { + reader.cancel("resource closed"); + } + sink.close(); + }, + }); + reentrant = false; + if (gotChunk) { + readableStreamWriteChunkFn(reader, sink, gotChunk); + } +} + /** * Create a new resource that wraps a ReadableStream. The resource will support * read operations, and those read operations will be fed by the output of the @@ -726,51 +842,14 @@ function resourceForReadableStream(stream) { () => {}, ); - // The ops here look like op_write_all/op_close, but we're not actually writing to a - // real resource. - (async () => { - try { - // This allocation is freed in the finally block below, guaranteeing it won't leak - const sink = op_readable_stream_resource_get_sink(rid); - try { - while (true) { - let value; - try { - const read = await reader.read(); - value = read.value; - if (read.done) { - break; - } - } catch (err) { - const message = err?.message; - const success = (message && (typeof message == "string")) - ? await op_readable_stream_resource_write_error(sink, message) - : await op_readable_stream_resource_write_error( - sink, - String(err), - ); - // We don't cancel the reader if there was an error reading. We'll let the downstream - // consumer close the resource after it receives the error. - if (!success) { - reader.cancel("resource closed"); - } - break; - } - // If the chunk has non-zero length, write it - if (value.length > 0) { - if (!await op_readable_stream_resource_write_buf(sink, value)) { - reader.cancel("resource closed"); - } - } - } - } finally { - op_readable_stream_resource_close(sink); - } - } catch (err) { - // Something went terribly wrong with this stream -- log and continue - console.error("Unexpected internal error on stream", err); - } - })(); + // This allocation is freed when readableStreamReadFn is completed + const sink = new ResourceStreamResourceSink( + op_readable_stream_resource_get_sink(rid), + ); + + // Trigger the first read + readableStreamReadFn(reader, sink); + return rid; } diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 88937efb2c..ebdb6b39e1 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -95,6 +95,7 @@ deno_core::extension!(deno_web, stream_resource::op_readable_stream_resource_get_sink, stream_resource::op_readable_stream_resource_write_error, stream_resource::op_readable_stream_resource_write_buf, + stream_resource::op_readable_stream_resource_write_sync, stream_resource::op_readable_stream_resource_close, stream_resource::op_readable_stream_resource_await_close, ], diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 1ee6ff963a..e19954fdca 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -313,6 +313,10 @@ impl BoundedBufferChannel { self.inner().write_error(error) } + pub fn can_write(&self) -> bool { + self.inner().can_write() + } + pub fn poll_read_ready(&self, cx: &mut Context) -> Poll<()> { self.inner().poll_read_ready(cx) } @@ -471,18 +475,35 @@ pub fn op_readable_stream_resource_write_buf( } } -#[op2(async)] +/// Write to the channel synchronously, returning 0 if the channel was closed, 1 if we wrote +/// successfully, 2 if the channel was full and we need to block. +#[op2] +pub fn op_readable_stream_resource_write_sync( + sender: *const c_void, + #[buffer] buffer: JsBuffer, +) -> u32 { + let sender = get_sender(sender); + if sender.can_write() { + if sender.closed() { + 0 + } else { + sender.write(buffer.into_parts()).unwrap(); + 1 + } + } else { + 2 + } +} + +#[op2(fast)] pub fn op_readable_stream_resource_write_error( sender: *const c_void, #[string] error: String, -) -> impl Future { +) -> bool { let sender = get_sender(sender); - async move { - // We can always write an error, no polling required - // TODO(mmastrac): we can remove async from this method - sender.write_error(type_error(Cow::Owned(error))); - !sender.closed() - } + // We can always write an error, no polling required + sender.write_error(type_error(Cow::Owned(error))); + !sender.closed() } #[op2(fast)]