1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-04 08:54:20 -05:00
denoland-deno/cli/js/web/streams/readable-stream-default-controller.ts
Kitson Kelly fc4819e1e0
refactor: Event and EventTarget implementations (#4707)
Refactors Event and EventTarget so that they better encapsulate their
non-public data and are more forward compatible with things like
DOM Nodes.

Moves `dom_types.ts` -> `dom_types.d.ts`, which was always the intention;
the `.ts` file was a legacy of when we used to build the types from the
code and of the limitations of the compiler.  There was a lot of cruft in
`dom_types` which shouldn't have been there, as well as misalignment with
the DOM standards.  This has generally been eliminated, though we still
have some minor differences from the DOM (like the removal of some
deprecated methods/properties).

Adds `DOMException`.  Strictly it shouldn't inherit from `Error`, but
most browsers provide a stack trace when one is thrown, so the behaviour
in Deno actually better matches the browser.

`Event` still doesn't log to the console like it does in the browser.  I
wanted to get this raised, and that could be an enhancement later on (the
current implementation doesn't do it either).
2020-04-11 11:42:02 -04:00

135 lines
4 KiB
TypeScript

// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO: re-enable this lint rule here
import * as rs from "./readable-internals.ts";
import * as shared from "./shared-internals.ts";
import * as q from "./queue-mixin.ts";
import { Queue } from "./queue.ts";
import {
QueuingStrategySizeCallback,
UnderlyingSource,
} from "../dom_types.d.ts";
/**
 * Default controller for a readable stream.
 *
 * The constructor always throws; within this file instances are produced by
 * `setUpReadableStreamDefaultControllerFromUnderlyingSource`, which creates
 * an object from this class's prototype and passes it to the underlying
 * source's `start` and `pull` callbacks.
 *
 * All internal state lives in symbol-keyed slots exported by
 * `readable-internals.ts` and `queue-mixin.ts`.
 */
export class ReadableStreamDefaultController<OutputType>
  implements rs.SDReadableStreamDefaultController<OutputType> {
  [rs.cancelAlgorithm_]: rs.CancelAlgorithm;
  [rs.closeRequested_]: boolean;
  [rs.controlledReadableStream_]: rs.SDReadableStream<OutputType>;
  [rs.pullAgain_]: boolean;
  [rs.pullAlgorithm_]: rs.PullAlgorithm<OutputType>;
  [rs.pulling_]: boolean;
  [rs.strategyHWM_]: number;
  [rs.strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
  [rs.started_]: boolean;
  [q.queue_]: Queue<q.QueueElement<OutputType>>;
  [q.queueTotalSize_]: number;

  /**
   * Not user-constructible.
   *
   * @throws TypeError always.
   */
  constructor() {
    throw new TypeError();
  }

  /**
   * The value computed by
   * `rs.readableStreamDefaultControllerGetDesiredSize` for this controller.
   *
   * @throws TypeError if the receiver is rejected by
   *   `rs.isReadableStreamDefaultController`.
   */
  get desiredSize(): number | null {
    // Same receiver check as close/enqueue/error; the Streams Standard
    // requires it on this getter as well.
    if (!rs.isReadableStreamDefaultController(this)) {
      throw new TypeError();
    }
    return rs.readableStreamDefaultControllerGetDesiredSize(this);
  }

  /**
   * Requests that the controlled stream be closed.
   *
   * @throws TypeError if the receiver is not a default controller, or if
   *   `rs.readableStreamDefaultControllerCanCloseOrEnqueue` reports that
   *   closing is not currently permitted.
   */
  close(): void {
    if (!rs.isReadableStreamDefaultController(this)) {
      throw new TypeError();
    }
    if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
      throw new TypeError(
        "Cannot close, the stream is already closing or not readable"
      );
    }
    rs.readableStreamDefaultControllerClose(this);
  }

  /**
   * Hands `chunk` to `rs.readableStreamDefaultControllerEnqueue`.
   *
   * @param chunk The value to enqueue. The parameter is optional in the
   *   signature, and `undefined` is forwarded as-is.
   * @throws TypeError if the receiver is not a default controller, or if
   *   `rs.readableStreamDefaultControllerCanCloseOrEnqueue` reports that
   *   enqueueing is not currently permitted.
   */
  enqueue(chunk?: OutputType): void {
    if (!rs.isReadableStreamDefaultController(this)) {
      throw new TypeError();
    }
    if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
      throw new TypeError(
        "Cannot enqueue, the stream is closing or not readable"
      );
    }
    rs.readableStreamDefaultControllerEnqueue(this, chunk!);
  }

  /**
   * Forwards `e` to `rs.readableStreamDefaultControllerError`.
   *
   * @param e The error value to forward.
   * @throws TypeError if the receiver is not a default controller.
   */
  error(e?: shared.ErrorResult): void {
    if (!rs.isReadableStreamDefaultController(this)) {
      throw new TypeError();
    }
    rs.readableStreamDefaultControllerError(this, e);
  }

  /**
   * Internal cancel hook: resets the queue, invokes the stored cancel
   * algorithm with `reason`, then clears the controller's algorithms.
   *
   * @param reason Value passed to the cancel algorithm.
   * @returns The promise returned by the cancel algorithm.
   */
  [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
    q.resetQueue(this);
    // The cancel algorithm must be invoked before the algorithms are cleared.
    const result = this[rs.cancelAlgorithm_](reason);
    rs.readableStreamDefaultControllerClearAlgorithms(this);
    return result;
  }

  /**
   * Internal pull hook used to satisfy a read.
   *
   * If the queue holds at least one chunk, that chunk is dequeued and
   * returned in an already-resolved promise. Otherwise a read request is
   * added to the stream and its pending promise is returned.
   *
   * @param forAuthorCode Passed through unchanged to
   *   `rs.readableStreamCreateReadResult` / `rs.readableStreamAddReadRequest`.
   * @returns A promise for the read result.
   */
  [rs.pullSteps_](
    forAuthorCode: boolean
  ): Promise<IteratorResult<OutputType, any>> {
    const stream = this[rs.controlledReadableStream_];
    if (this[q.queue_].length > 0) {
      const chunk = q.dequeueValue(this);
      if (this[rs.closeRequested_] && this[q.queue_].length === 0) {
        // A close was requested and the last queued chunk has just been
        // taken, so the stream can now be closed.
        rs.readableStreamDefaultControllerClearAlgorithms(this);
        rs.readableStreamClose(stream);
      } else {
        rs.readableStreamDefaultControllerCallPullIfNeeded(this);
      }
      return Promise.resolve(
        rs.readableStreamCreateReadResult(chunk, false, forAuthorCode)
      );
    }

    // Nothing queued: register the read request first, then call pull if
    // needed.
    const pendingPromise = rs.readableStreamAddReadRequest(
      stream,
      forAuthorCode
    );
    rs.readableStreamDefaultControllerCallPullIfNeeded(this);
    return pendingPromise;
  }
}
/**
 * Builds a default controller for `stream` and connects it to the callbacks
 * of `underlyingSource`.
 *
 * The controller is created from `ReadableStreamDefaultController.prototype`
 * with `Object.create`, so the class constructor (which always throws) is
 * not run. Start, pull and cancel algorithms are derived from the
 * `start`, `pull` and `cancel` members of `underlyingSource`, and everything
 * is then passed to `rs.setUpReadableStreamDefaultController`.
 *
 * @param stream The stream the new controller is set up for.
 * @param underlyingSource Object supplying the `start`/`pull`/`cancel` methods.
 * @param highWaterMark Forwarded unchanged to the set-up routine.
 * @param sizeAlgorithm Forwarded unchanged to the set-up routine.
 */
export function setUpReadableStreamDefaultControllerFromUnderlyingSource<
  OutputType
>(
  stream: rs.SDReadableStream<OutputType>,
  underlyingSource: UnderlyingSource<OutputType>,
  highWaterMark: number,
  sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
): void {
  // Assert: underlyingSource is not undefined.
  const ctrl = Object.create(ReadableStreamDefaultController.prototype);

  // `start` is invoked lazily, whenever the set-up routine calls this closure.
  const onStart = (): any =>
    shared.invokeOrNoop(underlyingSource, "start", [ctrl]);

  // `pull` receives the controller as its argument; `cancel` gets no extra
  // arguments from here.
  const onPull = shared.createAlgorithmFromUnderlyingMethod(
    underlyingSource,
    "pull",
    [ctrl]
  );
  const onCancel = shared.createAlgorithmFromUnderlyingMethod(
    underlyingSource,
    "cancel",
    []
  );

  rs.setUpReadableStreamDefaultController(
    stream,
    ctrl,
    onStart,
    onPull,
    onCancel,
    highWaterMark,
    sizeAlgorithm
  );
}