diff --git a/std/io/streams.ts b/std/io/streams.ts
new file mode 100644
index 0000000000..3969746ef5
--- /dev/null
+++ b/std/io/streams.ts
@@ -0,0 +1,34 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+export function fromStreamWriter(
+  streamWriter: WritableStreamDefaultWriter<Uint8Array>
+): Deno.Writer {
+  return {
+    async write(p: Uint8Array): Promise<number> {
+      await streamWriter.ready;
+      await streamWriter.write(p);
+      return p.length;
+    },
+  };
+}
+
+export function fromStreamReader(
+  streamReader: ReadableStreamDefaultReader<Uint8Array>
+): Deno.Reader {
+  const buffer = new Deno.Buffer();
+
+  return {
+    async read(p: Uint8Array): Promise<number | null> {
+      if (buffer.empty()) {
+        const res = await streamReader.read();
+        if (res.done) {
+          return null; // EOF
+        }
+
+        await Deno.writeAll(buffer, res.value);
+      }
+
+      return buffer.read(p);
+    },
+  };
+}
diff --git a/std/io/streams_test.ts b/std/io/streams_test.ts
new file mode 100644
index 0000000000..00d056e2fc
--- /dev/null
+++ b/std/io/streams_test.ts
@@ -0,0 +1,134 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import { assertEquals, assert } from "../testing/asserts.ts";
+import { fromStreamWriter, fromStreamReader } from "./streams.ts";
+
+function repeat(c: string, bytes: number): Uint8Array {
+  assertEquals(c.length, 1);
+  const ui8 = new Uint8Array(bytes);
+  ui8.fill(c.charCodeAt(0));
+  return ui8;
+}
+
+Deno.test("toWriterCheck", async function (): Promise<void> {
+  const written: string[] = [];
+  const chunks: string[] = ["hello", "deno", "land"];
+  const writableStream = new WritableStream({
+    write(chunk): void {
+      const decoder = new TextDecoder();
+      written.push(decoder.decode(chunk));
+    },
+  });
+
+  const encoder = new TextEncoder();
+  const writer = fromStreamWriter(writableStream.getWriter());
+
+  for (const chunk of chunks) {
+    const n = await writer.write(encoder.encode(chunk));
+    // stream writers always write all the bytes
+    assertEquals(n, chunk.length);
+  }
+
+  assertEquals(written, chunks);
+});
+
+Deno.test("toReaderCheck", async function (): Promise<void> {
+  const chunks: string[] = ["hello", "deno", "land"];
+  const expected = chunks.slice();
+  const readChunks: Uint8Array[] = [];
+  const readableStream = new ReadableStream({
+    pull(controller): void {
+      const encoder = new TextEncoder();
+      const chunk = chunks.shift();
+      if (!chunk) return controller.close();
+      controller.enqueue(encoder.encode(chunk));
+    },
+  });
+
+  const decoder = new TextDecoder();
+  const reader = fromStreamReader(readableStream.getReader());
+
+  let i = 0;
+
+  while (true) {
+    const b = new Uint8Array(1024);
+    const n = await reader.read(b);
+
+    if (n === null) break;
+
+    readChunks.push(b.subarray(0, n));
+    assert(i < expected.length);
+
+    i++;
+  }
+
+  assertEquals(
+    expected,
+    readChunks.map((chunk) => decoder.decode(chunk))
+  );
+});
+
+Deno.test("toReaderBigChunksCheck", async function (): Promise<void> {
+  const bufSize = 1024;
+  const chunkSize = 3 * bufSize;
+  const writer = new Deno.Buffer();
+
+  // A readable stream can enqueue chunks bigger than Copy bufSize
+  // Reader returned by toReader should enqueue exceeding bytes
+  const chunks: string[] = [
+    "a".repeat(chunkSize),
+    "b".repeat(chunkSize),
+    "c".repeat(chunkSize),
+  ];
+  const expected = chunks.slice();
+  const readableStream = new ReadableStream({
+    pull(controller): void {
+      const encoder = new TextEncoder();
+      const chunk = chunks.shift();
+      if (!chunk) return controller.close();
+
+      controller.enqueue(encoder.encode(chunk));
+    },
+  });
+
+  const reader = fromStreamReader(readableStream.getReader());
+  const n = await Deno.copy(reader, writer, { bufSize });
+
+  const expectedWritten = chunkSize * expected.length;
+  assertEquals(n, chunkSize * expected.length);
+  assertEquals(writer.length, expectedWritten);
+});
+
+Deno.test("toReaderBigIrregularChunksCheck", async function (): Promise<void> {
+  const bufSize = 1024;
+  const chunkSize = 3 * bufSize;
+  const writer = new Deno.Buffer();
+
+  // A readable stream can enqueue chunks bigger than Copy bufSize
+  // Reader returned by toReader should enqueue exceeding bytes
+  const chunks: Uint8Array[] = [
+    repeat("a", chunkSize),
+    repeat("b", chunkSize + 253),
+    repeat("c", chunkSize + 8),
+  ];
+  const expected = new Uint8Array(
+    chunks
+      .slice()
+      .map((chunk) => [...chunk])
+      .flat()
+  );
+  const readableStream = new ReadableStream({
+    pull(controller): void {
+      const chunk = chunks.shift();
+      if (!chunk) return controller.close();
+
+      controller.enqueue(chunk);
+    },
+  });
+
+  const reader = fromStreamReader(readableStream.getReader());
+
+  const n = await Deno.copy(reader, writer, { bufSize });
+  assertEquals(n, expected.length);
+  assertEquals(expected, writer.bytes());
+});
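
A minimal usage sketch, not part of the patch above: it bridges a web ReadableStream into the Deno.Reader world via fromStreamReader so Deno.copy can consume it, mirroring what the tests do; the demo() wrapper, the literal string, and the console.log are illustrative assumptions, while Deno.copy, Deno.Buffer, and bytes() are the same calls the tests already use.

// Usage sketch (assumptions noted above): wrap a web stream reader so the
// existing Deno.Reader-based APIs can consume it.
import { fromStreamReader } from "./streams.ts";

async function demo(): Promise<void> {
  const stream = new ReadableStream<Uint8Array>({
    start(controller): void {
      controller.enqueue(new TextEncoder().encode("hello streams"));
      controller.close();
    },
  });

  const dest = new Deno.Buffer();
  // Deno.copy pulls from the wrapped reader until read() returns null (EOF).
  const copied = await Deno.copy(fromStreamReader(stream.getReader()), dest);
  console.log(copied, new TextDecoder().decode(dest.bytes()));
}

demo();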