// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // This code closely follows the WHATWG Stream Specification // See: https://streams.spec.whatwg.org/ // // There are some parts that are not fully implemented, and there are some // comments which point to steps of the specification that are not implemented. /* eslint-disable @typescript-eslint/no-explicit-any,require-await */ import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts"; import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; import type { TransformStreamImpl } from "./transform_stream.ts"; import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts"; import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; import { WritableStreamImpl } from "./writable_stream.ts"; import { AbortSignalImpl } from "../abort_signal.ts"; import { DOMExceptionImpl as DOMException } from "../dom_exception.ts"; import { cloneValue, setFunctionName } from "../util.ts"; import { assert, AssertionError } from "../../util.ts"; export type AbortAlgorithm = (reason?: any) => PromiseLike; export interface AbortRequest { promise: Deferred; reason?: any; wasAlreadyErroring: boolean; } export interface BufferQueueItem extends Pair { offset: number; } export type CancelAlgorithm = (reason?: any) => PromiseLike; export type CloseAlgorithm = () => PromiseLike; type Container = { [sym.queue]: Array | BufferQueueItem>; [sym.queueTotalSize]: number; }; export type FlushAlgorithm = () => Promise; export type Pair = { value: R; size: number }; export type PullAlgorithm = () => PromiseLike; export type SizeAlgorithm = (chunk: T) => number; export type StartAlgorithm = () => void | PromiseLike; export type TransformAlgorithm = (chunk: I) => Promise; export type WriteAlgorithm = (chunk: W) => Promise; export interface Deferred { promise: Promise; resolve?: (value?: T | PromiseLike) => void; reject?: (reason?: any) => void; } export interface ReadableStreamGenericReader extends ReadableStreamReader { [sym.closedPromise]: Deferred; [sym.forAuthorCode]: boolean; [sym.ownerReadableStream]: ReadableStreamImpl; [sym.readRequests]: Array>>; } export interface ReadableStreamAsyncIterator extends AsyncIterator { [sym.asyncIteratorReader]: ReadableStreamDefaultReaderImpl; [sym.preventCancel]: boolean; return(value?: any | PromiseLike): Promise>; } export function acquireReadableStreamDefaultReader( stream: ReadableStreamImpl, forAuthorCode = false ): ReadableStreamDefaultReaderImpl { const reader = new ReadableStreamDefaultReaderImpl(stream); reader[sym.forAuthorCode] = forAuthorCode; return reader; } export function acquireWritableStreamDefaultWriter( stream: WritableStreamImpl ): WritableStreamDefaultWriterImpl { return new WritableStreamDefaultWriterImpl(stream); } export function call any>( fn: F, v: ThisType, args: Parameters ): ReturnType { return Function.prototype.apply.call(fn, v, args); } function createAlgorithmFromUnderlyingMethod< O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, methodName: P, algoArgCount: 0, ...extraArgs: any[] ): () => Promise; function createAlgorithmFromUnderlyingMethod< O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, methodName: P, algoArgCount: 1, ...extraArgs: any[] ): (arg: any) => Promise; function createAlgorithmFromUnderlyingMethod< O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, methodName: P, algoArgCount: 0 | 1, ...extraArgs: any[] ): (() => Promise) | ((arg: any) => Promise) { const method = underlyingObject[methodName]; if (method) { if (!isCallable(method)) { throw new TypeError("method is not callable"); } if (algoArgCount === 0) { return async (): Promise => call(method, underlyingObject, extraArgs as any); } else { return async (arg: any): Promise => { const fullArgs = [arg, ...extraArgs]; return call(method, underlyingObject, fullArgs as any); }; } } return async (): Promise => undefined; } function createReadableStream( startAlgorithm: StartAlgorithm, pullAlgorithm: PullAlgorithm, cancelAlgorithm: CancelAlgorithm, highWaterMark = 1, sizeAlgorithm: SizeAlgorithm = (): number => 1 ): ReadableStreamImpl { highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark); const stream: ReadableStreamImpl = Object.create( ReadableStreamImpl.prototype ); initializeReadableStream(stream); const controller: ReadableStreamDefaultControllerImpl = Object.create( ReadableStreamDefaultControllerImpl.prototype ); setUpReadableStreamDefaultController( stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm ); return stream; } function createWritableStream( startAlgorithm: StartAlgorithm, writeAlgorithm: WriteAlgorithm, closeAlgorithm: CloseAlgorithm, abortAlgorithm: AbortAlgorithm, highWaterMark = 1, sizeAlgorithm: SizeAlgorithm = (): number => 1 ): WritableStreamImpl { highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark); const stream = Object.create(WritableStreamImpl.prototype); initializeWritableStream(stream); const controller = Object.create( WritableStreamDefaultControllerImpl.prototype ); setUpWritableStreamDefaultController( stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm ); return stream; } export function dequeueValue(container: Container): R { assert(sym.queue in container && sym.queueTotalSize in container); assert(container[sym.queue].length); const pair = container[sym.queue].shift()!; container[sym.queueTotalSize] -= pair.size; if (container[sym.queueTotalSize] <= 0) { container[sym.queueTotalSize] = 0; } return pair.value as R; } function enqueueValueWithSize( container: Container, value: R, size: number ): void { assert(sym.queue in container && sym.queueTotalSize in container); size = Number(size); if (!isFiniteNonNegativeNumber(size)) { throw new RangeError("size must be a finite non-negative number."); } container[sym.queue].push({ value, size }); container[sym.queueTotalSize] += size; } /** Non-spec mechanism to "unwrap" a promise and store it to be resolved * later. */ export function getDeferred(): Required> { let resolve: (value?: T | PromiseLike) => void; let reject: (reason?: any) => void; const promise = new Promise((res, rej) => { resolve = res; reject = rej; }); return { promise, resolve: resolve!, reject: reject! }; } export function initializeReadableStream( stream: ReadableStreamImpl ): void { stream[sym.state] = "readable"; stream[sym.reader] = stream[sym.storedError] = undefined; stream[sym.disturbed] = false; } export function initializeTransformStream( stream: TransformStreamImpl, startPromise: Promise, writableHighWaterMark: number, writableSizeAlgorithm: SizeAlgorithm, readableHighWaterMark: number, readableSizeAlgorithm: SizeAlgorithm ): void { const startAlgorithm = (): Promise => startPromise; const writeAlgorithm = (chunk: any): Promise => transformStreamDefaultSinkWriteAlgorithm(stream, chunk); const abortAlgorithm = (reason: any): Promise => transformStreamDefaultSinkAbortAlgorithm(stream, reason); const closeAlgorithm = (): Promise => transformStreamDefaultSinkCloseAlgorithm(stream); stream[sym.writable] = createWritableStream( startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm ); const pullAlgorithm = (): PromiseLike => transformStreamDefaultSourcePullAlgorithm(stream); const cancelAlgorithm = (reason: any): Promise => { transformStreamErrorWritableAndUnblockWrite(stream, reason); return Promise.resolve(undefined); }; stream[sym.readable] = createReadableStream( startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm ); stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined; transformStreamSetBackpressure(stream, true); Object.defineProperty(stream, sym.transformStreamController, { value: undefined, configurable: true, }); } export function initializeWritableStream( stream: WritableStreamImpl ): void { stream[sym.state] = "writable"; stream[sym.storedError] = stream[sym.writer] = stream[ sym.writableStreamController ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[ sym.inFlightCloseRequest ] = stream[sym.pendingAbortRequest] = undefined; stream[sym.writeRequests] = []; stream[sym.backpressure] = false; } export function invokeOrNoop, P extends keyof O>( o: O, p: P, ...args: Parameters ): ReturnType | undefined { assert(o); const method = o[p]; if (!method) { return undefined; } return call(method, o, args); } function isCallable(value: unknown): value is (...args: any) => any { return typeof value === "function"; } export function isDetachedBuffer(value: object): boolean { return sym.isFakeDetached in value; } function isFiniteNonNegativeNumber(v: unknown): v is number { return Number.isFinite(v) && (v as number) >= 0; } export function isReadableByteStreamController( x: unknown ): x is ReadableByteStreamControllerImpl { return !( typeof x !== "object" || x === null || !(sym.controlledReadableByteStream in x) ); } export function isReadableStream(x: unknown): x is ReadableStreamImpl { return !( typeof x !== "object" || x === null || !(sym.readableStreamController in x) ); } export function isReadableStreamAsyncIterator( x: unknown ): x is ReadableStreamAsyncIterator { if (typeof x !== "object" || x === null) { return false; } return sym.asyncIteratorReader in x; } export function isReadableStreamDefaultController( x: unknown ): x is ReadableStreamDefaultControllerImpl { return !( typeof x !== "object" || x === null || !(sym.controlledReadableStream in x) ); } export function isReadableStreamDefaultReader( x: unknown ): x is ReadableStreamDefaultReaderImpl { return !(typeof x !== "object" || x === null || !(sym.readRequests in x)); } export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean { assert(isReadableStream(stream)); return !!stream[sym.reader]; } export function isReadableStreamDisturbed(stream: ReadableStream): boolean { assert(isReadableStream(stream)); return !!stream[sym.disturbed]; } export function isTransformStream(x: unknown): x is TransformStreamImpl { return !( typeof x !== "object" || x === null || !(sym.transformStreamController in x) ); } export function isTransformStreamDefaultController( x: unknown ): x is TransformStreamDefaultControllerImpl { return !( typeof x !== "object" || x === null || !(sym.controlledTransformStream in x) ); } export function isUnderlyingByteSource( underlyingSource: UnderlyingByteSource | UnderlyingSource ): underlyingSource is UnderlyingByteSource { const { type } = underlyingSource; const typeString = String(type); return typeString === "bytes"; } export function isWritableStream(x: unknown): x is WritableStreamImpl { return !( typeof x !== "object" || x === null || !(sym.writableStreamController in x) ); } export function isWritableStreamDefaultController( x: unknown ): x is WritableStreamDefaultControllerImpl { return !( typeof x !== "object" || x === null || !(sym.controlledWritableStream in x) ); } export function isWritableStreamDefaultWriter( x: unknown ): x is WritableStreamDefaultWriterImpl { return !( typeof x !== "object" || x === null || !(sym.ownerWritableStream in x) ); } export function isWritableStreamLocked(stream: WritableStreamImpl): boolean { assert(isWritableStream(stream)); return stream[sym.writer] !== undefined; } export function makeSizeAlgorithmFromSizeFunction( size: QueuingStrategySizeCallback | undefined ): SizeAlgorithm { if (size === undefined) { return (): number => 1; } if (typeof size !== "function") { throw new TypeError("size must be callable."); } return (chunk: T): number => { return size.call(undefined, chunk); }; } function peekQueueValue(container: Container): T | "close" { assert(sym.queue in container && sym.queueTotalSize in container); assert(container[sym.queue].length); const [pair] = container[sym.queue]; return pair.value as T; } function readableByteStreamControllerShouldCallPull( controller: ReadableByteStreamControllerImpl ): boolean { const stream = controller[sym.controlledReadableByteStream]; if ( stream[sym.state] !== "readable" || controller[sym.closeRequested] || !controller[sym.started] ) { return false; } if ( readableStreamHasDefaultReader(stream) && readableStreamGetNumReadRequests(stream) > 0 ) { return true; } // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and ! // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true. const desiredSize = readableByteStreamControllerGetDesiredSize(controller); assert(desiredSize !== null); return desiredSize > 0; } export function readableByteStreamControllerCallPullIfNeeded( controller: ReadableByteStreamControllerImpl ): void { const shouldPull = readableByteStreamControllerShouldCallPull(controller); if (!shouldPull) { return; } if (controller[sym.pulling]) { controller[sym.pullAgain] = true; return; } assert(controller[sym.pullAgain] === false); controller[sym.pulling] = true; const pullPromise = controller[sym.pullAlgorithm](); setPromiseIsHandledToTrue( pullPromise.then( () => { controller[sym.pulling] = false; if (controller[sym.pullAgain]) { controller[sym.pullAgain] = false; readableByteStreamControllerCallPullIfNeeded(controller); } }, (e) => { readableByteStreamControllerError(controller, e); } ) ); } export function readableByteStreamControllerClearAlgorithms( controller: ReadableByteStreamControllerImpl ): void { (controller as any)[sym.pullAlgorithm] = undefined; (controller as any)[sym.cancelAlgorithm] = undefined; } export function readableByteStreamControllerClose( controller: ReadableByteStreamControllerImpl ): void { const stream = controller[sym.controlledReadableByteStream]; if (controller[sym.closeRequested] || stream[sym.state] !== "readable") { return; } if (controller[sym.queueTotalSize] > 0) { controller[sym.closeRequested] = true; return; } // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support) readableByteStreamControllerClearAlgorithms(controller); readableStreamClose(stream); } export function readableByteStreamControllerEnqueue( controller: ReadableByteStreamControllerImpl, chunk: ArrayBufferView ): void { const stream = controller[sym.controlledReadableByteStream]; if (controller[sym.closeRequested] || stream[sym.state] !== "readable") { return; } const { buffer, byteOffset, byteLength } = chunk; const transferredBuffer = transferArrayBuffer(buffer); if (readableStreamHasDefaultReader(stream)) { if (readableStreamGetNumReadRequests(stream) === 0) { readableByteStreamControllerEnqueueChunkToQueue( controller, transferredBuffer, byteOffset, byteLength ); } else { assert(controller[sym.queue].length === 0); const transferredView = new Uint8Array( transferredBuffer, byteOffset, byteLength ); readableStreamFulfillReadRequest(stream, transferredView, false); } // 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true } else { assert(!isReadableStreamLocked(stream)); readableByteStreamControllerEnqueueChunkToQueue( controller, transferredBuffer, byteOffset, byteLength ); } readableByteStreamControllerCallPullIfNeeded(controller); } function readableByteStreamControllerEnqueueChunkToQueue( controller: ReadableByteStreamControllerImpl, buffer: ArrayBuffer | SharedArrayBuffer, byteOffset: number, byteLength: number ): void { controller[sym.queue].push({ value: buffer, offset: byteOffset, size: byteLength, }); controller[sym.queueTotalSize] += byteLength; } export function readableByteStreamControllerError( controller: ReadableByteStreamControllerImpl, e: any ): void { const stream = controller[sym.controlledReadableByteStream]; if (stream[sym.state] !== "readable") { return; } // 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller). resetQueue(controller); readableByteStreamControllerClearAlgorithms(controller); readableStreamError(stream, e); } export function readableByteStreamControllerGetDesiredSize( controller: ReadableByteStreamControllerImpl ): number | null { const stream = controller[sym.controlledReadableByteStream]; const state = stream[sym.state]; if (state === "errored") { return null; } if (state === "closed") { return 0; } return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; } export function readableByteStreamControllerHandleQueueDrain( controller: ReadableByteStreamControllerImpl ): void { assert( controller[sym.controlledReadableByteStream][sym.state] === "readable" ); if (controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]) { readableByteStreamControllerClearAlgorithms(controller); readableStreamClose(controller[sym.controlledReadableByteStream]); } else { readableByteStreamControllerCallPullIfNeeded(controller); } } export function readableStreamAddReadRequest( stream: ReadableStreamImpl ): Promise> { assert(isReadableStreamDefaultReader(stream[sym.reader])); assert(stream[sym.state] === "readable"); const promise = getDeferred>(); stream[sym.reader]![sym.readRequests].push(promise); return promise.promise; } export function readableStreamCancel( stream: ReadableStreamImpl, reason: any ): Promise { stream[sym.disturbed] = true; if (stream[sym.state] === "closed") { return Promise.resolve(); } if (stream[sym.state] === "errored") { return Promise.reject(stream[sym.storedError]); } readableStreamClose(stream); return stream[sym.readableStreamController]![sym.cancelSteps](reason).then( () => undefined ) as Promise; } export function readableStreamClose(stream: ReadableStreamImpl): void { assert(stream[sym.state] === "readable"); stream[sym.state] = "closed"; const reader = stream[sym.reader]; if (!reader) { return; } if (isReadableStreamDefaultReader(reader)) { for (const readRequest of reader[sym.readRequests]) { assert(readRequest.resolve); readRequest.resolve( readableStreamCreateReadResult( undefined, true, reader[sym.forAuthorCode] ) ); } reader[sym.readRequests] = []; } const resolve = reader[sym.closedPromise].resolve; assert(resolve); resolve(); } export function readableStreamCreateReadResult( value: T | undefined, done: boolean, forAuthorCode: boolean ): ReadableStreamReadResult { const prototype = forAuthorCode ? Object.prototype : null; assert(typeof done === "boolean"); const obj: ReadableStreamReadResult = Object.create(prototype); Object.defineProperties(obj, { value: { value, writable: true, enumerable: true, configurable: true }, done: { value: done, writable: true, enumerable: true, configurable: true }, }); return obj; } export function readableStreamDefaultControllerCallPullIfNeeded( controller: ReadableStreamDefaultControllerImpl ): void { const shouldPull = readableStreamDefaultControllerShouldCallPull(controller); if (!shouldPull) { return; } if (controller[sym.pulling]) { controller[sym.pullAgain] = true; return; } assert(controller[sym.pullAgain] === false); controller[sym.pulling] = true; const pullPromise = controller[sym.pullAlgorithm](); pullPromise.then( () => { controller[sym.pulling] = false; if (controller[sym.pullAgain]) { controller[sym.pullAgain] = false; readableStreamDefaultControllerCallPullIfNeeded(controller); } }, (e) => { readableStreamDefaultControllerError(controller, e); } ); } export function readableStreamDefaultControllerCanCloseOrEnqueue( controller: ReadableStreamDefaultControllerImpl ): boolean { const state = controller[sym.controlledReadableStream][sym.state]; return !controller[sym.closeRequested] && state === "readable"; } export function readableStreamDefaultControllerClearAlgorithms( controller: ReadableStreamDefaultControllerImpl ): void { (controller as any)[sym.pullAlgorithm] = undefined; (controller as any)[sym.cancelAlgorithm] = undefined; (controller as any)[sym.strategySizeAlgorithm] = undefined; } export function readableStreamDefaultControllerClose( controller: ReadableStreamDefaultControllerImpl ): void { if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) { return; } const stream = controller[sym.controlledReadableStream]; controller[sym.closeRequested] = true; if (controller[sym.queue].length === 0) { readableStreamDefaultControllerClearAlgorithms(controller); readableStreamClose(stream); } } export function readableStreamDefaultControllerEnqueue( controller: ReadableStreamDefaultControllerImpl, chunk: T ): void { if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) { return; } const stream = controller[sym.controlledReadableStream]; if ( isReadableStreamLocked(stream) && readableStreamGetNumReadRequests(stream) > 0 ) { readableStreamFulfillReadRequest(stream, chunk, false); } else { try { const chunkSize = controller[sym.strategySizeAlgorithm](chunk); enqueueValueWithSize(controller, chunk, chunkSize); } catch (err) { readableStreamDefaultControllerError(controller, err); throw err; } } readableStreamDefaultControllerCallPullIfNeeded(controller); } export function readableStreamDefaultControllerGetDesiredSize( controller: ReadableStreamDefaultControllerImpl ): number | null { const stream = controller[sym.controlledReadableStream]; const state = stream[sym.state]; if (state === "errored") { return null; } if (state === "closed") { return 0; } return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; } export function readableStreamDefaultControllerError( controller: ReadableStreamDefaultControllerImpl, e: any ): void { const stream = controller[sym.controlledReadableStream]; if (stream[sym.state] !== "readable") { return; } resetQueue(controller); readableStreamDefaultControllerClearAlgorithms(controller); readableStreamError(stream, e); } function readableStreamDefaultControllerHasBackpressure( controller: ReadableStreamDefaultControllerImpl ): boolean { return readableStreamDefaultControllerShouldCallPull(controller); } function readableStreamDefaultControllerShouldCallPull( controller: ReadableStreamDefaultControllerImpl ): boolean { const stream = controller[sym.controlledReadableStream]; if ( !readableStreamDefaultControllerCanCloseOrEnqueue(controller) || controller[sym.started] === false ) { return false; } if ( isReadableStreamLocked(stream) && readableStreamGetNumReadRequests(stream) > 0 ) { return true; } const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); assert(desiredSize !== null); return desiredSize > 0; } export function readableStreamDefaultReaderRead( reader: ReadableStreamDefaultReaderImpl ): Promise> { const stream = reader[sym.ownerReadableStream]; assert(stream); stream[sym.disturbed] = true; if (stream[sym.state] === "closed") { return Promise.resolve( readableStreamCreateReadResult( undefined, true, reader[sym.forAuthorCode] ) ); } if (stream[sym.state] === "errored") { return Promise.reject(stream[sym.storedError]); } assert(stream[sym.state] === "readable"); return (stream[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl)[sym.pullSteps](); } export function readableStreamError(stream: ReadableStreamImpl, e: any): void { assert(isReadableStream(stream)); assert(stream[sym.state] === "readable"); stream[sym.state] = "errored"; stream[sym.storedError] = e; const reader = stream[sym.reader]; if (reader === undefined) { return; } if (isReadableStreamDefaultReader(reader)) { for (const readRequest of reader[sym.readRequests]) { assert(readRequest.reject); readRequest.reject(e); readRequest.reject = undefined; readRequest.resolve = undefined; } reader[sym.readRequests] = []; } // 3.5.6.8 Otherwise, support BYOB Reader reader[sym.closedPromise].reject!(e); reader[sym.closedPromise].reject = undefined; reader[sym.closedPromise].resolve = undefined; setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); } export function readableStreamFulfillReadRequest( stream: ReadableStreamImpl, chunk: R, done: boolean ): void { const reader = stream[sym.reader]!; const readRequest = reader[sym.readRequests].shift()!; assert(readRequest.resolve); readRequest.resolve( readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode]) ); } export function readableStreamGetNumReadRequests( stream: ReadableStreamImpl ): number { return stream[sym.reader]?.[sym.readRequests].length ?? 0; } export function readableStreamHasDefaultReader( stream: ReadableStreamImpl ): boolean { const reader = stream[sym.reader]; return !(reader === undefined || !isReadableStreamDefaultReader(reader)); } export function readableStreamPipeTo( source: ReadableStreamImpl, dest: WritableStreamImpl, preventClose: boolean, preventAbort: boolean, preventCancel: boolean, signal: AbortSignalImpl | undefined ): Promise { assert(isReadableStream(source)); assert(isWritableStream(dest)); assert( typeof preventClose === "boolean" && typeof preventAbort === "boolean" && typeof preventCancel === "boolean" ); assert(signal === undefined || signal instanceof AbortSignalImpl); assert(!isReadableStreamLocked(source)); assert(!isWritableStreamLocked(dest)); const reader = acquireReadableStreamDefaultReader(source); const writer = acquireWritableStreamDefaultWriter(dest); source[sym.disturbed] = true; let shuttingDown = false; const promise = getDeferred(); let abortAlgorithm: () => void; if (signal) { abortAlgorithm = (): void => { const error = new DOMException("Abort signal received.", "AbortSignal"); const actions: Array<() => Promise> = []; if (!preventAbort) { actions.push(() => { if (dest[sym.state] === "writable") { return writableStreamAbort(dest, error); } else { return Promise.resolve(undefined); } }); } if (!preventCancel) { actions.push(() => { if (source[sym.state] === "readable") { return readableStreamCancel(source, error); } else { return Promise.resolve(undefined); } }); } shutdownWithAction( () => Promise.all(actions.map((action) => action())), true, error ); }; if (signal.aborted) { abortAlgorithm(); return promise.promise; } signal.addEventListener("abort", abortAlgorithm); } let currentWrite = Promise.resolve(); // At this point, the spec becomes non-specific and vague. Most of the rest // of this code is based on the reference implementation that is part of the // specification. This is why the functions are only scoped to this function // to ensure they don't leak into the spec compliant parts. function isOrBecomesClosed( stream: ReadableStreamImpl | WritableStreamImpl, promise: Promise, action: () => void ): void { if (stream[sym.state] === "closed") { action(); } else { setPromiseIsHandledToTrue(promise.then(action)); } } function isOrBecomesErrored( stream: ReadableStreamImpl | WritableStreamImpl, promise: Promise, action: (error: any) => void ): void { if (stream[sym.state] === "errored") { action(stream[sym.storedError]); } else { setPromiseIsHandledToTrue(promise.catch((error) => action(error))); } } function finalize(isError?: boolean, error?: any): void { writableStreamDefaultWriterRelease(writer); readableStreamReaderGenericRelease(reader); if (signal) { signal.removeEventListener("abort", abortAlgorithm); } if (isError) { promise.reject(error); } else { promise.resolve(); } } function waitForWritesToFinish(): Promise { const oldCurrentWrite = currentWrite; return currentWrite.then(() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined ); } function shutdownWithAction( action: () => Promise, originalIsError?: boolean, originalError?: any ): void { function doTheRest(): void { setPromiseIsHandledToTrue( action().then( () => finalize(originalIsError, originalError), (newError) => finalize(true, newError) ) ); } if (shuttingDown) { return; } shuttingDown = true; if ( dest[sym.state] === "writable" && writableStreamCloseQueuedOrInFlight(dest) === false ) { setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest)); } else { doTheRest(); } } function shutdown(isError: boolean, error?: any): void { if (shuttingDown) { return; } shuttingDown = true; if ( dest[sym.state] === "writable" && !writableStreamCloseQueuedOrInFlight(dest) ) { setPromiseIsHandledToTrue( waitForWritesToFinish().then(() => finalize(isError, error)) ); } finalize(isError, error); } function pipeStep(): Promise { if (shuttingDown) { return Promise.resolve(true); } return writer[sym.readyPromise].promise.then(() => { return readableStreamDefaultReaderRead(reader).then(({ value, done }) => { if (done === true) { return true; } currentWrite = writableStreamDefaultWriterWrite( writer, value! ).then(undefined, () => {}); return false; }); }); } function pipeLoop(): Promise { return new Promise((resolveLoop, rejectLoop) => { function next(done: boolean): void { if (done) { resolveLoop(undefined); } else { setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop)); } } next(false); }); } isOrBecomesErrored( source, reader[sym.closedPromise].promise, (storedError) => { if (!preventAbort) { shutdownWithAction( () => writableStreamAbort(dest, storedError), true, storedError ); } else { shutdown(true, storedError); } } ); isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => { if (!preventCancel) { shutdownWithAction( () => readableStreamCancel(source, storedError), true, storedError ); } else { shutdown(true, storedError); } }); isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => { if (!preventClose) { shutdownWithAction(() => writableStreamDefaultWriterCloseWithErrorPropagation(writer) ); } }); if ( writableStreamCloseQueuedOrInFlight(dest) || dest[sym.state] === "closed" ) { const destClosed = new TypeError( "The destination writable stream closed before all data could be piped to it." ); if (!preventCancel) { shutdownWithAction( () => readableStreamCancel(source, destClosed), true, destClosed ); } else { shutdown(true, destClosed); } } setPromiseIsHandledToTrue(pipeLoop()); return promise.promise; } export function readableStreamReaderGenericCancel( reader: ReadableStreamGenericReader, reason: any ): Promise { const stream = reader[sym.ownerReadableStream]; assert(stream); return readableStreamCancel(stream, reason); } export function readableStreamReaderGenericInitialize( reader: ReadableStreamGenericReader, stream: ReadableStreamImpl ): void { reader[sym.forAuthorCode] = true; reader[sym.ownerReadableStream] = stream; stream[sym.reader] = reader; if (stream[sym.state] === "readable") { reader[sym.closedPromise] = getDeferred(); } else if (stream[sym.state] === "closed") { reader[sym.closedPromise] = { promise: Promise.resolve() }; } else { assert(stream[sym.state] === "errored"); reader[sym.closedPromise] = { promise: Promise.reject(stream[sym.storedError]), }; setPromiseIsHandledToTrue(reader[sym.closedPromise].promise); } } export function readableStreamReaderGenericRelease( reader: ReadableStreamGenericReader ): void { assert(reader[sym.ownerReadableStream]); assert(reader[sym.ownerReadableStream][sym.reader] === reader); const closedPromise = reader[sym.closedPromise]; if (reader[sym.ownerReadableStream][sym.state] === "readable") { assert(closedPromise.reject); closedPromise.reject(new TypeError("ReadableStream state is readable.")); } else { closedPromise.promise = Promise.reject(new TypeError("Reading is closed.")); delete closedPromise.reject; delete closedPromise.resolve; } setPromiseIsHandledToTrue(closedPromise.promise); reader[sym.ownerReadableStream][sym.reader] = undefined; (reader as any)[sym.ownerReadableStream] = undefined; } export function readableStreamTee( stream: ReadableStreamImpl, cloneForBranch2: boolean ): [ReadableStreamImpl, ReadableStreamImpl] { assert(isReadableStream(stream)); assert(typeof cloneForBranch2 === "boolean"); const reader = acquireReadableStreamDefaultReader(stream); let reading = false; let canceled1 = false; let canceled2 = false; let reason1: any = undefined; let reason2: any = undefined; /* eslint-disable prefer-const */ let branch1: ReadableStreamImpl; let branch2: ReadableStreamImpl; /* eslint-enable prefer-const */ const cancelPromise = getDeferred(); const pullAlgorithm = (): PromiseLike => { if (reading) { return Promise.resolve(); } reading = true; const readPromise = readableStreamDefaultReaderRead(reader).then( (result) => { reading = false; assert(typeof result === "object"); const { done } = result; assert(typeof done === "boolean"); if (done) { if (!canceled1) { readableStreamDefaultControllerClose( branch1[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl ); } if (!canceled2) { readableStreamDefaultControllerClose( branch2[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl ); } return; } const { value } = result; const value1 = value!; let value2 = value!; if (!canceled2 && cloneForBranch2) { value2 = cloneValue(value2); } if (!canceled1) { readableStreamDefaultControllerEnqueue( branch1[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl, value1 ); } if (!canceled2) { readableStreamDefaultControllerEnqueue( branch2[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl, value2 ); } } ); setPromiseIsHandledToTrue(readPromise); return Promise.resolve(); }; const cancel1Algorithm = (reason?: any): PromiseLike => { canceled1 = true; reason1 = reason; if (canceled2) { const compositeReason = [reason1, reason2]; const cancelResult = readableStreamCancel(stream, compositeReason); cancelPromise.resolve(cancelResult); } return cancelPromise.promise; }; const cancel2Algorithm = (reason?: any): PromiseLike => { canceled2 = true; reason2 = reason; if (canceled1) { const compositeReason = [reason1, reason2]; const cancelResult = readableStreamCancel(stream, compositeReason); cancelPromise.resolve(cancelResult); } return cancelPromise.promise; }; const startAlgorithm = (): void => undefined; branch1 = createReadableStream( startAlgorithm, pullAlgorithm, cancel1Algorithm ); branch2 = createReadableStream( startAlgorithm, pullAlgorithm, cancel2Algorithm ); setPromiseIsHandledToTrue( reader[sym.closedPromise].promise.catch((r) => { readableStreamDefaultControllerError( branch1[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl, r ); readableStreamDefaultControllerError( branch2[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl, r ); }) ); return [branch1, branch2]; } export function resetQueue(container: Container): void { assert(sym.queue in container && sym.queueTotalSize in container); container[sym.queue] = []; container[sym.queueTotalSize] = 0; } /** An internal function which mimics the behavior of setting the promise to * handled in JavaScript. In this situation, an assertion failure, which * shouldn't happen will get thrown, instead of swallowed. */ export function setPromiseIsHandledToTrue(promise: PromiseLike): void { promise.then(undefined, (e) => { if (e && e instanceof AssertionError) { queueMicrotask(() => { throw e; }); } }); } function setUpReadableByteStreamController( stream: ReadableStreamImpl, controller: ReadableByteStreamControllerImpl, startAlgorithm: StartAlgorithm, pullAlgorithm: PullAlgorithm, cancelAlgorithm: CancelAlgorithm, highWaterMark: number, autoAllocateChunkSize: number | undefined ): void { assert(stream[sym.readableStreamController] === undefined); if (autoAllocateChunkSize !== undefined) { assert(Number.isInteger(autoAllocateChunkSize)); assert(autoAllocateChunkSize >= 0); } controller[sym.controlledReadableByteStream] = stream; controller[sym.pulling] = controller[sym.pullAgain] = false; controller[sym.byobRequest] = undefined; controller[sym.queue] = []; controller[sym.queueTotalSize] = 0; controller[sym.closeRequested] = controller[sym.started] = false; controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark( highWaterMark ); controller[sym.pullAlgorithm] = pullAlgorithm; controller[sym.cancelAlgorithm] = cancelAlgorithm; controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize; // 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List. stream[sym.readableStreamController] = controller; const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); setPromiseIsHandledToTrue( startPromise.then( () => { controller[sym.started] = true; assert(!controller[sym.pulling]); assert(!controller[sym.pullAgain]); readableByteStreamControllerCallPullIfNeeded(controller); }, (r) => { readableByteStreamControllerError(controller, r); } ) ); } export function setUpReadableByteStreamControllerFromUnderlyingSource( stream: ReadableStreamImpl, underlyingByteSource: UnderlyingByteSource, highWaterMark: number ): void { assert(underlyingByteSource); const controller: ReadableByteStreamControllerImpl = Object.create( ReadableByteStreamControllerImpl.prototype ); const startAlgorithm: StartAlgorithm = () => { return invokeOrNoop(underlyingByteSource, "start", controller); }; const pullAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingByteSource, "pull", 0, controller ); setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); const cancelAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingByteSource, "cancel", 1 ); setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize"). const autoAllocateChunkSize = undefined; setUpReadableByteStreamController( stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize ); } function setUpReadableStreamDefaultController( stream: ReadableStreamImpl, controller: ReadableStreamDefaultControllerImpl, startAlgorithm: StartAlgorithm, pullAlgorithm: PullAlgorithm, cancelAlgorithm: CancelAlgorithm, highWaterMark: number, sizeAlgorithm: SizeAlgorithm ): void { assert(stream[sym.readableStreamController] === undefined); controller[sym.controlledReadableStream] = stream; controller[sym.queue] = []; controller[sym.queueTotalSize] = 0; controller[sym.started] = controller[sym.closeRequested] = controller[ sym.pullAgain ] = controller[sym.pulling] = false; controller[sym.strategySizeAlgorithm] = sizeAlgorithm; controller[sym.strategyHWM] = highWaterMark; controller[sym.pullAlgorithm] = pullAlgorithm; controller[sym.cancelAlgorithm] = cancelAlgorithm; stream[sym.readableStreamController] = controller; const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); setPromiseIsHandledToTrue( startPromise.then( () => { controller[sym.started] = true; assert(controller[sym.pulling] === false); assert(controller[sym.pullAgain] === false); readableStreamDefaultControllerCallPullIfNeeded(controller); }, (r) => { readableStreamDefaultControllerError(controller, r); } ) ); } export function setUpReadableStreamDefaultControllerFromUnderlyingSource( stream: ReadableStreamImpl, underlyingSource: UnderlyingSource, highWaterMark: number, sizeAlgorithm: SizeAlgorithm ): void { assert(underlyingSource); const controller: ReadableStreamDefaultControllerImpl = Object.create( ReadableStreamDefaultControllerImpl.prototype ); const startAlgorithm: StartAlgorithm = (): void | PromiseLike => invokeOrNoop(underlyingSource, "start", controller); const pullAlgorithm: PullAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSource, "pull", 0, controller ); setFunctionName(pullAlgorithm, "[[pullAlgorithm]]"); const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSource, "cancel", 1 ); setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]"); setUpReadableStreamDefaultController( stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm ); } function setUpTransformStreamDefaultController( stream: TransformStreamImpl, controller: TransformStreamDefaultControllerImpl, transformAlgorithm: TransformAlgorithm, flushAlgorithm: FlushAlgorithm ): void { assert(isTransformStream(stream)); assert(stream[sym.transformStreamController] === undefined); controller[sym.controlledTransformStream] = stream; stream[sym.transformStreamController] = controller; controller[sym.transformAlgorithm] = transformAlgorithm; controller[sym.flushAlgorithm] = flushAlgorithm; } export function setUpTransformStreamDefaultControllerFromTransformer( stream: TransformStreamImpl, transformer: Transformer ): void { assert(transformer); const controller = Object.create( TransformStreamDefaultControllerImpl.prototype ) as TransformStreamDefaultControllerImpl; let transformAlgorithm: TransformAlgorithm = (chunk) => { try { transformStreamDefaultControllerEnqueue( controller, // it defaults to no tranformation, so I is assumed to be O (chunk as unknown) as O ); } catch (e) { return Promise.reject(e); } return Promise.resolve(); }; const transformMethod = transformer.transform; if (transformMethod) { if (typeof transformMethod !== "function") { throw new TypeError("tranformer.transform must be callable."); } transformAlgorithm = async (chunk): Promise => call(transformMethod, transformer, [chunk, controller]); } const flushAlgorithm = createAlgorithmFromUnderlyingMethod( transformer, "flush", 0, controller ); setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm ); } function setUpWritableStreamDefaultController( stream: WritableStreamImpl, controller: WritableStreamDefaultControllerImpl, startAlgorithm: StartAlgorithm, writeAlgorithm: WriteAlgorithm, closeAlgorithm: CloseAlgorithm, abortAlgorithm: AbortAlgorithm, highWaterMark: number, sizeAlgorithm: SizeAlgorithm ): void { assert(isWritableStream(stream)); assert(stream[sym.writableStreamController] === undefined); controller[sym.controlledWritableStream] = stream; stream[sym.writableStreamController] = controller; controller[sym.queue] = []; controller[sym.queueTotalSize] = 0; controller[sym.started] = false; controller[sym.strategySizeAlgorithm] = sizeAlgorithm; controller[sym.strategyHWM] = highWaterMark; controller[sym.writeAlgorithm] = writeAlgorithm; controller[sym.closeAlgorithm] = closeAlgorithm; controller[sym.abortAlgorithm] = abortAlgorithm; const backpressure = writableStreamDefaultControllerGetBackpressure( controller ); writableStreamUpdateBackpressure(stream, backpressure); const startResult = startAlgorithm(); const startPromise = Promise.resolve(startResult); setPromiseIsHandledToTrue( startPromise.then( () => { assert( stream[sym.state] === "writable" || stream[sym.state] === "erroring" ); controller[sym.started] = true; writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); }, (r) => { assert( stream[sym.state] === "writable" || stream[sym.state] === "erroring" ); controller[sym.started] = true; writableStreamDealWithRejection(stream, r); } ) ); } export function setUpWritableStreamDefaultControllerFromUnderlyingSink( stream: WritableStreamImpl, underlyingSink: UnderlyingSink, highWaterMark: number, sizeAlgorithm: SizeAlgorithm ): void { assert(underlyingSink); const controller = Object.create( WritableStreamDefaultControllerImpl.prototype ); const startAlgorithm = (): void | PromiseLike => { return invokeOrNoop(underlyingSink, "start", controller); }; const writeAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSink, "write", 1, controller ); setFunctionName(writeAlgorithm, "[[writeAlgorithm]]"); const closeAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSink, "close", 0 ); setFunctionName(closeAlgorithm, "[[closeAlgorithm]]"); const abortAlgorithm = createAlgorithmFromUnderlyingMethod( underlyingSink, "abort", 1 ); setFunctionName(abortAlgorithm, "[[abortAlgorithm]]"); setUpWritableStreamDefaultController( stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm ); } function transformStreamDefaultControllerClearAlgorithms( controller: TransformStreamDefaultControllerImpl ): void { (controller as any)[sym.transformAlgorithm] = undefined; (controller as any)[sym.flushAlgorithm] = undefined; } export function transformStreamDefaultControllerEnqueue( controller: TransformStreamDefaultControllerImpl, chunk: O ): void { const stream = controller[sym.controlledTransformStream]; const readableController = stream[sym.readable][ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl; if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) { throw new TypeError( "TransformStream's readable controller cannot be closed or enqueued." ); } try { readableStreamDefaultControllerEnqueue(readableController, chunk); } catch (e) { transformStreamErrorWritableAndUnblockWrite(stream, e); throw stream[sym.readable][sym.storedError]; } const backpressure = readableStreamDefaultControllerHasBackpressure( readableController ); if (backpressure) { transformStreamSetBackpressure(stream, true); } } export function transformStreamDefaultControllerError( controller: TransformStreamDefaultControllerImpl, e: any ): void { transformStreamError(controller[sym.controlledTransformStream], e); } function transformStreamDefaultControllerPerformTransform( controller: TransformStreamDefaultControllerImpl, chunk: I ): Promise { const transformPromise = controller[sym.transformAlgorithm](chunk); return transformPromise.then(undefined, (r) => { transformStreamError(controller[sym.controlledTransformStream], r); throw r; }); } function transformStreamDefaultSinkAbortAlgorithm( stream: TransformStreamImpl, reason: any ): Promise { transformStreamError(stream, reason); return Promise.resolve(undefined); } function transformStreamDefaultSinkCloseAlgorithm( stream: TransformStreamImpl ): Promise { const readable = stream[sym.readable]; const controller = stream[sym.transformStreamController]; const flushPromise = controller[sym.flushAlgorithm](); transformStreamDefaultControllerClearAlgorithms(controller); return flushPromise.then( () => { if (readable[sym.state] === "errored") { throw readable[sym.storedError]; } const readableController = readable[ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl; if ( readableStreamDefaultControllerCanCloseOrEnqueue(readableController) ) { readableStreamDefaultControllerClose(readableController); } }, (r) => { transformStreamError(stream, r); throw readable[sym.storedError]; } ); } function transformStreamDefaultSinkWriteAlgorithm( stream: TransformStreamImpl, chunk: I ): Promise { assert(stream[sym.writable][sym.state] === "writable"); const controller = stream[sym.transformStreamController]; if (stream[sym.backpressure]) { const backpressureChangePromise = stream[sym.backpressureChangePromise]; assert(backpressureChangePromise); return backpressureChangePromise.promise.then(() => { const writable = stream[sym.writable]; const state = writable[sym.state]; if (state === "erroring") { throw writable[sym.storedError]; } assert(state === "writable"); return transformStreamDefaultControllerPerformTransform( controller, chunk ); }); } return transformStreamDefaultControllerPerformTransform(controller, chunk); } function transformStreamDefaultSourcePullAlgorithm( stream: TransformStreamImpl ): Promise { assert(stream[sym.backpressure] === true); assert(stream[sym.backpressureChangePromise] !== undefined); transformStreamSetBackpressure(stream, false); return stream[sym.backpressureChangePromise]!.promise; } function transformStreamError( stream: TransformStreamImpl, e: any ): void { readableStreamDefaultControllerError( stream[sym.readable][ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl, e ); transformStreamErrorWritableAndUnblockWrite(stream, e); } export function transformStreamDefaultControllerTerminate( controller: TransformStreamDefaultControllerImpl ): void { const stream = controller[sym.controlledTransformStream]; const readableController = stream[sym.readable][ sym.readableStreamController ] as ReadableStreamDefaultControllerImpl; readableStreamDefaultControllerClose(readableController); const error = new TypeError("TransformStream is closed."); transformStreamErrorWritableAndUnblockWrite(stream, error); } function transformStreamErrorWritableAndUnblockWrite( stream: TransformStreamImpl, e: any ): void { transformStreamDefaultControllerClearAlgorithms( stream[sym.transformStreamController] ); writableStreamDefaultControllerErrorIfNeeded( stream[sym.writable][sym.writableStreamController]!, e ); if (stream[sym.backpressure]) { transformStreamSetBackpressure(stream, false); } } function transformStreamSetBackpressure( stream: TransformStreamImpl, backpressure: boolean ): void { assert(stream[sym.backpressure] !== backpressure); if (stream[sym.backpressureChangePromise] !== undefined) { stream[sym.backpressureChangePromise]!.resolve!(undefined); } stream[sym.backpressureChangePromise] = getDeferred(); stream[sym.backpressure] = backpressure; } function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer { assert(!isDetachedBuffer(buffer)); const transferredIshVersion = buffer.slice(0); Object.defineProperty(buffer, "byteLength", { get(): number { return 0; }, }); (buffer as any)[sym.isFakeDetached] = true; return transferredIshVersion; } export function validateAndNormalizeHighWaterMark( highWaterMark: number ): number { highWaterMark = Number(highWaterMark); if (Number.isNaN(highWaterMark) || highWaterMark < 0) { throw new RangeError( `highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.` ); } return highWaterMark; } export function writableStreamAbort( stream: WritableStreamImpl, reason: any ): Promise { const state = stream[sym.state]; if (state === "closed" || state === "errored") { return Promise.resolve(undefined); } if (stream[sym.pendingAbortRequest]) { return stream[sym.pendingAbortRequest]!.promise.promise; } assert(state === "writable" || state === "erroring"); let wasAlreadyErroring = false; if (state === "erroring") { wasAlreadyErroring = true; reason = undefined; } const promise = getDeferred(); stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring }; if (wasAlreadyErroring === false) { writableStreamStartErroring(stream, reason); } return promise.promise; } function writableStreamAddWriteRequest( stream: WritableStreamImpl ): Promise { assert(isWritableStream(stream)); assert(stream[sym.state] === "writable"); const promise = getDeferred(); stream[sym.writeRequests].push(promise); return promise.promise; } export function writableStreamClose( stream: WritableStreamImpl ): Promise { const state = stream[sym.state]; if (state === "closed" || state === "errored") { return Promise.reject( new TypeError("Cannot close an already closed or errored WritableStream.") ); } assert(!writableStreamCloseQueuedOrInFlight(stream)); const promise = getDeferred(); stream[sym.closeRequest] = promise; const writer = stream[sym.writer]; if (writer && stream[sym.backpressure] && state === "writable") { writer[sym.readyPromise].resolve!(); writer[sym.readyPromise].resolve = undefined; writer[sym.readyPromise].reject = undefined; } writableStreamDefaultControllerClose(stream[sym.writableStreamController]!); return promise.promise; } export function writableStreamCloseQueuedOrInFlight( stream: WritableStreamImpl ): boolean { return !( stream[sym.closeRequest] === undefined && stream[sym.inFlightCloseRequest] === undefined ); } function writableStreamDealWithRejection( stream: WritableStreamImpl, error: any ): void { const state = stream[sym.state]; if (state === "writable") { writableStreamStartErroring(stream, error); return; } assert(state === "erroring"); writableStreamFinishErroring(stream); } function writableStreamDefaultControllerAdvanceQueueIfNeeded( controller: WritableStreamDefaultControllerImpl ): void { const stream = controller[sym.controlledWritableStream]; if (!controller[sym.started]) { return; } if (stream[sym.inFlightWriteRequest]) { return; } const state = stream[sym.state]; assert(state !== "closed" && state !== "errored"); if (state === "erroring") { writableStreamFinishErroring(stream); return; } if (!controller[sym.queue].length) { return; } const writeRecord = peekQueueValue(controller); if (writeRecord === "close") { writableStreamDefaultControllerProcessClose(controller); } else { writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk); } } export function writableStreamDefaultControllerClearAlgorithms( controller: WritableStreamDefaultControllerImpl ): void { (controller as any)[sym.writeAlgorithm] = undefined; (controller as any)[sym.closeAlgorithm] = undefined; (controller as any)[sym.abortAlgorithm] = undefined; (controller as any)[sym.strategySizeAlgorithm] = undefined; } function writableStreamDefaultControllerClose( controller: WritableStreamDefaultControllerImpl ): void { enqueueValueWithSize(controller, "close", 0); writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); } export function writableStreamDefaultControllerError( controller: WritableStreamDefaultControllerImpl, error: any ): void { const stream = controller[sym.controlledWritableStream]; assert(stream[sym.state] === "writable"); writableStreamDefaultControllerClearAlgorithms(controller); writableStreamStartErroring(stream, error); } function writableStreamDefaultControllerErrorIfNeeded( controller: WritableStreamDefaultControllerImpl, error: any ): void { if (controller[sym.controlledWritableStream][sym.state] === "writable") { writableStreamDefaultControllerError(controller, error); } } function writableStreamDefaultControllerGetBackpressure( controller: WritableStreamDefaultControllerImpl ): boolean { const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller); return desiredSize <= 0; } function writableStreamDefaultControllerGetChunkSize( controller: WritableStreamDefaultControllerImpl, chunk: W ): number { let returnValue: number; try { returnValue = controller[sym.strategySizeAlgorithm](chunk); } catch (e) { writableStreamDefaultControllerErrorIfNeeded(controller, e); return 1; } return returnValue; } function writableStreamDefaultControllerGetDesiredSize( controller: WritableStreamDefaultControllerImpl ): number { return controller[sym.strategyHWM] - controller[sym.queueTotalSize]; } function writableStreamDefaultControllerProcessClose( controller: WritableStreamDefaultControllerImpl ): void { const stream = controller[sym.controlledWritableStream]; writableStreamMarkCloseRequestInFlight(stream); dequeueValue(controller); assert(controller[sym.queue].length === 0); const sinkClosePromise = controller[sym.closeAlgorithm](); writableStreamDefaultControllerClearAlgorithms(controller); setPromiseIsHandledToTrue( sinkClosePromise.then( () => { writableStreamFinishInFlightClose(stream); }, (reason) => { writableStreamFinishInFlightCloseWithError(stream, reason); } ) ); } function writableStreamDefaultControllerProcessWrite( controller: WritableStreamDefaultControllerImpl, chunk: W ): void { const stream = controller[sym.controlledWritableStream]; writableStreamMarkFirstWriteRequestInFlight(stream); const sinkWritePromise = controller[sym.writeAlgorithm](chunk); setPromiseIsHandledToTrue( sinkWritePromise.then( () => { writableStreamFinishInFlightWrite(stream); const state = stream[sym.state]; assert(state === "writable" || state === "erroring"); dequeueValue(controller); if ( !writableStreamCloseQueuedOrInFlight(stream) && state === "writable" ) { const backpressure = writableStreamDefaultControllerGetBackpressure( controller ); writableStreamUpdateBackpressure(stream, backpressure); } writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); }, (reason) => { if (stream[sym.state] === "writable") { writableStreamDefaultControllerClearAlgorithms(controller); } writableStreamFinishInFlightWriteWithError(stream, reason); } ) ); } function writableStreamDefaultControllerWrite( controller: WritableStreamDefaultControllerImpl, chunk: W, chunkSize: number ): void { const writeRecord = { chunk }; try { enqueueValueWithSize(controller, writeRecord, chunkSize); } catch (e) { writableStreamDefaultControllerErrorIfNeeded(controller, e); return; } const stream = controller[sym.controlledWritableStream]; if ( !writableStreamCloseQueuedOrInFlight(stream) && stream[sym.state] === "writable" ) { const backpressure = writableStreamDefaultControllerGetBackpressure( controller ); writableStreamUpdateBackpressure(stream, backpressure); } writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); } export function writableStreamDefaultWriterAbort( writer: WritableStreamDefaultWriterImpl, reason: any ): Promise { const stream = writer[sym.ownerWritableStream]; assert(stream); return writableStreamAbort(stream, reason); } export function writableStreamDefaultWriterClose( writer: WritableStreamDefaultWriterImpl ): Promise { const stream = writer[sym.ownerWritableStream]; assert(stream); return writableStreamClose(stream); } function writableStreamDefaultWriterCloseWithErrorPropagation( writer: WritableStreamDefaultWriterImpl ): Promise { const stream = writer[sym.ownerWritableStream]; assert(stream); const state = stream[sym.state]; if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { return Promise.resolve(); } if (state === "errored") { return Promise.reject(stream[sym.storedError]); } assert(state === "writable" || state === "erroring"); return writableStreamDefaultWriterClose(writer); } function writableStreamDefaultWriterEnsureClosePromiseRejected( writer: WritableStreamDefaultWriterImpl, error: any ): void { if (writer[sym.closedPromise].reject) { writer[sym.closedPromise].reject!(error); } else { writer[sym.closedPromise] = { promise: Promise.reject(error), }; } setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); } function writableStreamDefaultWriterEnsureReadyPromiseRejected( writer: WritableStreamDefaultWriterImpl, error: any ): void { if (writer[sym.readyPromise].reject) { writer[sym.readyPromise].reject!(error); writer[sym.readyPromise].reject = undefined; writer[sym.readyPromise].resolve = undefined; } else { writer[sym.readyPromise] = { promise: Promise.reject(error), }; } setPromiseIsHandledToTrue(writer[sym.readyPromise].promise); } export function writableStreamDefaultWriterWrite( writer: WritableStreamDefaultWriterImpl, chunk: W ): Promise { const stream = writer[sym.ownerWritableStream]; assert(stream); const controller = stream[sym.writableStreamController]; assert(controller); const chunkSize = writableStreamDefaultControllerGetChunkSize( controller, chunk ); if (stream !== writer[sym.ownerWritableStream]) { return Promise.reject("Writer has incorrect WritableStream."); } const state = stream[sym.state]; if (state === "errored") { return Promise.reject(stream[sym.storedError]); } if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { return Promise.reject(new TypeError("The stream is closed or closing.")); } if (state === "erroring") { return Promise.reject(stream[sym.storedError]); } assert(state === "writable"); const promise = writableStreamAddWriteRequest(stream); writableStreamDefaultControllerWrite(controller, chunk, chunkSize); return promise; } export function writableStreamDefaultWriterGetDesiredSize( writer: WritableStreamDefaultWriterImpl ): number | null { const stream = writer[sym.ownerWritableStream]; const state = stream[sym.state]; if (state === "errored" || state === "erroring") { return null; } if (state === "closed") { return 0; } return writableStreamDefaultControllerGetDesiredSize( stream[sym.writableStreamController]! ); } export function writableStreamDefaultWriterRelease( writer: WritableStreamDefaultWriterImpl ): void { const stream = writer[sym.ownerWritableStream]; assert(stream); assert(stream[sym.writer] === writer); const releasedError = new TypeError( "Writer was released and can no longer be used to monitor the stream's closedness." ); writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError); stream[sym.writer] = undefined; (writer as any)[sym.ownerWritableStream] = undefined; } function writableStreamFinishErroring(stream: WritableStreamImpl): void { assert(stream[sym.state] === "erroring"); assert(!writableStreamHasOperationMarkedInFlight(stream)); stream[sym.state] = "errored"; stream[sym.writableStreamController]![sym.errorSteps](); const storedError = stream[sym.storedError]; for (const writeRequest of stream[sym.writeRequests]) { assert(writeRequest.reject); writeRequest.reject(storedError); } stream[sym.writeRequests] = []; if (!stream[sym.pendingAbortRequest]) { writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); return; } const abortRequest = stream[sym.pendingAbortRequest]; assert(abortRequest); stream[sym.pendingAbortRequest] = undefined; if (abortRequest.wasAlreadyErroring) { assert(abortRequest.promise.reject); abortRequest.promise.reject(storedError); writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); return; } const promise = stream[sym.writableStreamController]![sym.abortSteps]( abortRequest.reason ); setPromiseIsHandledToTrue( promise.then( () => { assert(abortRequest.promise.resolve); abortRequest.promise.resolve(); writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); }, (reason) => { assert(abortRequest.promise.reject); abortRequest.promise.reject(reason); writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); } ) ); } function writableStreamFinishInFlightClose( stream: WritableStreamImpl ): void { assert(stream[sym.inFlightCloseRequest]); stream[sym.inFlightCloseRequest]?.resolve!(); stream[sym.inFlightCloseRequest] = undefined; const state = stream[sym.state]; assert(state === "writable" || state === "erroring"); if (state === "erroring") { stream[sym.storedError] = undefined; if (stream[sym.pendingAbortRequest]) { stream[sym.pendingAbortRequest]!.promise.resolve!(); stream[sym.pendingAbortRequest] = undefined; } } stream[sym.state] = "closed"; const writer = stream[sym.writer]; if (writer) { writer[sym.closedPromise].resolve!(); } assert(stream[sym.pendingAbortRequest] === undefined); assert(stream[sym.storedError] === undefined); } function writableStreamFinishInFlightCloseWithError( stream: WritableStreamImpl, error: any ): void { assert(stream[sym.inFlightCloseRequest]); stream[sym.inFlightCloseRequest]?.reject!(error); stream[sym.inFlightCloseRequest] = undefined; assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); if (stream[sym.pendingAbortRequest]) { stream[sym.pendingAbortRequest]?.promise.reject!(error); stream[sym.pendingAbortRequest] = undefined; } writableStreamDealWithRejection(stream, error); } function writableStreamFinishInFlightWrite( stream: WritableStreamImpl ): void { assert(stream[sym.inFlightWriteRequest]); stream[sym.inFlightWriteRequest]!.resolve(); stream[sym.inFlightWriteRequest] = undefined; } function writableStreamFinishInFlightWriteWithError( stream: WritableStreamImpl, error: any ): void { assert(stream[sym.inFlightWriteRequest]); stream[sym.inFlightWriteRequest]!.reject!(error); stream[sym.inFlightWriteRequest] = undefined; assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring"); writableStreamDealWithRejection(stream, error); } function writableStreamHasOperationMarkedInFlight( stream: WritableStreamImpl ): boolean { return !( stream[sym.inFlightWriteRequest] === undefined && stream[sym.inFlightCloseRequest] === undefined ); } function writableStreamMarkCloseRequestInFlight( stream: WritableStreamImpl ): void { assert(stream[sym.inFlightCloseRequest] === undefined); assert(stream[sym.closeRequest] !== undefined); stream[sym.inFlightCloseRequest] = stream[sym.closeRequest]; stream[sym.closeRequest] = undefined; } function writableStreamMarkFirstWriteRequestInFlight( stream: WritableStreamImpl ): void { assert(stream[sym.inFlightWriteRequest] === undefined); assert(stream[sym.writeRequests].length); const writeRequest = stream[sym.writeRequests].shift(); stream[sym.inFlightWriteRequest] = writeRequest; } function writableStreamRejectCloseAndClosedPromiseIfNeeded( stream: WritableStreamImpl ): void { assert(stream[sym.state] === "errored"); if (stream[sym.closeRequest]) { assert(stream[sym.inFlightCloseRequest] === undefined); stream[sym.closeRequest]!.reject!(stream[sym.storedError]); stream[sym.closeRequest] = undefined; } const writer = stream[sym.writer]; if (writer) { writer[sym.closedPromise].reject!(stream[sym.storedError]); setPromiseIsHandledToTrue(writer[sym.closedPromise].promise); } } function writableStreamStartErroring( stream: WritableStreamImpl, reason: any ): void { assert(stream[sym.storedError] === undefined); assert(stream[sym.state] === "writable"); const controller = stream[sym.writableStreamController]; assert(controller); stream[sym.state] = "erroring"; stream[sym.storedError] = reason; const writer = stream[sym.writer]; if (writer) { writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); } if ( !writableStreamHasOperationMarkedInFlight(stream) && controller[sym.started] ) { writableStreamFinishErroring(stream); } } function writableStreamUpdateBackpressure( stream: WritableStreamImpl, backpressure: boolean ): void { assert(stream[sym.state] === "writable"); assert(!writableStreamCloseQueuedOrInFlight(stream)); const writer = stream[sym.writer]; if (writer && backpressure !== stream[sym.backpressure]) { if (backpressure) { writer[sym.readyPromise] = getDeferred(); } else { assert(backpressure === false); writer[sym.readyPromise].resolve!(); writer[sym.readyPromise].resolve = undefined; writer[sym.readyPromise].reject = undefined; } } stream[sym.backpressure] = backpressure; } /* eslint-enable */