// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.

import {
  acquireReadableStreamDefaultReader,
  initializeReadableStream,
  isReadableStream,
  isReadableStreamLocked,
  isUnderlyingByteSource,
  makeSizeAlgorithmFromSizeFunction,
  readableStreamCancel,
  ReadableStreamGenericReader,
  readableStreamTee,
  setUpReadableByteStreamControllerFromUnderlyingSource,
  setUpReadableStreamDefaultControllerFromUnderlyingSource,
  validateAndNormalizeHighWaterMark,
} from "./internals.ts";
import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts";
import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts";
import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
import * as sym from "./symbols.ts";
import { symbols } from "../../symbols.ts";
import { notImplemented } from "../../util.ts";

/**
 * Implementation of the `ReadableStream` interface.
 *
 * Internal state lives in symbol-keyed slots (see `./symbols.ts`) and is
 * manipulated by the helper functions imported from `./internals.ts`; the
 * methods here mostly validate `this` and delegate to those helpers.
 *
 * `pipeThrough()` and `pipeTo()` are not implemented yet: they call
 * `notImplemented()` and carry their intended logic as commented-out code.
 *
 * @typeParam R - Type of the chunks produced by the stream.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
  [sym.disturbed]: boolean;
  [sym.readableStreamController]:
    | ReadableStreamDefaultControllerImpl
    | ReadableByteStreamControllerImpl;
  [sym.reader]: ReadableStreamGenericReader | undefined;
  [sym.state]: "readable" | "closed" | "errored";
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  [sym.storedError]: any;

  /**
   * Creates a stream and attaches the controller matching the source type.
   *
   * @param underlyingSource - Source of the stream. A byte source (as decided
   *   by `isUnderlyingByteSource`) gets a byte stream controller; a source
   *   whose `type` is `undefined` gets a default controller.
   * @param strategy - Queuing strategy. When `highWaterMark` is nullish it
   *   defaults to `0` for byte sources and `1` otherwise.
   * @throws RangeError if a byte source is combined with a `strategy.size`, or
   *   if `underlyingSource.type` is neither a byte source type nor `undefined`.
   */
  constructor(
    underlyingSource: UnderlyingByteSource | UnderlyingSource<R> = {},
    strategy:
      | {
          highWaterMark?: number;
          size?: undefined;
        }
      | QueuingStrategy<R> = {}
  ) {
    initializeReadableStream(this);
    const { size } = strategy;
    let { highWaterMark } = strategy;
    const { type } = underlyingSource;

    if (isUnderlyingByteSource(underlyingSource)) {
      // Byte streams do not accept a custom size function.
      if (size !== undefined) {
        throw new RangeError(
          `When underlying source is "bytes", strategy.size must be undefined.`
        );
      }
      highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0);
      setUpReadableByteStreamControllerFromUnderlyingSource(
        this,
        underlyingSource,
        highWaterMark
      );
    } else if (type === undefined) {
      const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
      highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1);
      setUpReadableStreamDefaultControllerFromUnderlyingSource(
        this,
        underlyingSource,
        highWaterMark,
        sizeAlgorithm
      );
    } else {
      throw new RangeError(
        `Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".`
      );
    }
  }

  /**
   * Whether the stream is locked, as reported by `isReadableStreamLocked`.
   *
   * @throws TypeError if `this` fails the `isReadableStream` check.
   */
  get locked(): boolean {
    if (!isReadableStream(this)) {
      throw new TypeError("Invalid ReadableStream.");
    }
    return isReadableStreamLocked(this);
  }

  /**
   * Cancels the stream by delegating to `readableStreamCancel`.
   *
   * Validation failures are reported as rejected promises rather than thrown
   * synchronously.
   *
   * @param reason - Value forwarded to `readableStreamCancel`.
   * @returns The promise returned by `readableStreamCancel`, or a promise
   *   rejected with a `TypeError` when `this` is not a valid stream or the
   *   stream is locked.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  cancel(reason?: any): Promise<void> {
    if (!isReadableStream(this)) {
      return Promise.reject(new TypeError("Invalid ReadableStream."));
    }
    if (isReadableStreamLocked(this)) {
      return Promise.reject(
        new TypeError("Cannot cancel a locked ReadableStream.")
      );
    }
    return readableStreamCancel(this, reason);
  }

  /**
   * Creates an async iterator over the stream.
   *
   * A default reader is acquired for the stream and stored, together with the
   * `preventCancel` flag, on a new object whose prototype is
   * `ReadableStreamAsyncIteratorPrototype`.
   *
   * @param options.preventCancel - Coerced to a boolean and stored on the
   *   iterator; how it is honoured is up to the iterator prototype.
   * @throws TypeError if `this` fails the `isReadableStream` check.
   */
  getIterator({
    preventCancel,
  }: { preventCancel?: boolean } = {}): AsyncIterableIterator<R> {
    if (!isReadableStream(this)) {
      throw new TypeError("Invalid ReadableStream.");
    }
    const reader = acquireReadableStreamDefaultReader(this);
    const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
    iterator[sym.asyncIteratorReader] = reader;
    iterator[sym.preventCancel] = Boolean(preventCancel);
    return iterator;
  }

  /**
   * Acquires a reader for the stream.
   *
   * Only the default reader (no `mode`) is supported; any other mode,
   * including `"byob"`, is rejected.
   *
   * @param options.mode - Must be `undefined`.
   * @throws TypeError if `this` fails the `isReadableStream` check.
   * @throws RangeError if `mode` is anything other than `undefined`.
   */
  getReader({ mode }: { mode?: string } = {}): ReadableStreamDefaultReader<R> {
    if (!isReadableStream(this)) {
      throw new TypeError("Invalid ReadableStream.");
    }
    if (mode === undefined) {
      return acquireReadableStreamDefaultReader(this, true);
    }
    // Stringify into a local instead of reassigning the parameter.
    const requestedMode = String(mode);
    // 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
    throw new RangeError(`Unsupported mode "${requestedMode}"`);
  }

  /**
   * Not implemented: always calls `notImplemented()`.
   *
   * The intended parameters and body are preserved below as comments.
   */
  pipeThrough(): ReadableStream {
    // Intended parameters:
    // {
    //   writable,
    //   readable,
    // }: {
    //   writable: WritableStream;
    //   readable: ReadableStream;
    // },
    // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
    return notImplemented();
    // if (!isReadableStream(this)) {
    //   throw new TypeError("Invalid ReadableStream.");
    // }
    // if (!isWritableStream(writable)) {
    //   throw new TypeError("writable is not a valid WritableStream.");
    // }
    // if (!isReadableStream(readable)) {
    //   throw new TypeError("readable is not a valid ReadableStream.");
    // }
    // preventClose = Boolean(preventClose);
    // preventAbort = Boolean(preventAbort);
    // preventCancel = Boolean(preventCancel);
    // if (signal && !(signal instanceof AbortSignalImpl)) {
    //   throw new TypeError("Invalid signal.");
    // }
    // if (isReadableStreamLocked(this)) {
    //   throw new TypeError("ReadableStream is locked.");
    // }
    // if (isWritableStreamLocked(writable)) {
    //   throw new TypeError("writable is locked.");
    // }
    // readableStreamPipeTo(
    //   this,
    //   writable,
    //   preventClose,
    //   preventAbort,
    //   preventCancel,
    //   signal,
    // );
    // return readable;
  }

  /**
   * Not implemented: always calls `notImplemented()`.
   *
   * The intended parameters and body are preserved below as comments.
   */
  pipeTo(): Promise<void> {
    // Intended parameters:
    // dest: WritableStream,
    // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
    return notImplemented();
    // if (!isReadableStream(this)) {
    //   return Promise.reject(new TypeError("Invalid ReadableStream."));
    // }
    // if (!isWritableStream(dest)) {
    //   return Promise.reject(
    //     new TypeError("dest is not a valid WritableStream."),
    //   );
    // }
    // preventClose = Boolean(preventClose);
    // preventAbort = Boolean(preventAbort);
    // preventCancel = Boolean(preventCancel);
    // if (signal && !(signal instanceof AbortSignalImpl)) {
    //   return Promise.reject(new TypeError("Invalid signal."));
    // }
    // if (isReadableStreamLocked(this)) {
    //   return Promise.reject(new TypeError("ReadableStream is locked."));
    // }
    // NOTE: the lock check must target `dest`, not `this`.
    // if (isWritableStreamLocked(dest)) {
    //   return Promise.reject(new TypeError("dest is locked."));
    // }
    // return readableStreamPipeTo(
    //   this,
    //   dest,
    //   preventClose,
    //   preventAbort,
    //   preventCancel,
    //   signal,
    // );
  }

  /**
   * Splits the stream into two branches by delegating to
   * `readableStreamTee(this, false)`.
   *
   * @returns The pair of streams produced by `readableStreamTee`.
   * @throws TypeError if `this` fails the `isReadableStream` check.
   */
  tee(): [ReadableStreamImpl, ReadableStreamImpl] {
    if (!isReadableStream(this)) {
      throw new TypeError("Invalid ReadableStream.");
    }
    return readableStreamTee(this, false);
  }

  /**
   * Custom inspection hook: renders the stream together with its `locked`
   * state, e.g. `ReadableStream { locked: false }`.
   */
  [symbols.customInspect](): string {
    return `ReadableStream { locked: ${String(this.locked)} }`;
  }

  /**
   * Async-iteration entry point; forwards `options` to `getIterator`.
   *
   * @param options.preventCancel - Passed through to `getIterator`.
   */
  [Symbol.asyncIterator](
    options: {
      preventCancel?: boolean;
    } = {}
  ): AsyncIterableIterator<R> {
    return this.getIterator(options);
  }
}