1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-01 03:54:06 -05:00
denoland-deno/cli/tests/unit/streams_test.ts

479 lines
14 KiB
TypeScript
Raw Normal View History

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertEquals, fail } from "./test_util.ts";
const {
core,
resourceForReadableStream,
// @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
} = Deno[Deno.internal];
const LOREM =
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";
// Hello world, with optional close
function helloWorldStream(
close?: boolean,
cancelResolve?: (value: unknown) => void,
) {
return new ReadableStream({
start(controller) {
controller.enqueue("hello, world");
if (close == true) {
controller.close();
}
},
cancel(reason) {
if (cancelResolve != undefined) {
cancelResolve(reason);
}
},
}).pipeThrough(new TextEncoderStream());
}
// Hello world, with optional close
function errorStream(type: "string" | "controller" | "TypeError") {
return new ReadableStream({
start(controller) {
controller.enqueue("hello, world");
},
pull(controller) {
if (type == "string") {
throw "Uh oh (string)!";
}
if (type == "TypeError") {
throw TypeError("Uh oh (TypeError)!");
}
controller.error("Uh oh (controller)!");
},
}).pipeThrough(new TextEncoderStream());
}
// Long stream with Lorem Ipsum text.
function longStream() {
return new ReadableStream({
start(controller) {
for (let i = 0; i < 4; i++) {
setTimeout(() => {
controller.enqueue(LOREM);
if (i == 3) {
controller.close();
}
}, i * 100);
}
},
}).pipeThrough(new TextEncoderStream());
}
// Long stream with Lorem Ipsum text.
function longAsyncStream(cancelResolve?: (value: unknown) => void) {
let currentTimeout: number | undefined = undefined;
return new ReadableStream({
async start(controller) {
for (let i = 0; i < 100; i++) {
await new Promise((r) => currentTimeout = setTimeout(r, 1));
currentTimeout = undefined;
controller.enqueue(LOREM);
}
controller.close();
},
cancel(reason) {
if (cancelResolve != undefined) {
cancelResolve(reason);
}
if (currentTimeout !== undefined) {
clearTimeout(currentTimeout);
}
},
}).pipeThrough(new TextEncoderStream());
}
// Empty stream, closes either immediately or on a call to pull.
function emptyStream(onPull: boolean) {
return new ReadableStream({
start(controller) {
if (!onPull) {
controller.close();
}
},
pull(controller) {
if (onPull) {
controller.close();
}
},
}).pipeThrough(new TextEncoderStream());
}
function largePacketStream(packetSize: number, count: number) {
return new ReadableStream({
pull(controller) {
if (count-- > 0) {
const buffer = new Uint8Array(packetSize);
for (let i = 0; i < 256; i++) {
buffer[i * (packetSize / 256)] = i;
}
controller.enqueue(buffer);
} else {
controller.close();
}
},
});
}
// Include an empty chunk
function emptyChunkStream() {
return new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1]));
controller.enqueue(new Uint8Array([]));
controller.enqueue(new Uint8Array([2]));
controller.close();
},
});
}
perf(ext/streams): optimize streams (#20649) This PR introduces several optimizations to streams ### Highlights: - `ReadableStream` constructor: +20% iter/s. - `WritableStream` constructor: +50% iter/s. - `TransformStream` constructor: +30% iter/s. - `ReadableStream` iterator (both 2 and 20 chunks): +42% and +25% iter/s. - `ReadableByteStream` iterator (both 2 and 20 chunks): +39% and +20% iter/s. ### Benchmarks **main** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 294.52 ns/iter 3,395,392.9 (277.92 ns … 618.26 ns) 292.66 ns 353.87 ns 618.26 ns WritableStream constructor 235.51 ns/iter 4,246,065.3 (213.04 ns … 306.35 ns) 236.77 ns 279.08 ns 281.32 ns TransformStream constructor 672.52 ns/iter 1,486,938.7 (652.15 ns … 880.74 ns) 670.11 ns 880.74 ns 880.74 ns ReadableStream - iterator (2 chunks) 10.44 µs/iter 95,757.9 (8.97 µs … 830.91 µs) 10.22 µs 14.74 µs 18.93 µs ReadableStream - iterator (20 chunks) 21.93 µs/iter 45,593.4 (18.8 µs … 864.97 µs) 20.57 µs 57.15 µs 137.16 µs ReadableStream - reader (2 chunks) 7.09 µs/iter 140,987.2 (7.03 µs … 7.18 µs) 7.13 µs 7.18 µs 7.18 µs ReadableStream - reader (20 chunks) 18.41 µs/iter 54,324.2 (15.7 µs … 252.7 µs) 17.14 µs 68.88 µs 94.08 µs ReadableByteStream - iterator (2 chunks) 11.06 µs/iter 90,375.1 (9.75 µs … 404.69 µs) 10.88 µs 16.6 µs 29.69 µs ReadableByteStream - iterator (20 chunks) 26.71 µs/iter 37,435.0 (22.98 µs … 508.34 µs) 25.25 µs 85.28 µs 155.65 µs ReadableByteStream - reader (2 chunks) 7.99 µs/iter 125,131.1 (7.92 µs … 8.13 µs) 8.01 µs 8.13 µs 8.13 µs ReadableByteStream - reader (20 chunks) 23.46 µs/iter 42,618.5 (20.28 µs … 414.66 µs) 21.94 µs 90.52 µs 147.38 µs ``` **this PR** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 235.48 ns/iter 4,246,584.3 (223.12 ns … 504.65 ns) 234.3 ns 290.84 ns 311.12 ns WritableStream constructor 156.31 ns/iter 6,397,537.3 (148.54 ns … 211.13 ns) 157.49 ns 199.82 ns 208.23 ns TransformStream constructor 471.29 ns/iter 2,121,815.3 (452.53 ns … 791.41 ns) 468.62 ns 540.36 ns 791.41 ns ReadableStream - iterator (2 chunks) 7.32 µs/iter 136,705.4 (6.35 µs … 639.97 µs) 7.1 µs 12.12 µs 20.98 µs ReadableStream - iterator (20 chunks) 17.48 µs/iter 57,195.1 (14.48 µs … 289.06 µs) 16.06 µs 76.98 µs 114.61 µs ReadableStream - reader (2 chunks) 6.86 µs/iter 145,847.9 (6.8 µs … 6.97 µs) 6.88 µs 6.97 µs 6.97 µs ReadableStream - reader (20 chunks) 16.88 µs/iter 59,227.7 (14.04 µs … 311.29 µs) 15.39 µs 74.95 µs 97.45 µs ReadableByteStream - iterator (2 chunks) 7.94 µs/iter 125,881.2 (6.86 µs … 811.16 µs) 7.69 µs 11.43 µs 16.6 µs ReadableByteStream - iterator (20 chunks) 22.23 µs/iter 44,978.2 (18.98 µs … 590.11 µs) 20.73 µs 45.13 µs 159.8 µs ReadableByteStream - reader (2 chunks) 7.4 µs/iter 135,206.9 (7.36 µs … 7.42 µs) 7.4 µs 7.42 µs 7.42 µs ReadableByteStream - reader (20 chunks) 21.03 µs/iter 47,555.6 (17.75 µs … 357.66 µs) 19.52 µs 98.69 µs 146.5 µs ``` --------- Co-authored-by: Luca Casonato <hello@lcas.dev>
2023-10-13 08:30:09 -04:00
// Try to blow up any recursive reads.
function veryLongTinyPacketStream(length: number) {
return new ReadableStream({
start(controller) {
for (let i = 0; i < length; i++) {
controller.enqueue(new Uint8Array([1]));
}
controller.close();
},
});
}
// Creates a stream with the given number of packets, a configurable delay between packets, and a final
// action (either "Throw" or "Close").
function makeStreamWithCount(
count: number,
delay: number,
action: "Throw" | "Close",
): ReadableStream {
function doAction(controller: ReadableStreamDefaultController, i: number) {
if (i == count) {
if (action == "Throw") {
controller.error(new Error("Expected error!"));
} else {
controller.close();
}
} else {
controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i));
if (delay == 0) {
doAction(controller, i + 1);
} else {
setTimeout(() => doAction(controller, i + 1), delay);
}
}
}
return new ReadableStream({
start(controller) {
if (delay == 0) {
doAction(controller, 0);
} else {
setTimeout(() => doAction(controller, 0), delay);
}
},
}).pipeThrough(new TextEncoderStream());
}
// Normal stream operation
Deno.test(async function readableStream() {
const rid = resourceForReadableStream(helloWorldStream());
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
});
// Close the stream after reading everything
Deno.test(async function readableStreamClose() {
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(
helloWorldStream(false, cancel.resolve),
);
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading everything
Deno.test(async function readableStreamClosePartialRead() {
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(
helloWorldStream(false, cancel.resolve),
);
const buffer = new Uint8Array(5);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
core.ops.op_close(rid);
assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead() {
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(
helloWorldStream(false, cancel.resolve),
);
core.ops.op_close(rid);
assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead2() {
const cancel = Promise.withResolvers();
const rid = resourceForReadableStream(longAsyncStream(cancel.resolve));
core.ops.op_close(rid);
assertEquals(await cancel.promise, "resource closed");
});
Deno.test(async function readableStreamPartial() {
const rid = resourceForReadableStream(helloWorldStream());
const buffer = new Uint8Array(5);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
const buffer2 = new Uint8Array(1024);
const nread2 = await core.ops.op_read(rid, buffer2);
assertEquals(nread2, 7);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLongReadAll() {
const rid = resourceForReadableStream(longStream());
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer.length, LOREM.length * 4);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLongAsyncReadAll() {
const rid = resourceForReadableStream(longAsyncStream());
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer.length, LOREM.length * 100);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamVeryLongReadAll() {
perf(ext/streams): optimize streams (#20649) This PR introduces several optimizations to streams ### Highlights: - `ReadableStream` constructor: +20% iter/s. - `WritableStream` constructor: +50% iter/s. - `TransformStream` constructor: +30% iter/s. - `ReadableStream` iterator (both 2 and 20 chunks): +42% and +25% iter/s. - `ReadableByteStream` iterator (both 2 and 20 chunks): +39% and +20% iter/s. ### Benchmarks **main** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 294.52 ns/iter 3,395,392.9 (277.92 ns … 618.26 ns) 292.66 ns 353.87 ns 618.26 ns WritableStream constructor 235.51 ns/iter 4,246,065.3 (213.04 ns … 306.35 ns) 236.77 ns 279.08 ns 281.32 ns TransformStream constructor 672.52 ns/iter 1,486,938.7 (652.15 ns … 880.74 ns) 670.11 ns 880.74 ns 880.74 ns ReadableStream - iterator (2 chunks) 10.44 µs/iter 95,757.9 (8.97 µs … 830.91 µs) 10.22 µs 14.74 µs 18.93 µs ReadableStream - iterator (20 chunks) 21.93 µs/iter 45,593.4 (18.8 µs … 864.97 µs) 20.57 µs 57.15 µs 137.16 µs ReadableStream - reader (2 chunks) 7.09 µs/iter 140,987.2 (7.03 µs … 7.18 µs) 7.13 µs 7.18 µs 7.18 µs ReadableStream - reader (20 chunks) 18.41 µs/iter 54,324.2 (15.7 µs … 252.7 µs) 17.14 µs 68.88 µs 94.08 µs ReadableByteStream - iterator (2 chunks) 11.06 µs/iter 90,375.1 (9.75 µs … 404.69 µs) 10.88 µs 16.6 µs 29.69 µs ReadableByteStream - iterator (20 chunks) 26.71 µs/iter 37,435.0 (22.98 µs … 508.34 µs) 25.25 µs 85.28 µs 155.65 µs ReadableByteStream - reader (2 chunks) 7.99 µs/iter 125,131.1 (7.92 µs … 8.13 µs) 8.01 µs 8.13 µs 8.13 µs ReadableByteStream - reader (20 chunks) 23.46 µs/iter 42,618.5 (20.28 µs … 414.66 µs) 21.94 µs 90.52 µs 147.38 µs ``` **this PR** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 235.48 ns/iter 4,246,584.3 (223.12 ns … 504.65 ns) 234.3 ns 290.84 ns 311.12 ns WritableStream constructor 156.31 ns/iter 6,397,537.3 (148.54 ns … 211.13 ns) 157.49 ns 199.82 ns 208.23 ns TransformStream constructor 471.29 ns/iter 2,121,815.3 (452.53 ns … 791.41 ns) 468.62 ns 540.36 ns 791.41 ns ReadableStream - iterator (2 chunks) 7.32 µs/iter 136,705.4 (6.35 µs … 639.97 µs) 7.1 µs 12.12 µs 20.98 µs ReadableStream - iterator (20 chunks) 17.48 µs/iter 57,195.1 (14.48 µs … 289.06 µs) 16.06 µs 76.98 µs 114.61 µs ReadableStream - reader (2 chunks) 6.86 µs/iter 145,847.9 (6.8 µs … 6.97 µs) 6.88 µs 6.97 µs 6.97 µs ReadableStream - reader (20 chunks) 16.88 µs/iter 59,227.7 (14.04 µs … 311.29 µs) 15.39 µs 74.95 µs 97.45 µs ReadableByteStream - iterator (2 chunks) 7.94 µs/iter 125,881.2 (6.86 µs … 811.16 µs) 7.69 µs 11.43 µs 16.6 µs ReadableByteStream - iterator (20 chunks) 22.23 µs/iter 44,978.2 (18.98 µs … 590.11 µs) 20.73 µs 45.13 µs 159.8 µs ReadableByteStream - reader (2 chunks) 7.4 µs/iter 135,206.9 (7.36 µs … 7.42 µs) 7.4 µs 7.42 µs 7.42 µs ReadableByteStream - reader (20 chunks) 21.03 µs/iter 47,555.6 (17.75 µs … 357.66 µs) 19.52 µs 98.69 µs 146.5 µs ``` --------- Co-authored-by: Luca Casonato <hello@lcas.dev>
2023-10-13 08:30:09 -04:00
const rid = resourceForReadableStream(veryLongTinyPacketStream(1_000_000));
const buffer = await core.ops.op_read_all(rid);
perf(ext/streams): optimize streams (#20649) This PR introduces several optimizations to streams ### Highlights: - `ReadableStream` constructor: +20% iter/s. - `WritableStream` constructor: +50% iter/s. - `TransformStream` constructor: +30% iter/s. - `ReadableStream` iterator (both 2 and 20 chunks): +42% and +25% iter/s. - `ReadableByteStream` iterator (both 2 and 20 chunks): +39% and +20% iter/s. ### Benchmarks **main** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 294.52 ns/iter 3,395,392.9 (277.92 ns … 618.26 ns) 292.66 ns 353.87 ns 618.26 ns WritableStream constructor 235.51 ns/iter 4,246,065.3 (213.04 ns … 306.35 ns) 236.77 ns 279.08 ns 281.32 ns TransformStream constructor 672.52 ns/iter 1,486,938.7 (652.15 ns … 880.74 ns) 670.11 ns 880.74 ns 880.74 ns ReadableStream - iterator (2 chunks) 10.44 µs/iter 95,757.9 (8.97 µs … 830.91 µs) 10.22 µs 14.74 µs 18.93 µs ReadableStream - iterator (20 chunks) 21.93 µs/iter 45,593.4 (18.8 µs … 864.97 µs) 20.57 µs 57.15 µs 137.16 µs ReadableStream - reader (2 chunks) 7.09 µs/iter 140,987.2 (7.03 µs … 7.18 µs) 7.13 µs 7.18 µs 7.18 µs ReadableStream - reader (20 chunks) 18.41 µs/iter 54,324.2 (15.7 µs … 252.7 µs) 17.14 µs 68.88 µs 94.08 µs ReadableByteStream - iterator (2 chunks) 11.06 µs/iter 90,375.1 (9.75 µs … 404.69 µs) 10.88 µs 16.6 µs 29.69 µs ReadableByteStream - iterator (20 chunks) 26.71 µs/iter 37,435.0 (22.98 µs … 508.34 µs) 25.25 µs 85.28 µs 155.65 µs ReadableByteStream - reader (2 chunks) 7.99 µs/iter 125,131.1 (7.92 µs … 8.13 µs) 8.01 µs 8.13 µs 8.13 µs ReadableByteStream - reader (20 chunks) 23.46 µs/iter 42,618.5 (20.28 µs … 414.66 µs) 21.94 µs 90.52 µs 147.38 µs ``` **this PR** ``` cpu: 13th Gen Intel(R) Core(TM) i9-13900H runtime: deno 1.37.0 (x86_64-unknown-linux-gnu) benchmark time (avg) iter/s (min … max) p75 p99 p995 ----------------------------------------------------------------------------------------------- ----------------------------- ReadableStream constructor 235.48 ns/iter 4,246,584.3 (223.12 ns … 504.65 ns) 234.3 ns 290.84 ns 311.12 ns WritableStream constructor 156.31 ns/iter 6,397,537.3 (148.54 ns … 211.13 ns) 157.49 ns 199.82 ns 208.23 ns TransformStream constructor 471.29 ns/iter 2,121,815.3 (452.53 ns … 791.41 ns) 468.62 ns 540.36 ns 791.41 ns ReadableStream - iterator (2 chunks) 7.32 µs/iter 136,705.4 (6.35 µs … 639.97 µs) 7.1 µs 12.12 µs 20.98 µs ReadableStream - iterator (20 chunks) 17.48 µs/iter 57,195.1 (14.48 µs … 289.06 µs) 16.06 µs 76.98 µs 114.61 µs ReadableStream - reader (2 chunks) 6.86 µs/iter 145,847.9 (6.8 µs … 6.97 µs) 6.88 µs 6.97 µs 6.97 µs ReadableStream - reader (20 chunks) 16.88 µs/iter 59,227.7 (14.04 µs … 311.29 µs) 15.39 µs 74.95 µs 97.45 µs ReadableByteStream - iterator (2 chunks) 7.94 µs/iter 125,881.2 (6.86 µs … 811.16 µs) 7.69 µs 11.43 µs 16.6 µs ReadableByteStream - iterator (20 chunks) 22.23 µs/iter 44,978.2 (18.98 µs … 590.11 µs) 20.73 µs 45.13 µs 159.8 µs ReadableByteStream - reader (2 chunks) 7.4 µs/iter 135,206.9 (7.36 µs … 7.42 µs) 7.4 µs 7.42 µs 7.42 µs ReadableByteStream - reader (20 chunks) 21.03 µs/iter 47,555.6 (17.75 µs … 357.66 µs) 19.52 µs 98.69 µs 146.5 µs ``` --------- Co-authored-by: Luca Casonato <hello@lcas.dev>
2023-10-13 08:30:09 -04:00
assertEquals(buffer.length, 1_000_000);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLongByPiece() {
const rid = resourceForReadableStream(longStream());
let total = 0;
for (let i = 0; i < 100; i++) {
const length = await core.ops.op_read(rid, new Uint8Array(16));
total += length;
if (length == 0) {
break;
}
}
assertEquals(total, LOREM.length * 4);
core.ops.op_close(rid);
});
for (
const type of [
"string",
"TypeError",
"controller",
] as ("string" | "TypeError" | "controller")[]
) {
Deno.test(`readableStreamError_${type}`, async function () {
const rid = resourceForReadableStream(errorStream(type));
let nread;
try {
nread = await core.ops.op_read(rid, new Uint8Array(16));
} catch (_) {
fail("Should not have thrown");
}
assertEquals(12, nread);
try {
await core.ops.op_read(rid, new Uint8Array(1));
fail();
} catch (e) {
assertEquals(e.message, `Uh oh (${type})!`);
}
core.ops.op_close(rid);
});
}
Deno.test(async function readableStreamEmptyOnStart() {
const rid = resourceForReadableStream(emptyStream(true));
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 0);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamEmptyOnPull() {
const rid = resourceForReadableStream(emptyStream(false));
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 0);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamEmptyReadAll() {
const rid = resourceForReadableStream(emptyStream(false));
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer.length, 0);
core.ops.op_close(rid);
});
Deno.test(async function readableStreamWithEmptyChunk() {
const rid = resourceForReadableStream(emptyChunkStream());
const buffer = await core.ops.op_read_all(rid);
assertEquals(buffer, new Uint8Array([1, 2]));
core.ops.op_close(rid);
});
Deno.test(async function readableStreamWithEmptyChunkOneByOne() {
const rid = resourceForReadableStream(emptyChunkStream());
assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1)));
assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1)));
assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1)));
core.ops.op_close(rid);
});
// Ensure that we correctly transmit all the sub-chunks of the larger chunks.
Deno.test(async function readableStreamReadSmallerChunks() {
const packetSize = 16 * 1024;
const rid = resourceForReadableStream(largePacketStream(packetSize, 1));
const buffer = new Uint8Array(packetSize);
for (let i = 0; i < packetSize / 1024; i++) {
await core.ops.op_read(rid, buffer.subarray(i * 1024, i * 1024 + 1024));
}
for (let i = 0; i < 256; i++) {
assertEquals(
i,
buffer[i * (packetSize / 256)],
`at index ${i * (packetSize / 256)}`,
);
}
core.ops.op_close(rid);
});
Deno.test(async function readableStreamLargePackets() {
const packetSize = 128 * 1024;
const rid = resourceForReadableStream(largePacketStream(packetSize, 1024));
for (let i = 0; i < 1024; i++) {
const buffer = new Uint8Array(packetSize);
assertEquals(packetSize, await core.ops.op_read(rid, buffer));
for (let i = 0; i < 256; i++) {
assertEquals(
i,
buffer[i * (packetSize / 256)],
`at index ${i * (packetSize / 256)}`,
);
}
}
assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1)));
core.ops.op_close(rid);
});
Deno.test(async function readableStreamVeryLargePackets() {
// 1024 packets of 1MB
const rid = resourceForReadableStream(largePacketStream(1024 * 1024, 1024));
let total = 0;
// Read 96kB up to 12,288 times (96kB is not an even multiple of the 1MB packet size to test this)
const readCounts: Record<number, number> = {};
for (let i = 0; i < 12 * 1024; i++) {
const nread = await core.ops.op_read(rid, new Uint8Array(96 * 1024));
total += nread;
readCounts[nread] = (readCounts[nread] || 0) + 1;
if (nread == 0) {
break;
}
}
assertEquals({ 0: 1, 65536: 1024, 98304: 10 * 1024 }, readCounts);
assertEquals(total, 1024 * 1024 * 1024);
core.ops.op_close(rid);
});
for (const count of [0, 1, 2, 3]) {
for (const delay of [0, 1, 10]) {
// Creating a stream that errors in start will throw
if (delay > 0) {
createStreamTest(count, delay, "Throw");
}
createStreamTest(count, delay, "Close");
}
}
function createStreamTest(
count: number,
delay: number,
action: "Throw" | "Close",
) {
Deno.test(`streamCount${count}Delay${delay}${action}`, async () => {
let rid;
try {
rid = resourceForReadableStream(
makeStreamWithCount(count, delay, action),
);
for (let i = 0; i < count; i++) {
const buffer = new Uint8Array(1);
await core.ops.op_read(rid, buffer);
}
if (action == "Throw") {
try {
const buffer = new Uint8Array(1);
assertEquals(1, await core.ops.op_read(rid, buffer));
fail();
} catch (e) {
// We expect this to be thrown
assertEquals(e.message, "Expected error!");
}
} else {
const buffer = new Uint8Array(1);
assertEquals(0, await core.ops.op_read(rid, buffer));
}
} finally {
core.ops.op_close(rid);
}
});
}
// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully.
for (const packetCount of [1, 1024]) {
Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () {
let first = true;
const { promise, resolve } = Promise.withResolvers();
const rid = resourceForReadableStream(
new ReadableStream({
pull(controller) {
if (first) {
// We queue this up and then immediately close the resource (not the reader)
for (let i = 0; i < packetCount; i++) {
controller.enqueue(new Uint8Array(1));
}
core.close(rid);
// This doesn't throw, even though the resource is closed
controller.enqueue(new Uint8Array(1));
first = false;
}
},
cancel(reason) {
resolve(reason);
},
}),
);
try {
for (let i = 0; i < packetCount; i++) {
await core.ops.op_read(rid, new Uint8Array(1));
}
fail();
} catch (e) {
assertEquals(e.message, "operation canceled");
}
assertEquals(await promise, "resource closed");
});
}