From 7e32269f3f230c5b714bbf70aa59d74f9a867373 Mon Sep 17 00:00:00 2001 From: Kitson Kelly Date: Mon, 4 May 2020 05:10:52 +1000 Subject: [PATCH] Add TransformStream and TransformStreamController (#5042) --- cli/js/globals.ts | 2 + cli/js/lib.deno.shared_globals.d.ts | 36 ++ cli/js/tests/streams_transform_test.ts | 562 ++++++++++++++++++ cli/js/tests/unit_tests.ts | 1 + cli/js/web/streams/internals.ts | 364 +++++++++++- cli/js/web/streams/symbols.ts | 7 + cli/js/web/streams/transform_stream.ts | 118 ++++ .../transform_stream_default_controller.ts | 75 +++ 8 files changed, 1156 insertions(+), 9 deletions(-) create mode 100644 cli/js/tests/streams_transform_test.ts create mode 100644 cli/js/web/streams/transform_stream.ts create mode 100644 cli/js/web/streams/transform_stream_default_controller.ts diff --git a/cli/js/globals.ts b/cli/js/globals.ts index caf069ffd0..89f6075a3b 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -23,6 +23,7 @@ import * as workers from "./web/workers.ts"; import * as performanceUtil from "./web/performance.ts"; import * as request from "./web/request.ts"; import * as readableStream from "./web/streams/readable_stream.ts"; +import * as transformStream from "./web/streams/transform_stream.ts"; import * as queuingStrategy from "./web/streams/queuing_strategy.ts"; import * as writableStream from "./web/streams/writable_stream.ts"; @@ -234,6 +235,7 @@ export const windowOrWorkerGlobalScopeProperties = { TextEncoder: nonEnumerable(textEncoding.TextEncoder), TextDecoder: nonEnumerable(textEncoding.TextDecoder), ReadableStream: nonEnumerable(readableStream.ReadableStreamImpl), + TransformStream: nonEnumerable(transformStream.TransformStreamImpl), Request: nonEnumerable(request.Request), Response: nonEnumerable(fetchTypes.Response), performance: writable(new performanceUtil.Performance()), diff --git a/cli/js/lib.deno.shared_globals.d.ts b/cli/js/lib.deno.shared_globals.d.ts index 8f73f0585c..d962304473 100644 --- a/cli/js/lib.deno.shared_globals.d.ts +++ b/cli/js/lib.deno.shared_globals.d.ts @@ -423,6 +423,42 @@ interface WritableStreamDefaultWriter { write(chunk: W): Promise; } +declare class TransformStream { + constructor( + transformer?: Transformer, + writableStrategy?: QueuingStrategy, + readableStrategy?: QueuingStrategy + ); + readonly readable: ReadableStream; + readonly writable: WritableStream; +} + +interface TransformStreamDefaultController { + readonly desiredSize: number | null; + enqueue(chunk: O): void; + error(reason?: any): void; + terminate(): void; +} + +interface Transformer { + flush?: TransformStreamDefaultControllerCallback; + readableType?: undefined; + start?: TransformStreamDefaultControllerCallback; + transform?: TransformStreamDefaultControllerTransformCallback; + writableType?: undefined; +} + +interface TransformStreamDefaultControllerCallback { + (controller: TransformStreamDefaultController): void | PromiseLike; +} + +interface TransformStreamDefaultControllerTransformCallback { + ( + chunk: I, + controller: TransformStreamDefaultController + ): void | PromiseLike; +} + interface DOMStringList { /** Returns the number of strings in strings. */ readonly length: number; diff --git a/cli/js/tests/streams_transform_test.ts b/cli/js/tests/streams_transform_test.ts new file mode 100644 index 0000000000..f3ec148ae5 --- /dev/null +++ b/cli/js/tests/streams_transform_test.ts @@ -0,0 +1,562 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { + unitTest, + assert, + assertEquals, + assertNotEquals, + assertThrows, +} from "./test_util.ts"; + +function delay(seconds: number): Promise { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, seconds); + }); +} + +function readableStreamToArray( + readable: { getReader(): ReadableStreamDefaultReader }, + reader?: ReadableStreamDefaultReader +): Promise { + if (reader === undefined) { + reader = readable.getReader(); + } + + const chunks: R[] = []; + + return pump(); + + function pump(): Promise { + return reader!.read().then((result) => { + if (result.done) { + return chunks; + } + + chunks.push(result.value); + return pump(); + }); + } +} + +unitTest(function transformStreamConstructedWithTransformFunction() { + new TransformStream({ transform(): void {} }); +}); + +unitTest(function transformStreamConstructedNoTransform() { + new TransformStream(); + new TransformStream({}); +}); + +unitTest(function transformStreamIntstancesHaveProperProperties() { + const ts = new TransformStream({ transform(): void {} }); + const proto = Object.getPrototypeOf(ts); + + const writableStream = Object.getOwnPropertyDescriptor(proto, "writable"); + assert(writableStream !== undefined, "it has a writable property"); + assert(!writableStream.enumerable, "writable should be non-enumerable"); + assertEquals( + typeof writableStream.get, + "function", + "writable should have a getter" + ); + assertEquals( + writableStream.set, + undefined, + "writable should not have a setter" + ); + assert(writableStream.configurable, "writable should be configurable"); + assert( + ts.writable instanceof WritableStream, + "writable is an instance of WritableStream" + ); + assert( + WritableStream.prototype.getWriter.call(ts.writable), + "writable should pass WritableStream brand check" + ); + + const readableStream = Object.getOwnPropertyDescriptor(proto, "readable"); + assert(readableStream !== undefined, "it has a readable property"); + assert(!readableStream.enumerable, "readable should be non-enumerable"); + assertEquals( + typeof readableStream.get, + "function", + "readable should have a getter" + ); + assertEquals( + readableStream.set, + undefined, + "readable should not have a setter" + ); + assert(readableStream.configurable, "readable should be configurable"); + assert( + ts.readable instanceof ReadableStream, + "readable is an instance of ReadableStream" + ); + assertNotEquals( + ReadableStream.prototype.getReader.call(ts.readable), + undefined, + "readable should pass ReadableStream brand check" + ); +}); + +unitTest(function transformStreamWritableStartsAsWritable() { + const ts = new TransformStream({ transform(): void {} }); + + const writer = ts.writable.getWriter(); + assertEquals(writer.desiredSize, 1, "writer.desiredSize should be 1"); +}); + +unitTest(async function transformStreamReadableCanReadOutOfWritable() { + const ts = new TransformStream(); + + const writer = ts.writable.getWriter(); + writer.write("a"); + assertEquals( + writer.desiredSize, + 0, + "writer.desiredSize should be 0 after write()" + ); + + const result = await ts.readable.getReader().read(); + assertEquals( + result.value, + "a", + "result from reading the readable is the same as was written to writable" + ); + assert(!result.done, "stream should not be done"); + + await delay(0); + assert(writer.desiredSize === 1, "desiredSize should be 1 again"); +}); + +unitTest(async function transformStreamCanReadWhatIsWritten() { + let c: TransformStreamDefaultController; + const ts = new TransformStream({ + start(controller: TransformStreamDefaultController): void { + c = controller; + }, + transform(chunk: string): void { + c.enqueue(chunk.toUpperCase()); + }, + }); + + const writer = ts.writable.getWriter(); + writer.write("a"); + + const result = await ts.readable.getReader().read(); + assertEquals( + result.value, + "A", + "result from reading the readable is the transformation of what was written to writable" + ); + assert(!result.done, "stream should not be done"); +}); + +unitTest(async function transformStreamCanReadBothChunks() { + let c: TransformStreamDefaultController; + const ts = new TransformStream({ + start(controller: TransformStreamDefaultController): void { + c = controller; + }, + transform(chunk: string): void { + c.enqueue(chunk.toUpperCase()); + c.enqueue(chunk.toUpperCase()); + }, + }); + + const writer = ts.writable.getWriter(); + writer.write("a"); + + const reader = ts.readable.getReader(); + + const result1 = await reader.read(); + assertEquals( + result1.value, + "A", + "the first chunk read is the transformation of the single chunk written" + ); + assert(!result1.done, "stream should not be done"); + + const result2 = await reader.read(); + assertEquals( + result2.value, + "A", + "the second chunk read is also the transformation of the single chunk written" + ); + assert(!result2.done, "stream should not be done"); +}); + +unitTest(async function transformStreamCanReadWhatIsWritten() { + let c: TransformStreamDefaultController; + const ts = new TransformStream({ + start(controller: TransformStreamDefaultController): void { + c = controller; + }, + transform(chunk: string): Promise { + return delay(0).then(() => c.enqueue(chunk.toUpperCase())); + }, + }); + + const writer = ts.writable.getWriter(); + writer.write("a"); + + const result = await ts.readable.getReader().read(); + assertEquals( + result.value, + "A", + "result from reading the readable is the transformation of what was written to writable" + ); + assert(!result.done, "stream should not be done"); +}); + +unitTest(async function transformStreamAsyncReadMultipleChunks() { + let doSecondEnqueue: () => void; + let returnFromTransform: () => void; + const ts = new TransformStream({ + transform( + chunk: string, + controller: TransformStreamDefaultController + ): Promise { + delay(0).then(() => controller.enqueue(chunk.toUpperCase())); + doSecondEnqueue = (): void => controller.enqueue(chunk.toUpperCase()); + return new Promise((resolve) => { + returnFromTransform = resolve; + }); + }, + }); + + const reader = ts.readable.getReader(); + + const writer = ts.writable.getWriter(); + writer.write("a"); + + const result1 = await reader.read(); + assertEquals( + result1.value, + "A", + "the first chunk read is the transformation of the single chunk written" + ); + assert(!result1.done, "stream should not be done"); + doSecondEnqueue!(); + + const result2 = await reader.read(); + assertEquals( + result2.value, + "A", + "the second chunk read is also the transformation of the single chunk written" + ); + assert(!result2.done, "stream should not be done"); + returnFromTransform!(); +}); + +unitTest(function transformStreamClosingWriteClosesRead() { + const ts = new TransformStream({ transform(): void {} }); + + const writer = ts.writable.getWriter(); + writer.close(); + + return Promise.all([writer.closed, ts.readable.getReader().closed]).then( + undefined + ); +}); + +unitTest(async function transformStreamCloseWaitAwaitsTransforms() { + let transformResolve: () => void; + const transformPromise = new Promise((resolve) => { + transformResolve = resolve; + }); + const ts = new TransformStream( + { + transform(): Promise { + return transformPromise; + }, + }, + undefined, + { highWaterMark: 1 } + ); + + const writer = ts.writable.getWriter(); + writer.write("a"); + writer.close(); + + let rsClosed = false; + ts.readable.getReader().closed.then(() => { + rsClosed = true; + }); + + await delay(0); + assertEquals(rsClosed, false, "readable is not closed after a tick"); + transformResolve!(); + + await writer.closed; + // TODO: Is this expectation correct? + assertEquals(rsClosed, true, "readable is closed at that point"); +}); + +unitTest(async function transformStreamCloseWriteAfterSyncEnqueues() { + let c: TransformStreamDefaultController; + const ts = new TransformStream({ + start(controller: TransformStreamDefaultController): void { + c = controller; + }, + transform(): Promise { + c.enqueue("x"); + c.enqueue("y"); + return delay(0); + }, + }); + + const writer = ts.writable.getWriter(); + writer.write("a"); + writer.close(); + + const readableChunks = readableStreamToArray(ts.readable); + + await writer.closed; + const chunks = await readableChunks; + assertEquals( + chunks, + ["x", "y"], + "both enqueued chunks can be read from the readable" + ); +}); + +unitTest(async function transformStreamWritableCloseAsyncAfterAsyncEnqueues() { + let c: TransformStreamDefaultController; + const ts = new TransformStream({ + start(controller: TransformStreamDefaultController): void { + c = controller; + }, + transform(): Promise { + return delay(0) + .then(() => c.enqueue("x")) + .then(() => c.enqueue("y")) + .then(() => delay(0)); + }, + }); + + const writer = ts.writable.getWriter(); + writer.write("a"); + writer.close(); + + const readableChunks = readableStreamToArray(ts.readable); + + await writer.closed; + const chunks = await readableChunks; + assertEquals( + chunks, + ["x", "y"], + "both enqueued chunks can be read from the readable" + ); +}); + +unitTest(async function transformStreamTransformerMethodsCalledAsMethods() { + let c: TransformStreamDefaultController; + const transformer = { + suffix: "-suffix", + + start(controller: TransformStreamDefaultController): void { + c = controller; + c.enqueue("start" + this.suffix); + }, + + transform(chunk: string): void { + c.enqueue(chunk + this.suffix); + }, + + flush(): void { + c.enqueue("flushed" + this.suffix); + }, + }; + const ts = new TransformStream(transformer); + + const writer = ts.writable.getWriter(); + writer.write("a"); + writer.close(); + + const readableChunks = readableStreamToArray(ts.readable); + + await writer.closed; + const chunks = await readableChunks; + assertEquals( + chunks, + ["start-suffix", "a-suffix", "flushed-suffix"], + "all enqueued chunks have suffixes" + ); +}); + +unitTest(async function transformStreamMethodsShouldNotBeAppliedOrCalled() { + function functionWithOverloads(): void {} + functionWithOverloads.apply = (): void => { + throw new Error("apply() should not be called"); + }; + functionWithOverloads.call = (): void => { + throw new Error("call() should not be called"); + }; + const ts = new TransformStream({ + start: functionWithOverloads, + transform: functionWithOverloads, + flush: functionWithOverloads, + }); + const writer = ts.writable.getWriter(); + writer.write("a"); + writer.close(); + + await readableStreamToArray(ts.readable); +}); + +unitTest(async function transformStreamCallTransformSync() { + let transformCalled = false; + const ts = new TransformStream( + { + transform(): void { + transformCalled = true; + }, + }, + undefined, + { highWaterMark: Infinity } + ); + // transform() is only called synchronously when there is no backpressure and + // all microtasks have run. + await delay(0); + const writePromise = ts.writable.getWriter().write(undefined); + assert(transformCalled, "transform() should have been called"); + await writePromise; +}); + +unitTest(function transformStreamCloseWriteCloesesReadWithNoChunks() { + const ts = new TransformStream({}, undefined, { highWaterMark: 0 }); + + const writer = ts.writable.getWriter(); + writer.close(); + + return Promise.all([writer.closed, ts.readable.getReader().closed]).then( + undefined + ); +}); + +unitTest(function transformStreamEnqueueThrowsAfterTerminate() { + new TransformStream({ + start(controller: TransformStreamDefaultController): void { + controller.terminate(); + assertThrows(() => { + controller.enqueue(undefined); + }, TypeError); + }, + }); +}); + +unitTest(function transformStreamEnqueueThrowsAfterReadableCancel() { + let controller: TransformStreamDefaultController; + const ts = new TransformStream({ + start(c: TransformStreamDefaultController): void { + controller = c; + }, + }); + const cancelPromise = ts.readable.cancel(); + assertThrows( + () => controller.enqueue(undefined), + TypeError, + undefined, + "enqueue should throw" + ); + return cancelPromise; +}); + +unitTest(function transformStreamSecondTerminateNoOp() { + new TransformStream({ + start(controller: TransformStreamDefaultController): void { + controller.terminate(); + controller.terminate(); + }, + }); +}); + +unitTest(async function transformStreamTerminateAfterReadableCancelIsNoop() { + let controller: TransformStreamDefaultController; + const ts = new TransformStream({ + start(c: TransformStreamDefaultController): void { + controller = c; + }, + }); + const cancelReason = { name: "cancelReason" }; + const cancelPromise = ts.readable.cancel(cancelReason); + controller!.terminate(); + await cancelPromise; + try { + await ts.writable.getWriter().closed; + } catch (e) { + assert(e === cancelReason); + return; + } + throw new Error("closed should have rejected"); +}); + +unitTest(async function transformStreamStartCalledOnce() { + let calls = 0; + new TransformStream({ + start(): void { + ++calls; + }, + }); + await delay(0); + assertEquals(calls, 1, "start() should have been called exactly once"); +}); + +unitTest(function transformStreamReadableTypeThrows() { + assertThrows( + // eslint-disable-next-line + () => new TransformStream({ readableType: "bytes" as any }), + RangeError, + undefined, + "constructor should throw" + ); +}); + +unitTest(function transformStreamWirtableTypeThrows() { + assertThrows( + // eslint-disable-next-line + () => new TransformStream({ writableType: "bytes" as any }), + RangeError, + undefined, + "constructor should throw" + ); +}); + +unitTest(function transformStreamSubclassable() { + class Subclass extends TransformStream { + extraFunction(): boolean { + return true; + } + } + assert( + Object.getPrototypeOf(Subclass.prototype) === TransformStream.prototype, + "Subclass.prototype's prototype should be TransformStream.prototype" + ); + assert( + Object.getPrototypeOf(Subclass) === TransformStream, + "Subclass's prototype should be TransformStream" + ); + const sub = new Subclass(); + assert( + sub instanceof TransformStream, + "Subclass object should be an instance of TransformStream" + ); + assert( + sub instanceof Subclass, + "Subclass object should be an instance of Subclass" + ); + const readableGetter = Object.getOwnPropertyDescriptor( + TransformStream.prototype, + "readable" + )!.get; + assert( + readableGetter!.call(sub) === sub.readable, + "Subclass object should pass brand check" + ); + assert( + sub.extraFunction(), + "extraFunction() should be present on Subclass object" + ); +}); diff --git a/cli/js/tests/unit_tests.ts b/cli/js/tests/unit_tests.ts index 40d0124d35..7327bcc051 100644 --- a/cli/js/tests/unit_tests.ts +++ b/cli/js/tests/unit_tests.ts @@ -53,6 +53,7 @@ import "./resources_test.ts"; import "./signal_test.ts"; import "./stat_test.ts"; import "./streams_piping_test.ts"; +import "./streams_transform_test.ts"; import "./streams_writable_test.ts"; import "./symlink_test.ts"; import "./text_encoding_test.ts"; diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts index 846db096e6..5ef094afc6 100644 --- a/cli/js/web/streams/internals.ts +++ b/cli/js/web/streams/internals.ts @@ -13,6 +13,8 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; +import { TransformStreamImpl } from "./transform_stream.ts"; +import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts"; import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; import { WritableStreamImpl } from "./writable_stream.ts"; @@ -36,10 +38,12 @@ type Container = { [sym.queue]: Array | BufferQueueItem>; [sym.queueTotalSize]: number; }; +export type FlushAlgorithm = () => Promise; export type Pair = { value: R; size: number }; export type PullAlgorithm = () => PromiseLike; export type SizeAlgorithm = (chunk: T) => number; export type StartAlgorithm = () => void | PromiseLike; +export type TransformAlgorithm = (chunk: I) => Promise; export type WriteAlgorithm = (chunk: W) => Promise; export interface Deferred { promise: Promise; @@ -76,8 +80,16 @@ export function acquireWritableStreamDefaultWriter( return new WritableStreamDefaultWriterImpl(stream); } +export function call any>( + fn: F, + v: ThisType, + args: Parameters +): ReturnType { + return Function.prototype.apply.call(fn, v, args); +} + function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource, + O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, @@ -86,7 +98,7 @@ function createAlgorithmFromUnderlyingMethod< ...extraArgs: any[] ): () => Promise; function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource, + O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, @@ -95,7 +107,7 @@ function createAlgorithmFromUnderlyingMethod< ...extraArgs: any[] ): (arg: any) => Promise; function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource, + O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, @@ -110,11 +122,11 @@ function createAlgorithmFromUnderlyingMethod< } if (algoArgCount === 0) { return async (): Promise => - method.call(underlyingObject, ...extraArgs); + call(method, underlyingObject, extraArgs as any); } else { return async (arg: any): Promise => { const fullArgs = [arg, ...extraArgs]; - return method.call(underlyingObject, ...fullArgs); + return call(method, underlyingObject, fullArgs as any); }; } } @@ -148,6 +160,33 @@ function createReadableStream( return stream; } +function createWritableStream( + startAlgorithm: StartAlgorithm, + writeAlgorithm: WriteAlgorithm, + closeAlgorithm: CloseAlgorithm, + abortAlgorithm: AbortAlgorithm, + highWaterMark = 1, + sizeAlgorithm: SizeAlgorithm = (): number => 1 +): WritableStreamImpl { + assert(isNonNegativeNumber(highWaterMark)); + const stream = Object.create(WritableStreamImpl.prototype); + initializeWritableStream(stream); + const controller = Object.create( + WritableStreamDefaultControllerImpl.prototype + ); + setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm + ); + return stream; +} + export function dequeueValue(container: Container): R { assert(sym.queue in container && sym.queueTotalSize in container); assert(container[sym.queue].length); @@ -185,13 +224,61 @@ export function getDeferred(): Required> { return { promise, resolve: resolve!, reject: reject! }; } -export function initializeReadableStream(stream: ReadableStreamImpl): void { +export function initializeReadableStream( + stream: ReadableStreamImpl +): void { stream[sym.state] = "readable"; stream[sym.reader] = stream[sym.storedError] = undefined; stream[sym.disturbed] = false; } -export function initializeWritableStream(stream: WritableStreamImpl): void { +export function initializeTransformStream( + stream: TransformStreamImpl, + startPromise: Promise, + writableHighWaterMark: number, + writableSizeAlgorithm: SizeAlgorithm, + readableHighWaterMark: number, + readableSizeAlgorithm: SizeAlgorithm +): void { + const startAlgorithm = (): Promise => startPromise; + const writeAlgorithm = (chunk: any): Promise => + transformStreamDefaultSinkWriteAlgorithm(stream, chunk); + const abortAlgorithm = (reason: any): Promise => + transformStreamDefaultSinkAbortAlgorithm(stream, reason); + const closeAlgorithm = (): Promise => + transformStreamDefaultSinkCloseAlgorithm(stream); + stream[sym.writable] = createWritableStream( + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + writableHighWaterMark, + writableSizeAlgorithm + ); + const pullAlgorithm = (): PromiseLike => + transformStreamDefaultSourcePullAlgorithm(stream); + const cancelAlgorithm = (reason: any): Promise => { + transformStreamErrorWritableAndUnblockWrite(stream, reason); + return Promise.resolve(undefined); + }; + stream[sym.readable] = createReadableStream( + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm + ); + stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined; + transformStreamSetBackpressure(stream, true); + Object.defineProperty(stream, sym.transformStreamController, { + value: undefined, + configurable: true, + }); +} + +export function initializeWritableStream( + stream: WritableStreamImpl +): void { stream[sym.state] = "writable"; stream[sym.storedError] = stream[sym.writer] = stream[ sym.writableStreamController @@ -202,7 +289,7 @@ export function initializeWritableStream(stream: WritableStreamImpl): void { stream[sym.backpressure] = false; } -function invokeOrNoop( +export function invokeOrNoop, P extends keyof O>( o: O, p: P, ...args: Parameters @@ -212,7 +299,7 @@ function invokeOrNoop( if (!method) { return undefined; } - return method.call(o, ...args); + return call(method, o, args); } function isCallable(value: unknown): value is (...args: any) => any { @@ -299,6 +386,26 @@ export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean { return stream[sym.reader] ? true : false; } +export function isTransformStream( + x: unknown +): x is TransformStreamImpl { + return typeof x !== "object" || + x === null || + !(sym.transformStreamController in x) + ? false + : true; +} + +export function isTransformStreamDefaultController( + x: unknown +): x is TransformStreamDefaultControllerImpl { + return typeof x !== "object" || + x === null || + !(sym.controlledTransformStream in x) + ? false + : true; +} + export function isUnderlyingByteSource( underlyingSource: UnderlyingByteSource | UnderlyingSource ): underlyingSource is UnderlyingByteSource { @@ -717,6 +824,14 @@ export function readableStreamDefaultControllerError( readableStreamError(stream, e); } +function readableStreamDefaultControllerHasBackpressure( + controller: ReadableStreamDefaultControllerImpl +): boolean { + return readableStreamDefaultControllerShouldCallPull(controller) + ? true + : false; +} + function readableStreamDefaultControllerShouldCallPull( controller: ReadableStreamDefaultControllerImpl ): boolean { @@ -1416,6 +1531,62 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource( ); } +function setUpTransformStreamDefaultController( + stream: TransformStreamImpl, + controller: TransformStreamDefaultControllerImpl, + transformAlgorithm: TransformAlgorithm, + flushAlgorithm: FlushAlgorithm +): void { + assert(isTransformStream(stream)); + assert(stream[sym.transformStreamController] === undefined); + controller[sym.controlledTransformStream] = stream; + stream[sym.transformStreamController] = controller; + controller[sym.transformAlgorithm] = transformAlgorithm; + controller[sym.flushAlgorithm] = flushAlgorithm; +} + +export function setUpTransformStreamDefaultControllerFromTransformer( + stream: TransformStreamImpl, + transformer: Transformer +): void { + assert(transformer); + const controller = Object.create( + TransformStreamDefaultControllerImpl.prototype + ) as TransformStreamDefaultControllerImpl; + let transformAlgorithm: TransformAlgorithm = (chunk) => { + try { + transformStreamDefaultControllerEnqueue( + controller, + // it defaults to no tranformation, so I is assumed to be O + (chunk as unknown) as O + ); + } catch (e) { + return Promise.reject(e); + } + return Promise.resolve(); + }; + const transformMethod = transformer.transform; + if (transformMethod) { + if (typeof transformMethod !== "function") { + throw new TypeError("tranformer.transform must be callable."); + } + transformAlgorithm = async (chunk): Promise => + call(transformMethod, transformer, [chunk, controller]); + } + const flushAlgorithm = createAlgorithmFromUnderlyingMethod( + transformer, + "flush", + 0, + controller + ); + setUpTransformStreamDefaultController( + stream, + controller, + transformAlgorithm, + flushAlgorithm + ); +} + function setUpWritableStreamDefaultController( stream: WritableStreamImpl, controller: WritableStreamDefaultControllerImpl, @@ -1508,6 +1679,181 @@ export function setUpWritableStreamDefaultControllerFromUnderlyingSink( ); } +function transformStreamDefaultControllerClearAlgorithms( + controller: TransformStreamDefaultControllerImpl +): void { + (controller as any)[sym.transformAlgorithm] = undefined; + (controller as any)[sym.flushAlgorithm] = undefined; +} + +export function transformStreamDefaultControllerEnqueue( + controller: TransformStreamDefaultControllerImpl, + chunk: O +): void { + const stream = controller[sym.controlledTransformStream]; + const readableController = stream[sym.readable][ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl; + if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) { + throw new TypeError( + "TransformStream's readable controller cannot be closed or enqueued." + ); + } + try { + readableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (e) { + transformStreamErrorWritableAndUnblockWrite(stream, e); + throw stream[sym.readable][sym.storedError]; + } + const backpressure = readableStreamDefaultControllerHasBackpressure( + readableController + ); + if (backpressure) { + transformStreamSetBackpressure(stream, true); + } +} + +export function transformStreamDefaultControllerError( + controller: TransformStreamDefaultControllerImpl, + e: any +): void { + transformStreamError(controller[sym.controlledTransformStream], e); +} + +function transformStreamDefaultControllerPerformTransform( + controller: TransformStreamDefaultControllerImpl, + chunk: I +): Promise { + const transformPromise = controller[sym.transformAlgorithm](chunk); + return transformPromise.then(undefined, (r) => { + transformStreamError(controller[sym.controlledTransformStream], r); + throw r; + }); +} + +function transformStreamDefaultSinkAbortAlgorithm( + stream: TransformStreamImpl, + reason: any +): Promise { + transformStreamError(stream, reason); + return Promise.resolve(undefined); +} + +function transformStreamDefaultSinkCloseAlgorithm( + stream: TransformStreamImpl +): Promise { + const readable = stream[sym.readable]; + const controller = stream[sym.transformStreamController]; + const flushPromise = controller[sym.flushAlgorithm](); + transformStreamDefaultControllerClearAlgorithms(controller); + return flushPromise.then( + () => { + if (readable[sym.state] === "errored") { + throw readable[sym.storedError]; + } + const readableController = readable[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl; + if ( + readableStreamDefaultControllerCanCloseOrEnqueue(readableController) + ) { + readableStreamDefaultControllerClose(readableController); + } + }, + (r) => { + transformStreamError(stream, r); + throw readable[sym.storedError]; + } + ); +} + +function transformStreamDefaultSinkWriteAlgorithm( + stream: TransformStreamImpl, + chunk: I +): Promise { + assert(stream[sym.writable][sym.state] === "writable"); + const controller = stream[sym.transformStreamController]; + if (stream[sym.backpressure]) { + const backpressureChangePromise = stream[sym.backpressureChangePromise]; + assert(backpressureChangePromise); + return backpressureChangePromise.promise.then(() => { + const writable = stream[sym.writable]; + const state = writable[sym.state]; + if (state === "erroring") { + throw writable[sym.storedError]; + } + assert(state === "writable"); + return transformStreamDefaultControllerPerformTransform( + controller, + chunk + ); + }); + } + return transformStreamDefaultControllerPerformTransform(controller, chunk); +} + +function transformStreamDefaultSourcePullAlgorithm( + stream: TransformStreamImpl +): Promise { + assert(stream[sym.backpressure] === true); + assert(stream[sym.backpressureChangePromise] !== undefined); + transformStreamSetBackpressure(stream, false); + return stream[sym.backpressureChangePromise]!.promise; +} + +function transformStreamError( + stream: TransformStreamImpl, + e: any +): void { + readableStreamDefaultControllerError( + stream[sym.readable][ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl, + e + ); + transformStreamErrorWritableAndUnblockWrite(stream, e); +} + +export function transformStreamDefaultControllerTerminate( + controller: TransformStreamDefaultControllerImpl +): void { + const stream = controller[sym.controlledTransformStream]; + const readableController = stream[sym.readable][ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl; + readableStreamDefaultControllerClose(readableController); + const error = new TypeError("TransformStream is closed."); + transformStreamErrorWritableAndUnblockWrite(stream, error); +} + +function transformStreamErrorWritableAndUnblockWrite( + stream: TransformStreamImpl, + e: any +): void { + transformStreamDefaultControllerClearAlgorithms( + stream[sym.transformStreamController] + ); + writableStreamDefaultControllerErrorIfNeeded( + stream[sym.writable][sym.writableStreamController]!, + e + ); + if (stream[sym.backpressure]) { + transformStreamSetBackpressure(stream, false); + } +} + +function transformStreamSetBackpressure( + stream: TransformStreamImpl, + backpressure: boolean +): void { + assert(stream[sym.backpressure] !== backpressure); + if (stream[sym.backpressureChangePromise] !== undefined) { + stream[sym.backpressureChangePromise]!.resolve!(undefined); + } + stream[sym.backpressureChangePromise] = getDeferred(); + stream[sym.backpressure] = backpressure; +} + function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer { assert(!isDetachedBuffer(buffer)); const transferredIshVersion = buffer.slice(0); diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts index 9e5cb57157..9c0a336e55 100644 --- a/cli/js/web/streams/symbols.ts +++ b/cli/js/web/streams/symbols.ts @@ -11,6 +11,7 @@ export const abortSteps = Symbol("abortSteps"); export const asyncIteratorReader = Symbol("asyncIteratorReader"); export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize"); export const backpressure = Symbol("backpressure"); +export const backpressureChangePromise = Symbol("backpressureChangePromise"); export const byobRequest = Symbol("byobRequest"); export const cancelAlgorithm = Symbol("cancelAlgorithm"); export const cancelSteps = Symbol("cancelSteps"); @@ -22,9 +23,11 @@ export const controlledReadableByteStream = Symbol( "controlledReadableByteStream" ); export const controlledReadableStream = Symbol("controlledReadableStream"); +export const controlledTransformStream = Symbol("controlledTransformStream"); export const controlledWritableStream = Symbol("controlledWritableStream"); export const disturbed = Symbol("disturbed"); export const errorSteps = Symbol("errorSteps"); +export const flushAlgorithm = Symbol("flushAlgorithm"); export const forAuthorCode = Symbol("forAuthorCode"); export const inFlightWriteRequest = Symbol("inFlightWriteRequest"); export const inFlightCloseRequest = Symbol("inFlightCloseRequest"); @@ -39,6 +42,7 @@ export const pulling = Symbol("pulling"); export const pullSteps = Symbol("pullSteps"); export const queue = Symbol("queue"); export const queueTotalSize = Symbol("queueTotalSize"); +export const readable = Symbol("readable"); export const readableStreamController = Symbol("readableStreamController"); export const reader = Symbol("reader"); export const readRequests = Symbol("readRequests"); @@ -48,7 +52,10 @@ export const state = Symbol("state"); export const storedError = Symbol("storedError"); export const strategyHWM = Symbol("strategyHWM"); export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm"); +export const transformAlgorithm = Symbol("transformAlgorithm"); +export const transformStreamController = Symbol("transformStreamController"); export const writableStreamController = Symbol("writableStreamController"); export const writeAlgorithm = Symbol("writeAlgorithm"); +export const writable = Symbol("writable"); export const writer = Symbol("writer"); export const writeRequests = Symbol("writeRequests"); diff --git a/cli/js/web/streams/transform_stream.ts b/cli/js/web/streams/transform_stream.ts new file mode 100644 index 0000000000..ac08fea3fa --- /dev/null +++ b/cli/js/web/streams/transform_stream.ts @@ -0,0 +1,118 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { + Deferred, + getDeferred, + initializeTransformStream, + invokeOrNoop, + isTransformStream, + makeSizeAlgorithmFromSizeFunction, + setFunctionName, + setUpTransformStreamDefaultControllerFromTransformer, + validateAndNormalizeHighWaterMark, +} from "./internals.ts"; +import { ReadableStreamImpl } from "./readable_stream.ts"; +import * as sym from "./symbols.ts"; +import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts"; +import { WritableStreamImpl } from "./writable_stream.ts"; +import { customInspect, inspect } from "../console.ts"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class TransformStreamImpl + implements TransformStream { + [sym.backpressure]?: boolean; + [sym.backpressureChangePromise]?: Deferred; + [sym.readable]: ReadableStreamImpl; + [sym.transformStreamController]: TransformStreamDefaultControllerImpl; + [sym.writable]: WritableStreamImpl; + + constructor( + transformer: Transformer = {}, + writableStrategy: QueuingStrategy = {}, + readableStrategy: QueuingStrategy = {} + ) { + const writableSizeFunction = writableStrategy.size; + let writableHighWaterMark = writableStrategy.highWaterMark; + const readableSizeFunction = readableStrategy.size; + let readableHighWaterMark = readableStrategy.highWaterMark; + const writableType = transformer.writableType; + if (writableType !== undefined) { + throw new RangeError( + `Expected transformer writableType to be undefined, received "${String( + writableType + )}"` + ); + } + const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction( + writableSizeFunction + ); + if (writableHighWaterMark === undefined) { + writableHighWaterMark = 1; + } + writableHighWaterMark = validateAndNormalizeHighWaterMark( + writableHighWaterMark + ); + const readableType = transformer.readableType; + if (readableType !== undefined) { + throw new RangeError( + `Expected transformer readableType to be undefined, received "${String( + readableType + )}"` + ); + } + const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction( + readableSizeFunction + ); + if (readableHighWaterMark === undefined) { + readableHighWaterMark = 1; + } + readableHighWaterMark = validateAndNormalizeHighWaterMark( + readableHighWaterMark + ); + const startPromise = getDeferred(); + initializeTransformStream( + this, + startPromise.promise, + writableHighWaterMark, + writableSizeAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm + ); + // the brand check expects this, and the brand check occurs in the following + // but the property hasn't been defined. + Object.defineProperty(this, sym.transformStreamController, { + value: undefined, + writable: true, + configurable: true, + }); + setUpTransformStreamDefaultControllerFromTransformer(this, transformer); + const startResult: void | PromiseLike = invokeOrNoop( + transformer, + "start", + this[sym.transformStreamController] + ); + startPromise.resolve(startResult); + } + + get readable(): ReadableStream { + if (!isTransformStream(this)) { + throw new TypeError("Invalid TransformStream."); + } + return this[sym.readable]; + } + + get writable(): WritableStream { + if (!isTransformStream(this)) { + throw new TypeError("Invalid TransformStream."); + } + return this[sym.writable]; + } + + [customInspect](): string { + return `${this.constructor.name} {\n readable: ${inspect( + this.readable + )}\n writable: ${inspect(this.writable)}\n}`; + } +} + +setFunctionName(TransformStreamImpl, "TransformStream"); diff --git a/cli/js/web/streams/transform_stream_default_controller.ts b/cli/js/web/streams/transform_stream_default_controller.ts new file mode 100644 index 0000000000..2fc8d21600 --- /dev/null +++ b/cli/js/web/streams/transform_stream_default_controller.ts @@ -0,0 +1,75 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { + FlushAlgorithm, + isTransformStreamDefaultController, + readableStreamDefaultControllerGetDesiredSize, + setFunctionName, + TransformAlgorithm, + transformStreamDefaultControllerEnqueue, + transformStreamDefaultControllerError, + transformStreamDefaultControllerTerminate, +} from "./internals.ts"; +import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; +import * as sym from "./symbols.ts"; +import { TransformStreamImpl } from "./transform_stream.ts"; +import { customInspect } from "../console.ts"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class TransformStreamDefaultControllerImpl + implements TransformStreamDefaultController { + [sym.controlledTransformStream]: TransformStreamImpl; + [sym.flushAlgorithm]: FlushAlgorithm; + [sym.transformAlgorithm]: TransformAlgorithm; + + private constructor() { + throw new TypeError( + "TransformStreamDefaultController's constructor cannot be called." + ); + } + + get desiredSize(): number | null { + if (!isTransformStreamDefaultController(this)) { + throw new TypeError("Invalid TransformStreamDefaultController."); + } + const readableController = this[sym.controlledTransformStream][ + sym.readable + ][sym.readableStreamController]; + return readableStreamDefaultControllerGetDesiredSize( + readableController as ReadableStreamDefaultControllerImpl + ); + } + + enqueue(chunk: O): void { + if (!isTransformStreamDefaultController(this)) { + throw new TypeError("Invalid TransformStreamDefaultController."); + } + transformStreamDefaultControllerEnqueue(this, chunk); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + error(reason?: any): void { + if (!isTransformStreamDefaultController(this)) { + throw new TypeError("Invalid TransformStreamDefaultController."); + } + transformStreamDefaultControllerError(this, reason); + } + + terminate(): void { + if (!isTransformStreamDefaultController(this)) { + throw new TypeError("Invalid TransformStreamDefaultController."); + } + transformStreamDefaultControllerTerminate(this); + } + + [customInspect](): string { + return `${this.constructor.name} { desiredSize: ${String( + this.desiredSize + )} }`; + } +} + +setFunctionName( + TransformStreamDefaultControllerImpl, + "TransformStreamDefaultController" +);