1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-01 16:51:13 -05:00
denoland-deno/cli/js/web/streams/readable-internals.ts

1351 lines
44 KiB
TypeScript
Raw Normal View History

// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO reenable this lint here
import * as shared from "./shared-internals.ts";
import * as q from "./queue-mixin.ts";
import {
QueuingStrategy,
QueuingStrategySizeCallback,
UnderlyingSource,
UnderlyingByteSource,
} from "../dom_types.ts";
// ReadableStreamDefaultController
export const controlledReadableStream_ = Symbol("controlledReadableStream_");
export const pullAlgorithm_ = Symbol("pullAlgorithm_");
export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");
export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
export const strategyHWM_ = Symbol("strategyHWM_");
export const started_ = Symbol("started_");
export const closeRequested_ = Symbol("closeRequested_");
export const pullAgain_ = Symbol("pullAgain_");
export const pulling_ = Symbol("pulling_");
export const cancelSteps_ = Symbol("cancelSteps_");
export const pullSteps_ = Symbol("pullSteps_");
// ReadableByteStreamController
export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");
export const byobRequest_ = Symbol("byobRequest_");
export const controlledReadableByteStream_ = Symbol(
"controlledReadableByteStream_"
);
export const pendingPullIntos_ = Symbol("pendingPullIntos_");
// ReadableStreamDefaultReader
export const closedPromise_ = Symbol("closedPromise_");
export const ownerReadableStream_ = Symbol("ownerReadableStream_");
export const readRequests_ = Symbol("readRequests_");
export const readIntoRequests_ = Symbol("readIntoRequests_");
// ReadableStreamBYOBRequest
export const associatedReadableByteStreamController_ = Symbol(
"associatedReadableByteStreamController_"
);
export const view_ = Symbol("view_");
// ReadableStreamBYOBReader
// ReadableStream
export const reader_ = Symbol("reader_");
export const readableStreamController_ = Symbol("readableStreamController_");
export type StartFunction<OutputType> = (
controller: SDReadableStreamControllerBase<OutputType>
) => void | PromiseLike<void>;
export type StartAlgorithm = () => Promise<void> | void;
export type PullFunction<OutputType> = (
controller: SDReadableStreamControllerBase<OutputType>
) => void | PromiseLike<void>;
export type PullAlgorithm<OutputType> = (
controller: SDReadableStreamControllerBase<OutputType>
) => PromiseLike<void>;
export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
// ----
export interface SDReadableStreamControllerBase<OutputType> {
readonly desiredSize: number | null;
close(): void;
error(e?: shared.ErrorResult): void;
[cancelSteps_](reason: shared.ErrorResult): Promise<void>;
[pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;
}
export interface SDReadableStreamBYOBRequest {
readonly view: ArrayBufferView;
respond(bytesWritten: number): void;
respondWithNewView(view: ArrayBufferView): void;
[associatedReadableByteStreamController_]:
| SDReadableByteStreamController
| undefined;
[view_]: ArrayBufferView | undefined;
}
interface ArrayBufferViewCtor {
new (
buffer: ArrayBufferLike,
byteOffset?: number,
byteLength?: number
): ArrayBufferView;
}
export interface PullIntoDescriptor {
readerType: "default" | "byob";
ctor: ArrayBufferViewCtor;
buffer: ArrayBufferLike;
byteOffset: number;
byteLength: number;
bytesFilled: number;
elementSize: number;
}
export interface SDReadableByteStreamController
extends SDReadableStreamControllerBase<ArrayBufferView>,
q.ByteQueueContainer {
readonly byobRequest: SDReadableStreamBYOBRequest | undefined;
enqueue(chunk: ArrayBufferView): void;
[autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.
[byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request
[cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source
[closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
[controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled
[pullAgain_]: boolean; // A boolean flag set to true if the streams mechanisms requested a call to the underlying byte sources pull() method to pull more data, but the pull could not yet be done since a previous call is still executing
[pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source
[pulling_]: boolean; // A boolean flag set to true while the underlying byte sources pull() method is executing and has not yet fulfilled, used to prevent reentrant calls
[pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests
[started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting
[strategyHWM_]: number; // A number supplied to the constructor as part of the streams queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source
}
export interface SDReadableStreamDefaultController<OutputType>
extends SDReadableStreamControllerBase<OutputType>,
q.QueueContainer<OutputType> {
enqueue(chunk?: OutputType): void;
[controlledReadableStream_]: SDReadableStream<OutputType>;
[pullAlgorithm_]: PullAlgorithm<OutputType>;
[cancelAlgorithm_]: CancelAlgorithm;
[strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
[strategyHWM_]: number;
[started_]: boolean;
[closeRequested_]: boolean;
[pullAgain_]: boolean;
[pulling_]: boolean;
}
// ----
export interface SDReadableStreamReader<OutputType> {
readonly closed: Promise<void>;
cancel(reason: shared.ErrorResult): Promise<void>;
releaseLock(): void;
[ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
[closedPromise_]: shared.ControlledPromise<void>;
}
export interface ReadRequest<V> extends shared.ControlledPromise<V> {
forAuthorCode: boolean;
}
export declare class SDReadableStreamDefaultReader<OutputType>
implements SDReadableStreamReader<OutputType> {
constructor(stream: SDReadableStream<OutputType>);
readonly closed: Promise<void>;
cancel(reason: shared.ErrorResult): Promise<void>;
releaseLock(): void;
read(): Promise<IteratorResult<OutputType | undefined>>;
[ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
[closedPromise_]: shared.ControlledPromise<void>;
[readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>;
}
export declare class SDReadableStreamBYOBReader
implements SDReadableStreamReader<ArrayBufferView> {
constructor(stream: SDReadableStream<ArrayBufferView>);
readonly closed: Promise<void>;
cancel(reason: shared.ErrorResult): Promise<void>;
releaseLock(): void;
read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;
[ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;
[closedPromise_]: shared.ControlledPromise<void>;
[readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>;
}
/* TODO reenable this when we add WritableStreams and Transforms
export interface GenericTransformStream<InputType, OutputType> {
readable: SDReadableStream<OutputType>;
writable: ws.WritableStream<InputType>;
}
*/
export type ReadableStreamState = "readable" | "closed" | "errored";
export declare class SDReadableStream<OutputType> {
constructor(
underlyingSource: UnderlyingByteSource,
strategy?: { highWaterMark?: number; size?: undefined }
);
constructor(
underlyingSource?: UnderlyingSource<OutputType>,
strategy?: QueuingStrategy<OutputType>
);
readonly locked: boolean;
cancel(reason?: shared.ErrorResult): Promise<void>;
getReader(): SDReadableStreamReader<OutputType>;
getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;
tee(): Array<SDReadableStream<OutputType>>;
/* TODO reenable these methods when we bring in writableStreams and transport types
pipeThrough<ResultType>(
transform: GenericTransformStream<OutputType, ResultType>,
options?: PipeOptions
): SDReadableStream<ResultType>;
pipeTo(
dest: ws.WritableStream<OutputType>,
options?: PipeOptions
): Promise<void>;
*/
[shared.state_]: ReadableStreamState;
[shared.storedError_]: shared.ErrorResult;
[reader_]: SDReadableStreamReader<OutputType> | undefined;
[readableStreamController_]: SDReadableStreamControllerBase<OutputType>;
}
// ---- Stream
export function initializeReadableStream<OutputType>(
stream: SDReadableStream<OutputType>
): void {
stream[shared.state_] = "readable";
stream[reader_] = undefined;
stream[shared.storedError_] = undefined;
stream[readableStreamController_] = undefined!; // mark slot as used for brand check
}
export function isReadableStream(
value: unknown
): value is SDReadableStream<any> {
if (typeof value !== "object" || value === null) {
return false;
}
return readableStreamController_ in value;
}
export function isReadableStreamLocked<OutputType>(
stream: SDReadableStream<OutputType>
): boolean {
return stream[reader_] !== undefined;
}
export function readableStreamGetNumReadIntoRequests<OutputType>(
stream: SDReadableStream<OutputType>
): number {
// TODO remove the "as unknown" cast
// This is in to workaround a compiler error
// error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
// Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_]
const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
if (reader === undefined) {
return 0;
}
return reader[readIntoRequests_].length;
}
export function readableStreamGetNumReadRequests<OutputType>(
stream: SDReadableStream<OutputType>
): number {
const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
if (reader === undefined) {
return 0;
}
return reader[readRequests_].length;
}
export function readableStreamCreateReadResult<T>(
value: T,
done: boolean,
forAuthorCode: boolean
): IteratorResult<T> {
const prototype = forAuthorCode ? Object.prototype : null;
const result = Object.create(prototype);
result.value = value;
result.done = done;
return result;
}
export function readableStreamAddReadIntoRequest(
stream: SDReadableStream<ArrayBufferView>,
forAuthorCode: boolean
): Promise<IteratorResult<ArrayBufferView, any>> {
// Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
// Assert: stream.[[state]] is "readable" or "closed".
const reader = stream[reader_] as SDReadableStreamBYOBReader;
const conProm = shared.createControlledPromise<
IteratorResult<ArrayBufferView>
>() as ReadRequest<IteratorResult<ArrayBufferView>>;
conProm.forAuthorCode = forAuthorCode;
reader[readIntoRequests_].push(conProm);
return conProm.promise;
}
export function readableStreamAddReadRequest<OutputType>(
stream: SDReadableStream<OutputType>,
forAuthorCode: boolean
): Promise<IteratorResult<OutputType, any>> {
// Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
// Assert: stream.[[state]] is "readable".
const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
const conProm = shared.createControlledPromise<
IteratorResult<OutputType>
>() as ReadRequest<IteratorResult<OutputType>>;
conProm.forAuthorCode = forAuthorCode;
reader[readRequests_].push(conProm);
return conProm.promise;
}
export function readableStreamHasBYOBReader<OutputType>(
stream: SDReadableStream<OutputType>
): boolean {
const reader = stream[reader_];
return isReadableStreamBYOBReader(reader);
}
export function readableStreamHasDefaultReader<OutputType>(
stream: SDReadableStream<OutputType>
): boolean {
const reader = stream[reader_];
return isReadableStreamDefaultReader(reader);
}
export function readableStreamCancel<OutputType>(
stream: SDReadableStream<OutputType>,
reason: shared.ErrorResult
): Promise<undefined> {
if (stream[shared.state_] === "closed") {
return Promise.resolve(undefined);
}
if (stream[shared.state_] === "errored") {
return Promise.reject(stream[shared.storedError_]);
}
readableStreamClose(stream);
const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](
reason
);
return sourceCancelPromise.then((_) => undefined);
}
export function readableStreamClose<OutputType>(
stream: SDReadableStream<OutputType>
): void {
// Assert: stream.[[state]] is "readable".
stream[shared.state_] = "closed";
const reader = stream[reader_];
if (reader === undefined) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[readRequests_]) {
readRequest.resolve(
readableStreamCreateReadResult(
undefined,
true,
readRequest.forAuthorCode
)
);
}
reader[readRequests_] = [];
}
reader[closedPromise_].resolve();
reader[closedPromise_].promise.catch(() => {});
}
export function readableStreamError<OutputType>(
stream: SDReadableStream<OutputType>,
error: shared.ErrorResult
): void {
if (stream[shared.state_] !== "readable") {
throw new RangeError("Stream is in an invalid state");
}
stream[shared.state_] = "errored";
stream[shared.storedError_] = error;
const reader = stream[reader_];
if (reader === undefined) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[readRequests_]) {
readRequest.reject(error);
}
reader[readRequests_] = [];
} else {
// Assert: IsReadableStreamBYOBReader(reader).
// TODO remove the "as unknown" cast
const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[
readIntoRequests_
];
for (const readIntoRequest of readIntoRequests) {
readIntoRequest.reject(error);
}
// TODO remove the "as unknown" cast
((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = [];
}
reader[closedPromise_].reject(error);
}
// ---- Readers
export function isReadableStreamDefaultReader(
reader: unknown
): reader is SDReadableStreamDefaultReader<any> {
if (typeof reader !== "object" || reader === null) {
return false;
}
return readRequests_ in reader;
}
export function isReadableStreamBYOBReader(
reader: unknown
): reader is SDReadableStreamBYOBReader {
if (typeof reader !== "object" || reader === null) {
return false;
}
return readIntoRequests_ in reader;
}
export function readableStreamReaderGenericInitialize<OutputType>(
reader: SDReadableStreamReader<OutputType>,
stream: SDReadableStream<OutputType>
): void {
reader[ownerReadableStream_] = stream;
stream[reader_] = reader;
const streamState = stream[shared.state_];
reader[closedPromise_] = shared.createControlledPromise<void>();
if (streamState === "readable") {
// leave as is
} else if (streamState === "closed") {
reader[closedPromise_].resolve(undefined);
} else {
reader[closedPromise_].reject(stream[shared.storedError_]);
reader[closedPromise_].promise.catch(() => {});
}
}
export function readableStreamReaderGenericRelease<OutputType>(
reader: SDReadableStreamReader<OutputType>
): void {
// Assert: reader.[[ownerReadableStream]] is not undefined.
// Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
const stream = reader[ownerReadableStream_];
if (stream === undefined) {
throw new TypeError("Reader is in an inconsistent state");
}
if (stream[shared.state_] === "readable") {
// code moved out
} else {
reader[closedPromise_] = shared.createControlledPromise<void>();
}
reader[closedPromise_].reject(new TypeError());
reader[closedPromise_].promise.catch(() => {});
stream[reader_] = undefined;
reader[ownerReadableStream_] = undefined;
}
export function readableStreamBYOBReaderRead(
reader: SDReadableStreamBYOBReader,
view: ArrayBufferView,
forAuthorCode = false
): Promise<IteratorResult<ArrayBufferView, any>> {
const stream = reader[ownerReadableStream_]!;
// Assert: stream is not undefined.
if (stream[shared.state_] === "errored") {
return Promise.reject(stream[shared.storedError_]);
}
return readableByteStreamControllerPullInto(
stream[readableStreamController_] as SDReadableByteStreamController,
view,
forAuthorCode
);
}
export function readableStreamDefaultReaderRead<OutputType>(
reader: SDReadableStreamDefaultReader<OutputType>,
forAuthorCode = false
): Promise<IteratorResult<OutputType | undefined>> {
const stream = reader[ownerReadableStream_]!;
// Assert: stream is not undefined.
if (stream[shared.state_] === "closed") {
return Promise.resolve(
readableStreamCreateReadResult(undefined, true, forAuthorCode)
);
}
if (stream[shared.state_] === "errored") {
return Promise.reject(stream[shared.storedError_]);
}
// Assert: stream.[[state]] is "readable".
return stream[readableStreamController_][pullSteps_](forAuthorCode);
}
export function readableStreamFulfillReadIntoRequest<OutputType>(
stream: SDReadableStream<OutputType>,
chunk: ArrayBufferView,
done: boolean
): void {
// TODO remove the "as unknown" cast
const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller
readIntoRequest.resolve(
readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode)
);
}
export function readableStreamFulfillReadRequest<OutputType>(
stream: SDReadableStream<OutputType>,
chunk: OutputType,
done: boolean
): void {
const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller
readRequest.resolve(
readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode)
);
}
// ---- DefaultController
export function setUpReadableStreamDefaultController<OutputType>(
stream: SDReadableStream<OutputType>,
controller: SDReadableStreamDefaultController<OutputType>,
startAlgorithm: StartAlgorithm,
pullAlgorithm: PullAlgorithm<OutputType>,
cancelAlgorithm: CancelAlgorithm,
highWaterMark: number,
sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
): void {
// Assert: stream.[[readableStreamController]] is undefined.
controller[controlledReadableStream_] = stream;
q.resetQueue(controller);
controller[started_] = false;
controller[closeRequested_] = false;
controller[pullAgain_] = false;
controller[pulling_] = false;
controller[strategySizeAlgorithm_] = sizeAlgorithm;
controller[strategyHWM_] = highWaterMark;
controller[pullAlgorithm_] = pullAlgorithm;
controller[cancelAlgorithm_] = cancelAlgorithm;
stream[readableStreamController_] = controller;
const startResult = startAlgorithm();
Promise.resolve(startResult).then(
(_) => {
controller[started_] = true;
// Assert: controller.[[pulling]] is false.
// Assert: controller.[[pullAgain]] is false.
readableStreamDefaultControllerCallPullIfNeeded(controller);
},
(error) => {
readableStreamDefaultControllerError(controller, error);
}
);
}
export function isReadableStreamDefaultController(
value: unknown
): value is SDReadableStreamDefaultController<any> {
if (typeof value !== "object" || value === null) {
return false;
}
return controlledReadableStream_ in value;
}
export function readableStreamDefaultControllerHasBackpressure<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): boolean {
return !readableStreamDefaultControllerShouldCallPull(controller);
}
export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): boolean {
const state = controller[controlledReadableStream_][shared.state_];
return controller[closeRequested_] === false && state === "readable";
}
export function readableStreamDefaultControllerGetDesiredSize<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): number | null {
const state = controller[controlledReadableStream_][shared.state_];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
return controller[strategyHWM_] - controller[q.queueTotalSize_];
}
export function readableStreamDefaultControllerClose<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): void {
// Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
controller[closeRequested_] = true;
const stream = controller[controlledReadableStream_];
if (controller[q.queue_].length === 0) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
}
export function readableStreamDefaultControllerEnqueue<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>,
chunk: OutputType
): void {
const stream = controller[controlledReadableStream_];
// Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
readableStreamFulfillReadRequest(stream, chunk, false);
} else {
// Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,
// and interpreting the result as an ECMAScript completion value.
// impl note: assuming that in JS land this just means try/catch with rethrow
let chunkSize: number;
try {
chunkSize = controller[strategySizeAlgorithm_](chunk);
} catch (error) {
readableStreamDefaultControllerError(controller, error);
throw error;
}
try {
q.enqueueValueWithSize(controller, chunk, chunkSize);
} catch (error) {
readableStreamDefaultControllerError(controller, error);
throw error;
}
}
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
export function readableStreamDefaultControllerError<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>,
error: shared.ErrorResult
): void {
const stream = controller[controlledReadableStream_];
if (stream[shared.state_] !== "readable") {
return;
}
q.resetQueue(controller);
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamError(stream, error);
}
export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): void {
if (!readableStreamDefaultControllerShouldCallPull(controller)) {
return;
}
if (controller[pulling_]) {
controller[pullAgain_] = true;
return;
}
if (controller[pullAgain_]) {
throw new RangeError("Stream controller is in an invalid state.");
}
controller[pulling_] = true;
controller[pullAlgorithm_](controller).then(
(_) => {
controller[pulling_] = false;
if (controller[pullAgain_]) {
controller[pullAgain_] = false;
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
},
(error) => {
readableStreamDefaultControllerError(controller, error);
}
);
}
export function readableStreamDefaultControllerShouldCallPull<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): boolean {
const stream = controller[controlledReadableStream_];
if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
return false;
}
if (controller[started_] === false) {
return false;
}
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
return true;
}
const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
if (desiredSize === null) {
throw new RangeError("Stream is in an invalid state.");
}
return desiredSize > 0;
}
export function readableStreamDefaultControllerClearAlgorithms<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): void {
controller[pullAlgorithm_] = undefined!;
controller[cancelAlgorithm_] = undefined!;
controller[strategySizeAlgorithm_] = undefined!;
}
// ---- BYOBController
export function setUpReadableByteStreamController(
stream: SDReadableStream<ArrayBufferView>,
controller: SDReadableByteStreamController,
startAlgorithm: StartAlgorithm,
pullAlgorithm: PullAlgorithm<ArrayBufferView>,
cancelAlgorithm: CancelAlgorithm,
highWaterMark: number,
autoAllocateChunkSize: number | undefined
): void {
// Assert: stream.[[readableStreamController]] is undefined.
if (stream[readableStreamController_] !== undefined) {
throw new TypeError("Cannot reuse streams");
}
if (autoAllocateChunkSize !== undefined) {
if (
!shared.isInteger(autoAllocateChunkSize) ||
autoAllocateChunkSize <= 0
) {
throw new RangeError(
"autoAllocateChunkSize must be a positive, finite integer"
);
}
}
// Set controller.[[controlledReadableByteStream]] to stream.
controller[controlledReadableByteStream_] = stream;
// Set controller.[[pullAgain]] and controller.[[pulling]] to false.
controller[pullAgain_] = false;
controller[pulling_] = false;
readableByteStreamControllerClearPendingPullIntos(controller);
q.resetQueue(controller);
controller[closeRequested_] = false;
controller[started_] = false;
controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(
highWaterMark
);
controller[pullAlgorithm_] = pullAlgorithm;
controller[cancelAlgorithm_] = cancelAlgorithm;
controller[autoAllocateChunkSize_] = autoAllocateChunkSize;
controller[pendingPullIntos_] = [];
stream[readableStreamController_] = controller;
// Let startResult be the result of performing startAlgorithm.
const startResult = startAlgorithm();
Promise.resolve(startResult).then(
(_) => {
controller[started_] = true;
// Assert: controller.[[pulling]] is false.
// Assert: controller.[[pullAgain]] is false.
readableByteStreamControllerCallPullIfNeeded(controller);
},
(error) => {
readableByteStreamControllerError(controller, error);
}
);
}
export function isReadableStreamBYOBRequest(
value: unknown
): value is SDReadableStreamBYOBRequest {
if (typeof value !== "object" || value === null) {
return false;
}
return associatedReadableByteStreamController_ in value;
}
export function isReadableByteStreamController(
value: unknown
): value is SDReadableByteStreamController {
if (typeof value !== "object" || value === null) {
return false;
}
return controlledReadableByteStream_ in value;
}
export function readableByteStreamControllerCallPullIfNeeded(
controller: SDReadableByteStreamController
): void {
if (!readableByteStreamControllerShouldCallPull(controller)) {
return;
}
if (controller[pulling_]) {
controller[pullAgain_] = true;
return;
}
// Assert: controller.[[pullAgain]] is false.
controller[pulling_] = true;
controller[pullAlgorithm_](controller).then(
(_) => {
controller[pulling_] = false;
if (controller[pullAgain_]) {
controller[pullAgain_] = false;
readableByteStreamControllerCallPullIfNeeded(controller);
}
},
(error) => {
readableByteStreamControllerError(controller, error);
}
);
}
export function readableByteStreamControllerClearAlgorithms(
controller: SDReadableByteStreamController
): void {
controller[pullAlgorithm_] = undefined!;
controller[cancelAlgorithm_] = undefined!;
}
export function readableByteStreamControllerClearPendingPullIntos(
controller: SDReadableByteStreamController
): void {
readableByteStreamControllerInvalidateBYOBRequest(controller);
controller[pendingPullIntos_] = [];
}
export function readableByteStreamControllerClose(
controller: SDReadableByteStreamController
): void {
const stream = controller[controlledReadableByteStream_];
// Assert: controller.[[closeRequested]] is false.
// Assert: stream.[[state]] is "readable".
if (controller[q.queueTotalSize_] > 0) {
controller[closeRequested_] = true;
return;
}
if (controller[pendingPullIntos_].length > 0) {
const firstPendingPullInto = controller[pendingPullIntos_][0];
if (firstPendingPullInto.bytesFilled > 0) {
const error = new TypeError();
readableByteStreamControllerError(controller, error);
throw error;
}
}
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
export function readableByteStreamControllerCommitPullIntoDescriptor(
stream: SDReadableStream<ArrayBufferView>,
pullIntoDescriptor: PullIntoDescriptor
): void {
// Assert: stream.[[state]] is not "errored".
let done = false;
if (stream[shared.state_] === "closed") {
// Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
done = true;
}
const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
pullIntoDescriptor
);
if (pullIntoDescriptor.readerType === "default") {
readableStreamFulfillReadRequest(stream, filledView, done);
} else {
// Assert: pullIntoDescriptor.[[readerType]] is "byob".
readableStreamFulfillReadIntoRequest(stream, filledView, done);
}
}
export function readableByteStreamControllerConvertPullIntoDescriptor(
pullIntoDescriptor: PullIntoDescriptor
): ArrayBufferView {
const { bytesFilled, elementSize } = pullIntoDescriptor;
// Assert: bytesFilled <= pullIntoDescriptor.byteLength
// Assert: bytesFilled mod elementSize is 0
return new pullIntoDescriptor.ctor(
pullIntoDescriptor.buffer,
pullIntoDescriptor.byteOffset,
bytesFilled / elementSize
);
}
export function readableByteStreamControllerEnqueue(
controller: SDReadableByteStreamController,
chunk: ArrayBufferView
): void {
const stream = controller[controlledReadableByteStream_];
// Assert: controller.[[closeRequested]] is false.
// Assert: stream.[[state]] is "readable".
const { buffer, byteOffset, byteLength } = chunk;
const transferredBuffer = shared.transferArrayBuffer(buffer);
if (readableStreamHasDefaultReader(stream)) {
if (readableStreamGetNumReadRequests(stream) === 0) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength
);
} else {
// Assert: controller.[[queue]] is empty.
const transferredView = new Uint8Array(
transferredBuffer,
byteOffset,
byteLength
);
readableStreamFulfillReadRequest(stream, transferredView, false);
}
} else if (readableStreamHasBYOBReader(stream)) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller
);
} else {
// Assert: !IsReadableStreamLocked(stream) is false.
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength
);
}
readableByteStreamControllerCallPullIfNeeded(controller);
}
export function readableByteStreamControllerEnqueueChunkToQueue(
controller: SDReadableByteStreamController,
buffer: ArrayBufferLike,
byteOffset: number,
byteLength: number
): void {
controller[q.queue_].push({ buffer, byteOffset, byteLength });
controller[q.queueTotalSize_] += byteLength;
}
export function readableByteStreamControllerError(
controller: SDReadableByteStreamController,
error: shared.ErrorResult
): void {
const stream = controller[controlledReadableByteStream_];
if (stream[shared.state_] !== "readable") {
return;
}
readableByteStreamControllerClearPendingPullIntos(controller);
q.resetQueue(controller);
readableByteStreamControllerClearAlgorithms(controller);
readableStreamError(stream, error);
}
export function readableByteStreamControllerFillHeadPullIntoDescriptor(
controller: SDReadableByteStreamController,
size: number,
pullIntoDescriptor: PullIntoDescriptor
): void {
// Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
readableByteStreamControllerInvalidateBYOBRequest(controller);
pullIntoDescriptor.bytesFilled += size;
}
export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller: SDReadableByteStreamController,
pullIntoDescriptor: PullIntoDescriptor
): boolean {
const elementSize = pullIntoDescriptor.elementSize;
const currentAlignedBytes =
pullIntoDescriptor.bytesFilled -
(pullIntoDescriptor.bytesFilled % elementSize);
const maxBytesToCopy = Math.min(
controller[q.queueTotalSize_],
pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
);
const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
totalBytesToCopyRemaining =
maxAlignedBytes - pullIntoDescriptor.bytesFilled;
ready = true;
}
const queue = controller[q.queue_];
while (totalBytesToCopyRemaining > 0) {
const headOfQueue = queue.front()!;
const bytesToCopy = Math.min(
totalBytesToCopyRemaining,
headOfQueue.byteLength
);
const destStart =
pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
shared.copyDataBlockBytes(
pullIntoDescriptor.buffer,
destStart,
headOfQueue.buffer,
headOfQueue.byteOffset,
bytesToCopy
);
if (headOfQueue.byteLength === bytesToCopy) {
queue.shift();
} else {
headOfQueue.byteOffset += bytesToCopy;
headOfQueue.byteLength -= bytesToCopy;
}
controller[q.queueTotalSize_] -= bytesToCopy;
readableByteStreamControllerFillHeadPullIntoDescriptor(
controller,
bytesToCopy,
pullIntoDescriptor
);
totalBytesToCopyRemaining -= bytesToCopy;
}
if (!ready) {
// Assert: controller[queueTotalSize_] === 0
// Assert: pullIntoDescriptor.bytesFilled > 0
// Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize
}
return ready;
}
export function readableByteStreamControllerGetDesiredSize(
controller: SDReadableByteStreamController
): number | null {
const stream = controller[controlledReadableByteStream_];
const state = stream[shared.state_];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
return controller[strategyHWM_] - controller[q.queueTotalSize_];
}
export function readableByteStreamControllerHandleQueueDrain(
controller: SDReadableByteStreamController
): void {
// Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".
if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(controller[controlledReadableByteStream_]);
} else {
readableByteStreamControllerCallPullIfNeeded(controller);
}
}
export function readableByteStreamControllerInvalidateBYOBRequest(
controller: SDReadableByteStreamController
): void {
const byobRequest = controller[byobRequest_];
if (byobRequest === undefined) {
return;
}
byobRequest[associatedReadableByteStreamController_] = undefined;
byobRequest[view_] = undefined;
controller[byobRequest_] = undefined;
}
export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller: SDReadableByteStreamController
): void {
// Assert: controller.[[closeRequested]] is false.
const pendingPullIntos = controller[pendingPullIntos_];
while (pendingPullIntos.length > 0) {
if (controller[q.queueTotalSize_] === 0) {
return;
}
const pullIntoDescriptor = pendingPullIntos[0];
if (
readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
pullIntoDescriptor
)
) {
readableByteStreamControllerShiftPendingPullInto(controller);
readableByteStreamControllerCommitPullIntoDescriptor(
controller[controlledReadableByteStream_],
pullIntoDescriptor
);
}
}
}
export function readableByteStreamControllerPullInto(
controller: SDReadableByteStreamController,
view: ArrayBufferView,
forAuthorCode: boolean
): Promise<IteratorResult<ArrayBufferView, any>> {
const stream = controller[controlledReadableByteStream_];
const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink
const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation
const byteOffset = view.byteOffset;
const byteLength = view.byteLength;
const buffer = shared.transferArrayBuffer(view.buffer);
const pullIntoDescriptor: PullIntoDescriptor = {
buffer,
byteOffset,
byteLength,
bytesFilled: 0,
elementSize,
ctor,
readerType: "byob",
};
if (controller[pendingPullIntos_].length > 0) {
controller[pendingPullIntos_].push(pullIntoDescriptor);
return readableStreamAddReadIntoRequest(stream, forAuthorCode);
}
if (stream[shared.state_] === "closed") {
const emptyView = new ctor(
pullIntoDescriptor.buffer,
pullIntoDescriptor.byteOffset,
0
);
return Promise.resolve(
readableStreamCreateReadResult(emptyView, true, forAuthorCode)
);
}
if (controller[q.queueTotalSize_] > 0) {
if (
readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
pullIntoDescriptor
)
) {
const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
pullIntoDescriptor
);
readableByteStreamControllerHandleQueueDrain(controller);
return Promise.resolve(
readableStreamCreateReadResult(filledView, false, forAuthorCode)
);
}
if (controller[closeRequested_]) {
const error = new TypeError();
readableByteStreamControllerError(controller, error);
return Promise.reject(error);
}
}
controller[pendingPullIntos_].push(pullIntoDescriptor);
const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);
readableByteStreamControllerCallPullIfNeeded(controller);
return promise;
}
export function readableByteStreamControllerRespond(
controller: SDReadableByteStreamController,
bytesWritten: number
): void {
bytesWritten = Number(bytesWritten);
if (!shared.isFiniteNonNegativeNumber(bytesWritten)) {
throw new RangeError("bytesWritten must be a finite, non-negative number");
}
// Assert: controller.[[pendingPullIntos]] is not empty.
readableByteStreamControllerRespondInternal(controller, bytesWritten);
}
export function readableByteStreamControllerRespondInClosedState(
controller: SDReadableByteStreamController,
firstDescriptor: PullIntoDescriptor
): void {
firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);
// Assert: firstDescriptor.[[bytesFilled]] is 0.
const stream = controller[controlledReadableByteStream_];
if (readableStreamHasBYOBReader(stream)) {
while (readableStreamGetNumReadIntoRequests(stream) > 0) {
const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(
controller
)!;
readableByteStreamControllerCommitPullIntoDescriptor(
stream,
pullIntoDescriptor
);
}
}
}
export function readableByteStreamControllerRespondInReadableState(
controller: SDReadableByteStreamController,
bytesWritten: number,
pullIntoDescriptor: PullIntoDescriptor
): void {
if (
pullIntoDescriptor.bytesFilled + bytesWritten >
pullIntoDescriptor.byteLength
) {
throw new RangeError();
}
readableByteStreamControllerFillHeadPullIntoDescriptor(
controller,
bytesWritten,
pullIntoDescriptor
);
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
return;
}
readableByteStreamControllerShiftPendingPullInto(controller);
const remainderSize =
pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
if (remainderSize > 0) {
const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
const remainder = shared.cloneArrayBuffer(
pullIntoDescriptor.buffer,
end - remainderSize,
remainderSize,
ArrayBuffer
);
readableByteStreamControllerEnqueueChunkToQueue(
controller,
remainder,
0,
remainder.byteLength
);
}
pullIntoDescriptor.buffer = shared.transferArrayBuffer(
pullIntoDescriptor.buffer
);
pullIntoDescriptor.bytesFilled =
pullIntoDescriptor.bytesFilled - remainderSize;
readableByteStreamControllerCommitPullIntoDescriptor(
controller[controlledReadableByteStream_],
pullIntoDescriptor
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
}
export function readableByteStreamControllerRespondInternal(
controller: SDReadableByteStreamController,
bytesWritten: number
): void {
const firstDescriptor = controller[pendingPullIntos_][0];
const stream = controller[controlledReadableByteStream_];
if (stream[shared.state_] === "closed") {
if (bytesWritten !== 0) {
throw new TypeError();
}
readableByteStreamControllerRespondInClosedState(
controller,
firstDescriptor
);
} else {
// Assert: stream.[[state]] is "readable".
readableByteStreamControllerRespondInReadableState(
controller,
bytesWritten,
firstDescriptor
);
}
readableByteStreamControllerCallPullIfNeeded(controller);
}
export function readableByteStreamControllerRespondWithNewView(
controller: SDReadableByteStreamController,
view: ArrayBufferView
): void {
// Assert: controller.[[pendingPullIntos]] is not empty.
const firstDescriptor = controller[pendingPullIntos_][0];
if (
firstDescriptor.byteOffset + firstDescriptor.bytesFilled !==
view.byteOffset
) {
throw new RangeError();
}
if (firstDescriptor.byteLength !== view.byteLength) {
throw new RangeError();
}
firstDescriptor.buffer = view.buffer;
readableByteStreamControllerRespondInternal(controller, view.byteLength);
}
export function readableByteStreamControllerShiftPendingPullInto(
controller: SDReadableByteStreamController
): PullIntoDescriptor | undefined {
const descriptor = controller[pendingPullIntos_].shift();
readableByteStreamControllerInvalidateBYOBRequest(controller);
return descriptor;
}
export function readableByteStreamControllerShouldCallPull(
controller: SDReadableByteStreamController
): boolean {
// Let stream be controller.[[controlledReadableByteStream]].
const stream = controller[controlledReadableByteStream_];
if (stream[shared.state_] !== "readable") {
return false;
}
if (controller[closeRequested_]) {
return false;
}
if (!controller[started_]) {
return false;
}
if (
readableStreamHasDefaultReader(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
return true;
}
if (
readableStreamHasBYOBReader(stream) &&
readableStreamGetNumReadIntoRequests(stream) > 0
) {
return true;
}
const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
// Assert: desiredSize is not null.
return desiredSize! > 0;
}
export function setUpReadableStreamBYOBRequest(
request: SDReadableStreamBYOBRequest,
controller: SDReadableByteStreamController,
view: ArrayBufferView
): void {
if (!isReadableByteStreamController(controller)) {
throw new TypeError();
}
if (!ArrayBuffer.isView(view)) {
throw new TypeError();
}
// Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
request[associatedReadableByteStreamController_] = controller;
request[view_] = view;
}