1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 23:34:47 -05:00

Re-enable basic stream support for fetch bodies (#3192)

* Add sd-streams from https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/

* change the interfaces in dom_types to match what sd-streams expects
This commit is contained in:
Nick Stott 2019-10-28 12:41:36 -04:00 committed by Ry Dahl
parent 967c236fa5
commit 65d9286203
27 changed files with 5062 additions and 19 deletions

View file

@ -3,6 +3,7 @@ import * as blob from "./blob.ts";
import * as encoding from "./text_encoding.ts"; import * as encoding from "./text_encoding.ts";
import * as headers from "./headers.ts"; import * as headers from "./headers.ts";
import * as domTypes from "./dom_types.ts"; import * as domTypes from "./dom_types.ts";
import { ReadableStream } from "./streams/mod.ts";
const { Headers } = headers; const { Headers } = headers;
@ -12,6 +13,13 @@ const { TextEncoder, TextDecoder } = encoding;
const Blob = blob.DenoBlob; const Blob = blob.DenoBlob;
const DenoBlob = blob.DenoBlob; const DenoBlob = blob.DenoBlob;
type ReadableStreamReader = domTypes.ReadableStreamReader;
interface ReadableStreamController {
enqueue(chunk: string | ArrayBuffer): void;
close(): void;
}
export type BodySource = export type BodySource =
| domTypes.Blob | domTypes.Blob
| domTypes.BufferSource | domTypes.BufferSource
@ -37,6 +45,8 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
return true; return true;
} else if (typeof bodySource === "string") { } else if (typeof bodySource === "string") {
return true; return true;
} else if (bodySource instanceof ReadableStream) {
return true;
} else if (bodySource instanceof FormData) { } else if (bodySource instanceof FormData) {
return true; return true;
} else if (!bodySource) { } else if (!bodySource) {
@ -47,6 +57,58 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
); );
} }
function concatenate(...arrays: Uint8Array[]): ArrayBuffer {
let totalLength = 0;
for (const arr of arrays) {
totalLength += arr.length;
}
const result = new Uint8Array(totalLength);
let offset = 0;
for (const arr of arrays) {
result.set(arr, offset);
offset += arr.length;
}
return result.buffer as ArrayBuffer;
}
function bufferFromStream(stream: ReadableStreamReader): Promise<ArrayBuffer> {
return new Promise(
(resolve, reject): void => {
const parts: Uint8Array[] = [];
const encoder = new TextEncoder();
// recurse
(function pump(): void {
stream
.read()
.then(
({ done, value }): void => {
if (done) {
return resolve(concatenate(...parts));
}
if (typeof value === "string") {
parts.push(encoder.encode(value));
} else if (value instanceof ArrayBuffer) {
parts.push(new Uint8Array(value));
} else if (!value) {
// noop for undefined
} else {
reject("unhandled type on stream read");
}
return pump();
}
)
.catch(
(err): void => {
reject(err);
}
);
})();
}
);
}
function getHeaderValueParams(value: string): Map<string, string> { function getHeaderValueParams(value: string): Map<string, string> {
const params = new Map(); const params = new Map();
// Forced to do so for some Map constructor param mismatch // Forced to do so for some Map constructor param mismatch
@ -81,8 +143,18 @@ export class Body implements domTypes.Body {
if (this._stream) { if (this._stream) {
return this._stream; return this._stream;
} }
if (this._bodySource instanceof ReadableStream) {
// @ts-ignore
this._stream = this._bodySource;
}
if (typeof this._bodySource === "string") { if (typeof this._bodySource === "string") {
throw Error("not implemented"); this._stream = new ReadableStream({
start(controller: ReadableStreamController): void {
controller.enqueue(this._bodySource);
controller.close();
}
});
} }
return this._stream; return this._stream;
} }
@ -259,6 +331,9 @@ export class Body implements domTypes.Body {
} else if (typeof this._bodySource === "string") { } else if (typeof this._bodySource === "string") {
const enc = new TextEncoder(); const enc = new TextEncoder();
return enc.encode(this._bodySource).buffer as ArrayBuffer; return enc.encode(this._bodySource).buffer as ArrayBuffer;
} else if (this._bodySource instanceof ReadableStream) {
// @ts-ignore
return bufferFromStream(this._bodySource.getReader());
} else if (this._bodySource instanceof FormData) { } else if (this._bodySource instanceof FormData) {
const enc = new TextEncoder(); const enc = new TextEncoder();
return enc.encode(this._bodySource.toString()).buffer as ArrayBuffer; return enc.encode(this._bodySource.toString()).buffer as ArrayBuffer;

View file

@ -248,7 +248,7 @@ export interface AddEventListenerOptions extends EventListenerOptions {
passive: boolean; passive: boolean;
} }
interface AbortSignal extends EventTarget { export interface AbortSignal extends EventTarget {
readonly aborted: boolean; readonly aborted: boolean;
onabort: ((this: AbortSignal, ev: ProgressEvent) => any) | null; onabort: ((this: AbortSignal, ev: ProgressEvent) => any) | null;
addEventListener<K extends keyof AbortSignalEventMap>( addEventListener<K extends keyof AbortSignalEventMap>(
@ -273,19 +273,6 @@ interface AbortSignal extends EventTarget {
): void; ): void;
} }
export interface ReadableStream {
readonly locked: boolean;
cancel(): Promise<void>;
getReader(): ReadableStreamReader;
tee(): [ReadableStream, ReadableStream];
}
export interface ReadableStreamReader {
cancel(): Promise<void>;
read(): Promise<any>;
releaseLock(): void;
}
export interface FormData extends DomIterable<string, FormDataEntryValue> { export interface FormData extends DomIterable<string, FormDataEntryValue> {
append(name: string, value: string | Blob, fileName?: string): void; append(name: string, value: string | Blob, fileName?: string): void;
delete(name: string): void; delete(name: string): void;
@ -343,6 +330,131 @@ export interface Body {
text(): Promise<string>; text(): Promise<string>;
} }
export interface ReadableStream {
readonly locked: boolean;
cancel(reason?: any): Promise<void>;
getReader(): ReadableStreamReader;
tee(): ReadableStream[];
}
export interface UnderlyingSource<R = any> {
cancel?: ReadableStreamErrorCallback;
pull?: ReadableStreamDefaultControllerCallback<R>;
start?: ReadableStreamDefaultControllerCallback<R>;
type?: undefined;
}
export interface UnderlyingByteSource {
autoAllocateChunkSize?: number;
cancel?: ReadableStreamErrorCallback;
pull?: ReadableByteStreamControllerCallback;
start?: ReadableByteStreamControllerCallback;
type: "bytes";
}
export interface ReadableStreamReader {
cancel(reason?: any): Promise<void>;
read(): Promise<any>;
releaseLock(): void;
}
export interface ReadableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}
export interface ReadableByteStreamControllerCallback {
(controller: ReadableByteStreamController): void | PromiseLike<void>;
}
export interface ReadableStreamDefaultControllerCallback<R> {
(controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
}
export interface ReadableStreamDefaultController<R = any> {
readonly desiredSize: number | null;
close(): void;
enqueue(chunk: R): void;
error(error?: any): void;
}
export interface ReadableByteStreamController {
readonly byobRequest: ReadableStreamBYOBRequest | undefined;
readonly desiredSize: number | null;
close(): void;
enqueue(chunk: ArrayBufferView): void;
error(error?: any): void;
}
export interface ReadableStreamBYOBRequest {
readonly view: ArrayBufferView;
respond(bytesWritten: number): void;
respondWithNewView(view: ArrayBufferView): void;
}
/* TODO reenable these interfaces. These are needed to enable WritableStreams in js/streams/
export interface WritableStream<W = any> {
readonly locked: boolean;
abort(reason?: any): Promise<void>;
getWriter(): WritableStreamDefaultWriter<W>;
}
TODO reenable these interfaces. These are needed to enable WritableStreams in js/streams/
export interface UnderlyingSink<W = any> {
abort?: WritableStreamErrorCallback;
close?: WritableStreamDefaultControllerCloseCallback;
start?: WritableStreamDefaultControllerStartCallback;
type?: undefined;
write?: WritableStreamDefaultControllerWriteCallback<W>;
}
export interface PipeOptions {
preventAbort?: boolean;
preventCancel?: boolean;
preventClose?: boolean;
signal?: AbortSignal;
}
export interface WritableStreamDefaultWriter<W = any> {
readonly closed: Promise<void>;
readonly desiredSize: number | null;
readonly ready: Promise<void>;
abort(reason?: any): Promise<void>;
close(): Promise<void>;
releaseLock(): void;
write(chunk: W): Promise<void>;
}
export interface WritableStreamErrorCallback {
(reason: any): void | PromiseLike<void>;
}
export interface WritableStreamDefaultControllerCloseCallback {
(): void | PromiseLike<void>;
}
export interface WritableStreamDefaultControllerStartCallback {
(controller: WritableStreamDefaultController): void | PromiseLike<void>;
}
export interface WritableStreamDefaultControllerWriteCallback<W> {
(chunk: W, controller: WritableStreamDefaultController): void | PromiseLike<
void
>;
}
export interface WritableStreamDefaultController {
error(error?: any): void;
}
*/
export interface QueuingStrategy<T = any> {
highWaterMark?: number;
size?: QueuingStrategySizeCallback<T>;
}
export interface QueuingStrategySizeCallback<T = any> {
(chunk: T): number;
}
export interface Headers extends DomIterable<string, string> { export interface Headers extends DomIterable<string, string> {
/** Appends a new value onto an existing header inside a `Headers` object, or /** Appends a new value onto an existing header inside a `Headers` object, or
* adds the header if it does not already exist. * adds the header if it does not already exist.

View file

@ -76,5 +76,8 @@ export enum ErrorKind {
TooManyRedirects = 48, TooManyRedirects = 48,
Diagnostic = 49, Diagnostic = 49,
JSError = 50, JSError = 50,
TypeError = 51 TypeError = 51,
/** TODO this is a DomException type, and should be moved out of here when possible */
DataCloneError = 52
} }

View file

@ -26,7 +26,6 @@ import * as url from "./url.ts";
import * as urlSearchParams from "./url_search_params.ts"; import * as urlSearchParams from "./url_search_params.ts";
import * as workers from "./workers.ts"; import * as workers from "./workers.ts";
import * as performanceUtil from "./performance.ts"; import * as performanceUtil from "./performance.ts";
import * as request from "./request.ts"; import * as request from "./request.ts";
// These imports are not exposed and therefore are fine to just import the // These imports are not exposed and therefore are fine to just import the

View file

@ -2,8 +2,10 @@
import * as headers from "./headers.ts"; import * as headers from "./headers.ts";
import * as body from "./body.ts"; import * as body from "./body.ts";
import * as domTypes from "./dom_types.ts"; import * as domTypes from "./dom_types.ts";
import * as streams from "./streams/mod.ts";
const { Headers } = headers; const { Headers } = headers;
const { ReadableStream } = streams;
function byteUpperCase(s: string): string { function byteUpperCase(s: string): string {
return String(s).replace(/[a-z]/g, function byteUpperCaseReplace(c): string { return String(s).replace(/[a-z]/g, function byteUpperCaseReplace(c): string {
@ -138,7 +140,13 @@ export class Request extends body.Body implements domTypes.Request {
headersList.push(header); headersList.push(header);
} }
const body2 = this._bodySource; let body2 = this._bodySource;
if (this._bodySource instanceof ReadableStream) {
const tees = (this._bodySource as domTypes.ReadableStream).tee();
this._stream = this._bodySource = tees[0];
body2 = tees[1];
}
const cloned = new Request(this.url, { const cloned = new Request(this.url, {
body: body2, body: body2,

View file

@ -1,5 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { test, assertEquals } from "./test_util.ts"; import { test, assert, assertEquals } from "./test_util.ts";
test(function fromInit(): void { test(function fromInit(): void {
const req = new Request("https://example.com", { const req = new Request("https://example.com", {
@ -15,3 +15,35 @@ test(function fromInit(): void {
assertEquals(req.url, "https://example.com"); assertEquals(req.url, "https://example.com");
assertEquals(req.headers.get("test-header"), "value"); assertEquals(req.headers.get("test-header"), "value");
}); });
test(function fromRequest(): void {
const r = new Request("https://example.com");
// @ts-ignore
r._bodySource = "ahoyhoy";
r.headers.set("test-header", "value");
const req = new Request(r);
// @ts-ignore
assertEquals(req._bodySource, r._bodySource);
assertEquals(req.url, r.url);
assertEquals(req.headers.get("test-header"), r.headers.get("test-header"));
});
test(async function cloneRequestBodyStream(): Promise<void> {
// hack to get a stream
const stream = new Request("", { body: "a test body" }).body;
const r1 = new Request("https://example.com", {
body: stream
});
const r2 = r1.clone();
const b1 = await r1.text();
const b2 = await r2.text();
assertEquals(b1, b2);
// @ts-ignore
assert(r1._bodySource !== r2._bodySource);
});

20
cli/js/streams/mod.ts Normal file
View file

@ -0,0 +1,20 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* @stardazed/streams - implementation of the web streams standard
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
export { SDReadableStream as ReadableStream } from "./readable-stream.ts";
/* TODO The following are currently unused so not exported for clarity.
export { WritableStream } from "./writable-stream.ts";
export { TransformStream } from "./transform-stream.ts";
export {
ByteLengthQueuingStrategy,
CountQueuingStrategy
} from "./strategies.ts";
*/

237
cli/js/streams/pipe-to.ts Normal file
View file

@ -0,0 +1,237 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/pipe-to - pipeTo algorithm implementation
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// /* eslint-disable @typescript-eslint/no-explicit-any */
// // TODO reenable this lint here
// import * as rs from "./readable-internals.ts";
// import * as ws from "./writable-internals.ts";
// import * as shared from "./shared-internals.ts";
// import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
// import { PipeOptions } from "../dom_types.ts";
// import { DenoError, ErrorKind } from "../errors.ts";
// // add a wrapper to handle falsy rejections
// interface ErrorWrapper {
// actualError: shared.ErrorResult;
// }
// export function pipeTo<ChunkType>(
// source: rs.SDReadableStream<ChunkType>,
// dest: ws.WritableStream<ChunkType>,
// options: PipeOptions
// ): Promise<void> {
// const preventClose = !!options.preventClose;
// const preventAbort = !!options.preventAbort;
// const preventCancel = !!options.preventCancel;
// const signal = options.signal;
// let shuttingDown = false;
// let latestWrite = Promise.resolve();
// const promise = shared.createControlledPromise<void>();
// // If IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agents discretion.
// // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this).
// const reader = new ReadableStreamDefaultReader(source);
// const writer = new WritableStreamDefaultWriter(dest);
// let abortAlgorithm: () => any;
// if (signal !== undefined) {
// abortAlgorithm = (): void => {
// // TODO this should be a DOMException,
// // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/pipe-to.ts#L38
// const error = new DenoError(ErrorKind.AbortError, "Aborted");
// const actions: Array<() => Promise<void>> = [];
// if (preventAbort === false) {
// actions.push(() => {
// if (dest[shared.state_] === "writable") {
// return ws.writableStreamAbort(dest, error);
// }
// return Promise.resolve();
// });
// }
// if (preventCancel === false) {
// actions.push(() => {
// if (source[shared.state_] === "readable") {
// return rs.readableStreamCancel(source, error);
// }
// return Promise.resolve();
// });
// }
// shutDown(
// () => {
// return Promise.all(actions.map(a => a())).then(_ => undefined);
// },
// { actualError: error }
// );
// };
// if (signal.aborted === true) {
// abortAlgorithm();
// } else {
// signal.addEventListener("abort", abortAlgorithm);
// }
// }
// function onStreamErrored(
// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
// promise: Promise<void>,
// action: (error: shared.ErrorResult) => void
// ): void {
// if (stream[shared.state_] === "errored") {
// action(stream[shared.storedError_]);
// } else {
// promise.catch(action);
// }
// }
// function onStreamClosed(
// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
// promise: Promise<void>,
// action: () => void
// ): void {
// if (stream[shared.state_] === "closed") {
// action();
// } else {
// promise.then(action);
// }
// }
// onStreamErrored(source, reader[rs.closedPromise_].promise, error => {
// if (!preventAbort) {
// shutDown(() => ws.writableStreamAbort(dest, error), {
// actualError: error
// });
// } else {
// shutDown(undefined, { actualError: error });
// }
// });
// onStreamErrored(dest, writer[ws.closedPromise_].promise, error => {
// if (!preventCancel) {
// shutDown(() => rs.readableStreamCancel(source, error), {
// actualError: error
// });
// } else {
// shutDown(undefined, { actualError: error });
// }
// });
// onStreamClosed(source, reader[rs.closedPromise_].promise, () => {
// if (!preventClose) {
// shutDown(() =>
// ws.writableStreamDefaultWriterCloseWithErrorPropagation(writer)
// );
// } else {
// shutDown();
// }
// });
// if (
// ws.writableStreamCloseQueuedOrInFlight(dest) ||
// dest[shared.state_] === "closed"
// ) {
// // Assert: no chunks have been read or written.
// const destClosed = new TypeError();
// if (!preventCancel) {
// shutDown(() => rs.readableStreamCancel(source, destClosed), {
// actualError: destClosed
// });
// } else {
// shutDown(undefined, { actualError: destClosed });
// }
// }
// function awaitLatestWrite(): Promise<void> {
// const curLatestWrite = latestWrite;
// return latestWrite.then(() =>
// curLatestWrite === latestWrite ? undefined : awaitLatestWrite()
// );
// }
// function flushRemainder(): Promise<void> | undefined {
// if (
// dest[shared.state_] === "writable" &&
// !ws.writableStreamCloseQueuedOrInFlight(dest)
// ) {
// return awaitLatestWrite();
// } else {
// return undefined;
// }
// }
// function shutDown(action?: () => Promise<void>, error?: ErrorWrapper): void {
// if (shuttingDown) {
// return;
// }
// shuttingDown = true;
// if (action === undefined) {
// action = (): Promise<void> => Promise.resolve();
// }
// function finishShutDown(): void {
// action!().then(
// _ => finalize(error),
// newError => finalize({ actualError: newError })
// );
// }
// const flushWait = flushRemainder();
// if (flushWait) {
// flushWait.then(finishShutDown);
// } else {
// finishShutDown();
// }
// }
// function finalize(error?: ErrorWrapper): void {
// ws.writableStreamDefaultWriterRelease(writer);
// rs.readableStreamReaderGenericRelease(reader);
// if (signal && abortAlgorithm) {
// signal.removeEventListener("abort", abortAlgorithm);
// }
// if (error) {
// promise.reject(error.actualError);
// } else {
// promise.resolve(undefined);
// }
// }
// function next(): Promise<void> | undefined {
// if (shuttingDown) {
// return;
// }
// writer[ws.readyPromise_].promise.then(() => {
// rs.readableStreamDefaultReaderRead(reader).then(
// ({ value, done }) => {
// if (done) {
// return;
// }
// latestWrite = ws
// .writableStreamDefaultWriterWrite(writer, value!)
// .catch(() => {});
// next();
// },
// _error => {
// latestWrite = Promise.resolve();
// }
// );
// });
// }
// next();
// return promise.promise;
// }

View file

@ -0,0 +1,84 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/queue-mixin - internal queue operations for stream controllers
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO reenable this lint here
import { Queue, QueueImpl } from "./queue.ts";
import { isFiniteNonNegativeNumber } from "./shared-internals.ts";
export const queue_ = Symbol("queue_");
export const queueTotalSize_ = Symbol("queueTotalSize_");
export interface QueueElement<V> {
value: V;
size: number;
}
export interface QueueContainer<V> {
[queue_]: Queue<QueueElement<V>>;
[queueTotalSize_]: number;
}
export interface ByteQueueContainer {
[queue_]: Queue<{
buffer: ArrayBufferLike;
byteOffset: number;
byteLength: number;
}>;
[queueTotalSize_]: number;
}
export function dequeueValue<V>(container: QueueContainer<V>): V {
// Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
// Assert: container.[[queue]] is not empty.
const pair = container[queue_].shift()!;
const newTotalSize = container[queueTotalSize_] - pair.size;
container[queueTotalSize_] = Math.max(0, newTotalSize); // < 0 can occur due to rounding errors.
return pair.value;
}
export function enqueueValueWithSize<V>(
container: QueueContainer<V>,
value: V,
size: number
): void {
// Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
if (!isFiniteNonNegativeNumber(size)) {
throw new RangeError("Chunk size must be a non-negative, finite numbers");
}
container[queue_].push({ value, size });
container[queueTotalSize_] += size;
}
export function peekQueueValue<V>(container: QueueContainer<V>): V {
// Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
// Assert: container.[[queue]] is not empty.
return container[queue_].front()!.value;
}
export function resetQueue<V>(
container: ByteQueueContainer | QueueContainer<V>
): void {
// Chrome (as of v67) has a steep performance cliff with large arrays
// and shift(), around about 50k elements. While this is an unusual case
// we use a simple wrapper around shift and push that is chunked to
// avoid this pitfall.
// @see: https://github.com/stardazed/sd-streams/issues/1
container[queue_] = new QueueImpl<any>();
// The code below can be used as a plain array implementation of the
// Queue interface.
// const q = [] as any;
// q.front = function() { return this[0]; };
// container[queue_] = q;
container[queueTotalSize_] = 0;
}

65
cli/js/streams/queue.ts Normal file
View file

@ -0,0 +1,65 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/queue - simple queue type with chunked array backing
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
const CHUNK_SIZE = 16384;
export interface Queue<T> {
push(t: T): void;
shift(): T | undefined;
front(): T | undefined;
readonly length: number;
}
export class QueueImpl<T> implements Queue<T> {
private readonly chunks_: T[][];
private readChunk_: T[];
private writeChunk_: T[];
private length_: number;
constructor() {
this.chunks_ = [[]];
this.readChunk_ = this.writeChunk_ = this.chunks_[0];
this.length_ = 0;
}
push(t: T): void {
this.writeChunk_.push(t);
this.length_ += 1;
if (this.writeChunk_.length === CHUNK_SIZE) {
this.writeChunk_ = [];
this.chunks_.push(this.writeChunk_);
}
}
front(): T | undefined {
if (this.length_ === 0) {
return undefined;
}
return this.readChunk_[0];
}
shift(): T | undefined {
if (this.length_ === 0) {
return undefined;
}
const t = this.readChunk_.shift();
this.length_ -= 1;
if (this.readChunk_.length === 0 && this.readChunk_ !== this.writeChunk_) {
this.chunks_.shift();
this.readChunk_ = this.chunks_[0];
}
return t;
}
get length(): number {
return this.length_;
}
}

View file

@ -0,0 +1,214 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-byte-stream-controller - ReadableByteStreamController class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO reenable this lint here
import * as rs from "./readable-internals.ts";
import * as q from "./queue-mixin.ts";
import * as shared from "./shared-internals.ts";
import { ReadableStreamBYOBRequest } from "./readable-stream-byob-request.ts";
import { Queue } from "./queue.ts";
import { UnderlyingByteSource } from "../dom_types.ts";
export class ReadableByteStreamController
implements rs.SDReadableByteStreamController {
[rs.autoAllocateChunkSize_]: number | undefined;
[rs.byobRequest_]: rs.SDReadableStreamBYOBRequest | undefined;
[rs.cancelAlgorithm_]: rs.CancelAlgorithm;
[rs.closeRequested_]: boolean;
[rs.controlledReadableByteStream_]: rs.SDReadableStream<ArrayBufferView>;
[rs.pullAgain_]: boolean;
[rs.pullAlgorithm_]: rs.PullAlgorithm<ArrayBufferView>;
[rs.pulling_]: boolean;
[rs.pendingPullIntos_]: rs.PullIntoDescriptor[];
[rs.started_]: boolean;
[rs.strategyHWM_]: number;
[q.queue_]: Queue<{
buffer: ArrayBufferLike;
byteOffset: number;
byteLength: number;
}>;
[q.queueTotalSize_]: number;
constructor() {
throw new TypeError();
}
get byobRequest(): rs.SDReadableStreamBYOBRequest | undefined {
if (!rs.isReadableByteStreamController(this)) {
throw new TypeError();
}
if (
this[rs.byobRequest_] === undefined &&
this[rs.pendingPullIntos_].length > 0
) {
const firstDescriptor = this[rs.pendingPullIntos_][0];
const view = new Uint8Array(
firstDescriptor.buffer,
firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
firstDescriptor.byteLength - firstDescriptor.bytesFilled
);
const byobRequest = Object.create(
ReadableStreamBYOBRequest.prototype
) as ReadableStreamBYOBRequest;
rs.setUpReadableStreamBYOBRequest(byobRequest, this, view);
this[rs.byobRequest_] = byobRequest;
}
return this[rs.byobRequest_];
}
get desiredSize(): number | null {
if (!rs.isReadableByteStreamController(this)) {
throw new TypeError();
}
return rs.readableByteStreamControllerGetDesiredSize(this);
}
close(): void {
if (!rs.isReadableByteStreamController(this)) {
throw new TypeError();
}
if (this[rs.closeRequested_]) {
throw new TypeError("Stream is already closing");
}
if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
throw new TypeError("Stream is closed or errored");
}
rs.readableByteStreamControllerClose(this);
}
enqueue(chunk: ArrayBufferView): void {
if (!rs.isReadableByteStreamController(this)) {
throw new TypeError();
}
if (this[rs.closeRequested_]) {
throw new TypeError("Stream is already closing");
}
if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
throw new TypeError("Stream is closed or errored");
}
if (!ArrayBuffer.isView(chunk)) {
throw new TypeError("chunk must be a valid ArrayBufferView");
}
// If ! IsDetachedBuffer(chunk.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
return rs.readableByteStreamControllerEnqueue(this, chunk);
}
error(error?: shared.ErrorResult): void {
if (!rs.isReadableByteStreamController(this)) {
throw new TypeError();
}
rs.readableByteStreamControllerError(this, error);
}
[rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
if (this[rs.pendingPullIntos_].length > 0) {
const firstDescriptor = this[rs.pendingPullIntos_][0];
firstDescriptor.bytesFilled = 0;
}
q.resetQueue(this);
const result = this[rs.cancelAlgorithm_](reason);
rs.readableByteStreamControllerClearAlgorithms(this);
return result;
}
[rs.pullSteps_](
forAuthorCode: boolean
): Promise<IteratorResult<ArrayBufferView, any>> {
const stream = this[rs.controlledReadableByteStream_];
// Assert: ! ReadableStreamHasDefaultReader(stream) is true.
if (this[q.queueTotalSize_] > 0) {
// Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
const entry = this[q.queue_].shift()!;
this[q.queueTotalSize_] -= entry.byteLength;
rs.readableByteStreamControllerHandleQueueDrain(this);
const view = new Uint8Array(
entry.buffer,
entry.byteOffset,
entry.byteLength
);
return Promise.resolve(
rs.readableStreamCreateReadResult(view, false, forAuthorCode)
);
}
const autoAllocateChunkSize = this[rs.autoAllocateChunkSize_];
if (autoAllocateChunkSize !== undefined) {
let buffer: ArrayBuffer;
try {
buffer = new ArrayBuffer(autoAllocateChunkSize);
} catch (error) {
return Promise.reject(error);
}
const pullIntoDescriptor: rs.PullIntoDescriptor = {
buffer,
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
elementSize: 1,
ctor: Uint8Array,
readerType: "default"
};
this[rs.pendingPullIntos_].push(pullIntoDescriptor);
}
const promise = rs.readableStreamAddReadRequest(stream, forAuthorCode);
rs.readableByteStreamControllerCallPullIfNeeded(this);
return promise;
}
}
export function setUpReadableByteStreamControllerFromUnderlyingSource(
stream: rs.SDReadableStream<ArrayBufferView>,
underlyingByteSource: UnderlyingByteSource,
highWaterMark: number
): void {
// Assert: underlyingByteSource is not undefined.
const controller = Object.create(
ReadableByteStreamController.prototype
) as ReadableByteStreamController;
const startAlgorithm = (): any => {
return shared.invokeOrNoop(underlyingByteSource, "start", [controller]);
};
const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
underlyingByteSource,
"pull",
[controller]
);
const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
underlyingByteSource,
"cancel",
[]
);
let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
if (autoAllocateChunkSize !== undefined) {
autoAllocateChunkSize = Number(autoAllocateChunkSize);
if (
!shared.isInteger(autoAllocateChunkSize) ||
autoAllocateChunkSize <= 0
) {
throw new RangeError(
"autoAllocateChunkSize must be a positive, finite integer"
);
}
}
rs.setUpReadableByteStreamController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
autoAllocateChunkSize
);
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,93 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-stream-byob-reader - ReadableStreamBYOBReader class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
import * as rs from "./readable-internals.ts";
import * as shared from "./shared-internals.ts";
export class SDReadableStreamBYOBReader
implements rs.SDReadableStreamBYOBReader {
[rs.closedPromise_]: shared.ControlledPromise<void>;
[rs.ownerReadableStream_]: rs.SDReadableStream<ArrayBufferView> | undefined;
[rs.readIntoRequests_]: Array<
rs.ReadRequest<IteratorResult<ArrayBufferView>>
>;
constructor(stream: rs.SDReadableStream<ArrayBufferView>) {
if (!rs.isReadableStream(stream)) {
throw new TypeError();
}
if (
!rs.isReadableByteStreamController(stream[rs.readableStreamController_])
) {
throw new TypeError();
}
if (rs.isReadableStreamLocked(stream)) {
throw new TypeError("The stream is locked.");
}
rs.readableStreamReaderGenericInitialize(this, stream);
this[rs.readIntoRequests_] = [];
}
get closed(): Promise<void> {
if (!rs.isReadableStreamBYOBReader(this)) {
return Promise.reject(new TypeError());
}
return this[rs.closedPromise_].promise;
}
cancel(reason: shared.ErrorResult): Promise<void> {
if (!rs.isReadableStreamBYOBReader(this)) {
return Promise.reject(new TypeError());
}
const stream = this[rs.ownerReadableStream_];
if (stream === undefined) {
return Promise.reject(
new TypeError("Reader is not associated with a stream")
);
}
return rs.readableStreamCancel(stream, reason);
}
read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>> {
if (!rs.isReadableStreamBYOBReader(this)) {
return Promise.reject(new TypeError());
}
if (this[rs.ownerReadableStream_] === undefined) {
return Promise.reject(
new TypeError("Reader is not associated with a stream")
);
}
if (!ArrayBuffer.isView(view)) {
return Promise.reject(
new TypeError("view argument must be a valid ArrayBufferView")
);
}
// If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, return a promise rejected with a TypeError exception.
if (view.byteLength === 0) {
return Promise.reject(
new TypeError("supplied buffer view must be > 0 bytes")
);
}
return rs.readableStreamBYOBReaderRead(this, view, true);
}
releaseLock(): void {
if (!rs.isReadableStreamBYOBReader(this)) {
throw new TypeError();
}
if (this[rs.ownerReadableStream_] === undefined) {
throw new TypeError("Reader is not associated with a stream");
}
if (this[rs.readIntoRequests_].length > 0) {
throw new TypeError();
}
rs.readableStreamReaderGenericRelease(this);
}
}

View file

@ -0,0 +1,60 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-stream-byob-request - ReadableStreamBYOBRequest class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
import * as rs from "./readable-internals.ts";
export class ReadableStreamBYOBRequest {
[rs.associatedReadableByteStreamController_]:
| rs.SDReadableByteStreamController
| undefined;
[rs.view_]: ArrayBufferView | undefined;
constructor() {
throw new TypeError();
}
get view(): ArrayBufferView {
if (!rs.isReadableStreamBYOBRequest(this)) {
throw new TypeError();
}
return this[rs.view_]!;
}
respond(bytesWritten: number): void {
if (!rs.isReadableStreamBYOBRequest(this)) {
throw new TypeError();
}
if (this[rs.associatedReadableByteStreamController_] === undefined) {
throw new TypeError();
}
// If! IsDetachedBuffer(this.[[view]].[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
return rs.readableByteStreamControllerRespond(
this[rs.associatedReadableByteStreamController_]!,
bytesWritten
);
}
respondWithNewView(view: ArrayBufferView): void {
if (!rs.isReadableStreamBYOBRequest(this)) {
throw new TypeError();
}
if (this[rs.associatedReadableByteStreamController_] === undefined) {
throw new TypeError();
}
if (!ArrayBuffer.isView(view)) {
throw new TypeError("view parameter must be a TypedArray");
}
// If! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
return rs.readableByteStreamControllerRespondWithNewView(
this[rs.associatedReadableByteStreamController_]!,
view
);
}
}

View file

@ -0,0 +1,139 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-stream-default-controller - ReadableStreamDefaultController class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO reenable this lint here
import * as rs from "./readable-internals.ts";
import * as shared from "./shared-internals.ts";
import * as q from "./queue-mixin.ts";
import { Queue } from "./queue.ts";
import { QueuingStrategySizeCallback, UnderlyingSource } from "../dom_types.ts";
export class ReadableStreamDefaultController<OutputType>
implements rs.SDReadableStreamDefaultController<OutputType> {
[rs.cancelAlgorithm_]: rs.CancelAlgorithm;
[rs.closeRequested_]: boolean;
[rs.controlledReadableStream_]: rs.SDReadableStream<OutputType>;
[rs.pullAgain_]: boolean;
[rs.pullAlgorithm_]: rs.PullAlgorithm<OutputType>;
[rs.pulling_]: boolean;
[rs.strategyHWM_]: number;
[rs.strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
[rs.started_]: boolean;
[q.queue_]: Queue<q.QueueElement<OutputType>>;
[q.queueTotalSize_]: number;
constructor() {
throw new TypeError();
}
get desiredSize(): number | null {
return rs.readableStreamDefaultControllerGetDesiredSize(this);
}
close(): void {
if (!rs.isReadableStreamDefaultController(this)) {
throw new TypeError();
}
if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw new TypeError(
"Cannot close, the stream is already closing or not readable"
);
}
rs.readableStreamDefaultControllerClose(this);
}
enqueue(chunk?: OutputType): void {
if (!rs.isReadableStreamDefaultController(this)) {
throw new TypeError();
}
if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw new TypeError(
"Cannot enqueue, the stream is closing or not readable"
);
}
rs.readableStreamDefaultControllerEnqueue(this, chunk!);
}
error(e?: shared.ErrorResult): void {
if (!rs.isReadableStreamDefaultController(this)) {
throw new TypeError();
}
rs.readableStreamDefaultControllerError(this, e);
}
[rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
q.resetQueue(this);
const result = this[rs.cancelAlgorithm_](reason);
rs.readableStreamDefaultControllerClearAlgorithms(this);
return result;
}
[rs.pullSteps_](
forAuthorCode: boolean
): Promise<IteratorResult<OutputType, any>> {
const stream = this[rs.controlledReadableStream_];
if (this[q.queue_].length > 0) {
const chunk = q.dequeueValue(this);
if (this[rs.closeRequested_] && this[q.queue_].length === 0) {
rs.readableStreamDefaultControllerClearAlgorithms(this);
rs.readableStreamClose(stream);
} else {
rs.readableStreamDefaultControllerCallPullIfNeeded(this);
}
return Promise.resolve(
rs.readableStreamCreateReadResult(chunk, false, forAuthorCode)
);
}
const pendingPromise = rs.readableStreamAddReadRequest(
stream,
forAuthorCode
);
rs.readableStreamDefaultControllerCallPullIfNeeded(this);
return pendingPromise;
}
}
export function setUpReadableStreamDefaultControllerFromUnderlyingSource<
OutputType
>(
stream: rs.SDReadableStream<OutputType>,
underlyingSource: UnderlyingSource<OutputType>,
highWaterMark: number,
sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
): void {
// Assert: underlyingSource is not undefined.
const controller = Object.create(ReadableStreamDefaultController.prototype);
const startAlgorithm = (): any => {
return shared.invokeOrNoop(underlyingSource, "start", [controller]);
};
const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
underlyingSource,
"pull",
[controller]
);
const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
underlyingSource,
"cancel",
[]
);
rs.setUpReadableStreamDefaultController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
sizeAlgorithm
);
}

View file

@ -0,0 +1,75 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-stream-default-reader - ReadableStreamDefaultReader class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
import * as rs from "./readable-internals.ts";
import * as shared from "./shared-internals.ts";
export class ReadableStreamDefaultReader<OutputType>
implements rs.SDReadableStreamReader<OutputType> {
[rs.closedPromise_]: shared.ControlledPromise<void>;
[rs.ownerReadableStream_]: rs.SDReadableStream<OutputType> | undefined;
[rs.readRequests_]: Array<rs.ReadRequest<IteratorResult<OutputType>>>;
constructor(stream: rs.SDReadableStream<OutputType>) {
if (!rs.isReadableStream(stream)) {
throw new TypeError();
}
if (rs.isReadableStreamLocked(stream)) {
throw new TypeError("The stream is locked.");
}
rs.readableStreamReaderGenericInitialize(this, stream);
this[rs.readRequests_] = [];
}
get closed(): Promise<void> {
if (!rs.isReadableStreamDefaultReader(this)) {
return Promise.reject(new TypeError());
}
return this[rs.closedPromise_].promise;
}
cancel(reason: shared.ErrorResult): Promise<void> {
if (!rs.isReadableStreamDefaultReader(this)) {
return Promise.reject(new TypeError());
}
const stream = this[rs.ownerReadableStream_];
if (stream === undefined) {
return Promise.reject(
new TypeError("Reader is not associated with a stream")
);
}
return rs.readableStreamCancel(stream, reason);
}
read(): Promise<IteratorResult<OutputType | undefined>> {
if (!rs.isReadableStreamDefaultReader(this)) {
return Promise.reject(new TypeError());
}
if (this[rs.ownerReadableStream_] === undefined) {
return Promise.reject(
new TypeError("Reader is not associated with a stream")
);
}
return rs.readableStreamDefaultReaderRead(this, true);
}
releaseLock(): void {
if (!rs.isReadableStreamDefaultReader(this)) {
throw new TypeError();
}
if (this[rs.ownerReadableStream_] === undefined) {
return;
}
if (this[rs.readRequests_].length !== 0) {
throw new TypeError("Cannot release a stream with pending read requests");
}
rs.readableStreamReaderGenericRelease(this);
}
}

View file

@ -0,0 +1,387 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-stream - ReadableStream class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint prefer-const: "off" */
// TODO remove this, surpressed because of
// 284:7 error 'branch1' is never reassigned. Use 'const' instead prefer-const
import * as rs from "./readable-internals.ts";
import * as shared from "./shared-internals.ts";
import {
QueuingStrategy,
QueuingStrategySizeCallback,
UnderlyingSource,
UnderlyingByteSource
} from "../dom_types.ts";
import {
ReadableStreamDefaultController,
setUpReadableStreamDefaultControllerFromUnderlyingSource
} from "./readable-stream-default-controller.ts";
import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
import {
ReadableByteStreamController,
setUpReadableByteStreamControllerFromUnderlyingSource
} from "./readable-byte-stream-controller.ts";
import { SDReadableStreamBYOBReader } from "./readable-stream-byob-reader.ts";
export class SDReadableStream<OutputType>
implements rs.SDReadableStream<OutputType> {
[shared.state_]: rs.ReadableStreamState;
[shared.storedError_]: shared.ErrorResult;
[rs.reader_]: rs.SDReadableStreamReader<OutputType> | undefined;
[rs.readableStreamController_]: rs.SDReadableStreamControllerBase<OutputType>;
constructor(
underlyingSource: UnderlyingByteSource,
strategy?: { highWaterMark?: number; size?: undefined }
);
constructor(
underlyingSource?: UnderlyingSource<OutputType>,
strategy?: QueuingStrategy<OutputType>
);
constructor(
underlyingSource: UnderlyingSource<OutputType> | UnderlyingByteSource = {},
strategy:
| QueuingStrategy<OutputType>
| { highWaterMark?: number; size?: undefined } = {}
) {
rs.initializeReadableStream(this);
const sizeFunc = strategy.size;
const stratHWM = strategy.highWaterMark;
const sourceType = underlyingSource.type;
if (sourceType === undefined) {
const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
const highWaterMark = shared.validateAndNormalizeHighWaterMark(
stratHWM === undefined ? 1 : stratHWM
);
setUpReadableStreamDefaultControllerFromUnderlyingSource(
this,
underlyingSource as UnderlyingSource<OutputType>,
highWaterMark,
sizeAlgorithm
);
} else if (String(sourceType) === "bytes") {
if (sizeFunc !== undefined) {
throw new RangeError(
"bytes streams cannot have a strategy with a `size` field"
);
}
const highWaterMark = shared.validateAndNormalizeHighWaterMark(
stratHWM === undefined ? 0 : stratHWM
);
setUpReadableByteStreamControllerFromUnderlyingSource(
(this as unknown) as rs.SDReadableStream<ArrayBufferView>,
underlyingSource as UnderlyingByteSource,
highWaterMark
);
} else {
throw new RangeError(
"The underlying source's `type` field must be undefined or 'bytes'"
);
}
}
get locked(): boolean {
return rs.isReadableStreamLocked(this);
}
getReader(): rs.SDReadableStreamDefaultReader<OutputType>;
getReader(options: { mode?: "byob" }): rs.SDReadableStreamBYOBReader;
getReader(options?: {
mode?: "byob";
}):
| rs.SDReadableStreamDefaultReader<OutputType>
| rs.SDReadableStreamBYOBReader {
if (!rs.isReadableStream(this)) {
throw new TypeError();
}
if (options === undefined) {
options = {};
}
const { mode } = options;
if (mode === undefined) {
return new ReadableStreamDefaultReader(this);
} else if (String(mode) === "byob") {
return new SDReadableStreamBYOBReader(
(this as unknown) as rs.SDReadableStream<ArrayBufferView>
);
}
throw RangeError("mode option must be undefined or `byob`");
}
cancel(reason: shared.ErrorResult): Promise<void> {
if (!rs.isReadableStream(this)) {
return Promise.reject(new TypeError());
}
if (rs.isReadableStreamLocked(this)) {
return Promise.reject(new TypeError("Cannot cancel a locked stream"));
}
return rs.readableStreamCancel(this, reason);
}
tee(): Array<SDReadableStream<OutputType>> {
return readableStreamTee(this, false);
}
/* TODO reenable these methods when we bring in writableStreams and transport types
pipeThrough<ResultType>(
transform: rs.GenericTransformStream<OutputType, ResultType>,
options: PipeOptions = {}
): rs.SDReadableStream<ResultType> {
const { readable, writable } = transform;
if (!rs.isReadableStream(this)) {
throw new TypeError();
}
if (!ws.isWritableStream(writable)) {
throw new TypeError("writable must be a WritableStream");
}
if (!rs.isReadableStream(readable)) {
throw new TypeError("readable must be a ReadableStream");
}
if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
throw new TypeError("options.signal must be an AbortSignal instance");
}
if (rs.isReadableStreamLocked(this)) {
throw new TypeError("Cannot pipeThrough on a locked stream");
}
if (ws.isWritableStreamLocked(writable)) {
throw new TypeError("Cannot pipeThrough to a locked stream");
}
const pipeResult = pipeTo(this, writable, options);
pipeResult.catch(() => {});
return readable;
}
pipeTo(
dest: ws.WritableStream<OutputType>,
options: PipeOptions = {}
): Promise<void> {
if (!rs.isReadableStream(this)) {
return Promise.reject(new TypeError());
}
if (!ws.isWritableStream(dest)) {
return Promise.reject(
new TypeError("destination must be a WritableStream")
);
}
if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
return Promise.reject(
new TypeError("options.signal must be an AbortSignal instance")
);
}
if (rs.isReadableStreamLocked(this)) {
return Promise.reject(new TypeError("Cannot pipe from a locked stream"));
}
if (ws.isWritableStreamLocked(dest)) {
return Promise.reject(new TypeError("Cannot pipe to a locked stream"));
}
return pipeTo(this, dest, options);
}
*/
}
export function createReadableStream<OutputType>(
startAlgorithm: rs.StartAlgorithm,
pullAlgorithm: rs.PullAlgorithm<OutputType>,
cancelAlgorithm: rs.CancelAlgorithm,
highWaterMark?: number,
sizeAlgorithm?: QueuingStrategySizeCallback<OutputType>
): SDReadableStream<OutputType> {
if (highWaterMark === undefined) {
highWaterMark = 1;
}
if (sizeAlgorithm === undefined) {
sizeAlgorithm = (): number => 1;
}
// Assert: ! IsNonNegativeNumber(highWaterMark) is true.
const stream = Object.create(SDReadableStream.prototype) as SDReadableStream<
OutputType
>;
rs.initializeReadableStream(stream);
const controller = Object.create(
ReadableStreamDefaultController.prototype
) as ReadableStreamDefaultController<OutputType>;
rs.setUpReadableStreamDefaultController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
sizeAlgorithm
);
return stream;
}
export function createReadableByteStream<OutputType>(
startAlgorithm: rs.StartAlgorithm,
pullAlgorithm: rs.PullAlgorithm<OutputType>,
cancelAlgorithm: rs.CancelAlgorithm,
highWaterMark?: number,
autoAllocateChunkSize?: number
): SDReadableStream<OutputType> {
if (highWaterMark === undefined) {
highWaterMark = 0;
}
// Assert: ! IsNonNegativeNumber(highWaterMark) is true.
if (autoAllocateChunkSize !== undefined) {
if (
!shared.isInteger(autoAllocateChunkSize) ||
autoAllocateChunkSize <= 0
) {
throw new RangeError(
"autoAllocateChunkSize must be a positive, finite integer"
);
}
}
const stream = Object.create(SDReadableStream.prototype) as SDReadableStream<
OutputType
>;
rs.initializeReadableStream(stream);
const controller = Object.create(
ReadableByteStreamController.prototype
) as ReadableByteStreamController;
rs.setUpReadableByteStreamController(
(stream as unknown) as SDReadableStream<ArrayBufferView>,
controller,
startAlgorithm,
(pullAlgorithm as unknown) as rs.PullAlgorithm<ArrayBufferView>,
cancelAlgorithm,
highWaterMark,
autoAllocateChunkSize
);
return stream;
}
export function readableStreamTee<OutputType>(
stream: SDReadableStream<OutputType>,
cloneForBranch2: boolean
): [SDReadableStream<OutputType>, SDReadableStream<OutputType>] {
if (!rs.isReadableStream(stream)) {
throw new TypeError();
}
const reader = new ReadableStreamDefaultReader(stream);
let closedOrErrored = false;
let canceled1 = false;
let canceled2 = false;
let reason1: shared.ErrorResult;
let reason2: shared.ErrorResult;
let branch1: SDReadableStream<OutputType>;
let branch2: SDReadableStream<OutputType>;
let cancelResolve: (reason: shared.ErrorResult) => void;
const cancelPromise = new Promise<void>(resolve => (cancelResolve = resolve));
const pullAlgorithm = (): Promise<void> => {
return rs
.readableStreamDefaultReaderRead(reader)
.then(({ value, done }) => {
if (done && !closedOrErrored) {
if (!canceled1) {
rs.readableStreamDefaultControllerClose(branch1![
rs.readableStreamController_
] as ReadableStreamDefaultController<OutputType>);
}
if (!canceled2) {
rs.readableStreamDefaultControllerClose(branch2![
rs.readableStreamController_
] as ReadableStreamDefaultController<OutputType>);
}
closedOrErrored = true;
}
if (closedOrErrored) {
return;
}
const value1 = value;
let value2 = value;
if (!canceled1) {
rs.readableStreamDefaultControllerEnqueue(
branch1![
rs.readableStreamController_
] as ReadableStreamDefaultController<OutputType>,
value1!
);
}
if (!canceled2) {
if (cloneForBranch2) {
value2 = shared.cloneValue(value2);
}
rs.readableStreamDefaultControllerEnqueue(
branch2![
rs.readableStreamController_
] as ReadableStreamDefaultController<OutputType>,
value2!
);
}
});
};
const cancel1Algorithm = (reason: shared.ErrorResult): Promise<void> => {
canceled1 = true;
reason1 = reason;
if (canceled2) {
const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
cancelResolve(cancelResult);
}
return cancelPromise;
};
const cancel2Algorithm = (reason: shared.ErrorResult): Promise<void> => {
canceled2 = true;
reason2 = reason;
if (canceled1) {
const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
cancelResolve(cancelResult);
}
return cancelPromise;
};
const startAlgorithm = (): undefined => undefined;
branch1 = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancel1Algorithm
);
branch2 = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancel2Algorithm
);
reader[rs.closedPromise_].promise.catch(error => {
if (!closedOrErrored) {
rs.readableStreamDefaultControllerError(
branch1![
rs.readableStreamController_
] as ReadableStreamDefaultController<OutputType>,
error
);
rs.readableStreamDefaultControllerError(
branch2![
rs.readableStreamController_
] as ReadableStreamDefaultController<OutputType>,
error
);
closedOrErrored = true;
}
});
return [branch1, branch2];
}

View file

@ -0,0 +1,310 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/shared-internals - common types and methods for streams
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO don't disable this warning
import { AbortSignal, QueuingStrategySizeCallback } from "../dom_types.ts";
import { DenoError, ErrorKind } from "../errors.ts";
// common stream fields
export const state_ = Symbol("state_");
export const storedError_ = Symbol("storedError_");
// ---------
/** An error reason / result can be anything */
export type ErrorResult = any;
// ---------
export function isInteger(value: number): boolean {
if (!isFinite(value)) {
// covers NaN, +Infinity and -Infinity
return false;
}
const absValue = Math.abs(value);
return Math.floor(absValue) === absValue;
}
export function isFiniteNonNegativeNumber(value: unknown): boolean {
if (!(typeof value === "number" && isFinite(value))) {
// covers NaN, +Infinity and -Infinity
return false;
}
return value >= 0;
}
export function isAbortSignal(signal: any): signal is AbortSignal {
if (typeof signal !== "object" || signal === null) {
return false;
}
try {
// TODO
// calling signal.aborted() probably isn't the right way to perform this test
// https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L41
signal.aborted();
return true;
} catch (err) {
return false;
}
}
export function invokeOrNoop<O extends object, P extends keyof O>(
o: O,
p: P,
args: any[]
): any {
// Assert: O is not undefined.
// Assert: IsPropertyKey(P) is true.
// Assert: args is a List.
const method: Function | undefined = (o as any)[p]; // tslint:disable-line:ban-types
if (method === undefined) {
return undefined;
}
return Function.prototype.apply.call(method, o, args);
}
export function cloneArrayBuffer(
srcBuffer: ArrayBufferLike,
srcByteOffset: number,
srcLength: number,
cloneConstructor: ArrayBufferConstructor | SharedArrayBufferConstructor
): InstanceType<typeof cloneConstructor> {
// this function fudges the return type but SharedArrayBuffer is disabled for a while anyway
return srcBuffer.slice(
srcByteOffset,
srcByteOffset + srcLength
) as InstanceType<typeof cloneConstructor>;
}
export function transferArrayBuffer(buffer: ArrayBufferLike): ArrayBuffer {
// This would in a JS engine context detach the buffer's backing store and return
// a new ArrayBuffer with the same backing store, invalidating `buffer`,
// i.e. a move operation in C++ parlance.
// Sadly ArrayBuffer.transfer is yet to be implemented by a single browser vendor.
return buffer.slice(0); // copies instead of moves
}
export function copyDataBlockBytes(
toBlock: ArrayBufferLike,
toIndex: number,
fromBlock: ArrayBufferLike,
fromIndex: number,
count: number
): void {
new Uint8Array(toBlock, toIndex, count).set(
new Uint8Array(fromBlock, fromIndex, count)
);
}
// helper memoisation map for object values
// weak so it doesn't keep memoized versions of old objects indefinitely.
const objectCloneMemo = new WeakMap<object, object>();
let sharedArrayBufferSupported_: boolean | undefined;
function supportsSharedArrayBuffer(): boolean {
if (sharedArrayBufferSupported_ === undefined) {
try {
new SharedArrayBuffer(16);
sharedArrayBufferSupported_ = true;
} catch (e) {
sharedArrayBufferSupported_ = false;
}
}
return sharedArrayBufferSupported_;
}
/**
* Implement a method of value cloning that is reasonably close to performing `StructuredSerialize(StructuredDeserialize(value))`
* from the HTML standard. Used by the internal `readableStreamTee` method to clone values for connected implementations.
* @see https://html.spec.whatwg.org/multipage/structured-data.html#structuredserializeinternal
*/
export function cloneValue(value: any): any {
const valueType = typeof value;
switch (valueType) {
case "number":
case "string":
case "boolean":
case "undefined":
// @ts-ignore
case "bigint":
return value;
case "object": {
if (objectCloneMemo.has(value)) {
return objectCloneMemo.get(value);
}
if (value === null) {
return value;
}
if (value instanceof Date) {
return new Date(value.valueOf());
}
if (value instanceof RegExp) {
return new RegExp(value);
}
if (supportsSharedArrayBuffer() && value instanceof SharedArrayBuffer) {
return value;
}
if (value instanceof ArrayBuffer) {
const cloned = cloneArrayBuffer(
value,
0,
value.byteLength,
ArrayBuffer
);
objectCloneMemo.set(value, cloned);
return cloned;
}
if (ArrayBuffer.isView(value)) {
const clonedBuffer = cloneValue(value.buffer) as ArrayBufferLike;
// Use DataViewConstructor type purely for type-checking, can be a DataView or TypedArray.
// They use the same constructor signature, only DataView has a length in bytes and TypedArrays
// use a length in terms of elements, so we adjust for that.
let length: number;
if (value instanceof DataView) {
length = value.byteLength;
} else {
length = (value as Uint8Array).length;
}
return new (value.constructor as DataViewConstructor)(
clonedBuffer,
value.byteOffset,
length
);
}
if (value instanceof Map) {
const clonedMap = new Map();
objectCloneMemo.set(value, clonedMap);
value.forEach((v, k) => clonedMap.set(k, cloneValue(v)));
return clonedMap;
}
if (value instanceof Set) {
const clonedSet = new Map();
objectCloneMemo.set(value, clonedSet);
value.forEach((v, k) => clonedSet.set(k, cloneValue(v)));
return clonedSet;
}
// generic object
const clonedObj = {} as any;
objectCloneMemo.set(value, clonedObj);
const sourceKeys = Object.getOwnPropertyNames(value);
for (const key of sourceKeys) {
clonedObj[key] = cloneValue(value[key]);
}
return clonedObj;
}
case "symbol":
case "function":
default:
// TODO this should be a DOMException,
// https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L171
throw new DenoError(
ErrorKind.DataCloneError,
"Uncloneable value in stream"
);
}
}
export function promiseCall<F extends Function>(
f: F,
v: object | undefined,
args: any[]
): Promise<any> {
// tslint:disable-line:ban-types
try {
const result = Function.prototype.apply.call(f, v, args);
return Promise.resolve(result);
} catch (err) {
return Promise.reject(err);
}
}
export function createAlgorithmFromUnderlyingMethod<
O extends object,
K extends keyof O
>(obj: O, methodName: K, extraArgs: any[]): any {
const method = obj[methodName];
if (method === undefined) {
return (): any => Promise.resolve(undefined);
}
if (typeof method !== "function") {
throw new TypeError(`Field "${methodName}" is not a function.`);
}
return function(...fnArgs: any[]): any {
return promiseCall(method, obj, fnArgs.concat(extraArgs));
};
}
/*
Deprecated for now, all usages replaced by readableStreamCreateReadResult
function createIterResultObject<T>(value: T, done: boolean): IteratorResult<T> {
return { value, done };
}
*/
export function validateAndNormalizeHighWaterMark(hwm: unknown): number {
const highWaterMark = Number(hwm);
if (isNaN(highWaterMark) || highWaterMark < 0) {
throw new RangeError(
"highWaterMark must be a valid, non-negative integer."
);
}
return highWaterMark;
}
export function makeSizeAlgorithmFromSizeFunction<T>(
sizeFn: undefined | ((chunk: T) => number)
): QueuingStrategySizeCallback<T> {
if (typeof sizeFn !== "function" && typeof sizeFn !== "undefined") {
throw new TypeError("size function must be undefined or a function");
}
return function(chunk: T): number {
if (typeof sizeFn === "function") {
return sizeFn(chunk);
}
return 1;
};
}
// ----
export const enum ControlledPromiseState {
Pending,
Resolved,
Rejected
}
export interface ControlledPromise<V> {
resolve(value?: V): void;
reject(error: ErrorResult): void;
promise: Promise<V>;
state: ControlledPromiseState;
}
export function createControlledPromise<V>(): ControlledPromise<V> {
const conProm = {
state: ControlledPromiseState.Pending
} as ControlledPromise<V>;
conProm.promise = new Promise<V>(function(resolve, reject) {
conProm.resolve = function(v?: V): void {
conProm.state = ControlledPromiseState.Resolved;
resolve(v);
};
conProm.reject = function(e?: ErrorResult): void {
conProm.state = ControlledPromiseState.Rejected;
reject(e);
};
});
return conProm;
}

View file

@ -0,0 +1,39 @@
// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/strategies - implementation of the built-in stream strategies
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO reenable this lint here
import { QueuingStrategy } from "../dom_types.ts";
export class ByteLengthQueuingStrategy
implements QueuingStrategy<ArrayBufferView> {
highWaterMark: number;
constructor(options: { highWaterMark: number }) {
this.highWaterMark = options.highWaterMark;
}
size(chunk: ArrayBufferView): number {
return chunk.byteLength;
}
}
export class CountQueuingStrategy implements QueuingStrategy<any> {
highWaterMark: number;
constructor(options: { highWaterMark: number }) {
this.highWaterMark = options.highWaterMark;
}
size(): number {
return 1;
}
}

View file

@ -0,0 +1,371 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/transform-internals - internal types and functions for transform streams
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// /* eslint-disable @typescript-eslint/no-explicit-any */
// // TODO reenable this lint here
// import * as rs from "./readable-internals.ts";
// import * as ws from "./writable-internals.ts";
// import * as shared from "./shared-internals.ts";
// import { createReadableStream } from "./readable-stream.ts";
// import { createWritableStream } from "./writable-stream.ts";
// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
// export const state_ = Symbol("transformState_");
// export const backpressure_ = Symbol("backpressure_");
// export const backpressureChangePromise_ = Symbol("backpressureChangePromise_");
// export const readable_ = Symbol("readable_");
// export const transformStreamController_ = Symbol("transformStreamController_");
// export const writable_ = Symbol("writable_");
// export const controlledTransformStream_ = Symbol("controlledTransformStream_");
// export const flushAlgorithm_ = Symbol("flushAlgorithm_");
// export const transformAlgorithm_ = Symbol("transformAlgorithm_");
// // ----
// export type TransformFunction<InputType, OutputType> = (
// chunk: InputType,
// controller: TransformStreamDefaultController<InputType, OutputType>
// ) => void | PromiseLike<void>;
// export type TransformAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
// export type FlushFunction<InputType, OutputType> = (
// controller: TransformStreamDefaultController<InputType, OutputType>
// ) => void | PromiseLike<void>;
// export type FlushAlgorithm = () => Promise<void>;
// // ----
// export interface TransformStreamDefaultController<InputType, OutputType> {
// readonly desiredSize: number | null;
// enqueue(chunk: OutputType): void;
// error(reason: shared.ErrorResult): void;
// terminate(): void;
// [controlledTransformStream_]: TransformStream<InputType, OutputType>; // The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check
// [flushAlgorithm_]: FlushAlgorithm; // A promise - returning algorithm which communicates a requested close to the transformer
// [transformAlgorithm_]: TransformAlgorithm<InputType>; // A promise - returning algorithm, taking one argument(the chunk to transform), which requests the transformer perform its transformation
// }
// export interface Transformer<InputType, OutputType> {
// start?(
// controller: TransformStreamDefaultController<InputType, OutputType>
// ): void | PromiseLike<void>;
// transform?: TransformFunction<InputType, OutputType>;
// flush?: FlushFunction<InputType, OutputType>;
// readableType?: undefined; // for future spec changes
// writableType?: undefined; // for future spec changes
// }
// export declare class TransformStream<InputType, OutputType> {
// constructor(
// transformer: Transformer<InputType, OutputType>,
// writableStrategy: QueuingStrategy<InputType>,
// readableStrategy: QueuingStrategy<OutputType>
// );
// readonly readable: rs.SDReadableStream<OutputType>;
// readonly writable: ws.WritableStream<InputType>;
// [backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
// [backpressureChangePromise_]: shared.ControlledPromise<void> | undefined; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
// [readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
// [transformStreamController_]: TransformStreamDefaultController<
// InputType,
// OutputType
// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
// [writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
// }
// // ---- TransformStream
// export function isTransformStream(
// value: unknown
// ): value is TransformStream<any, any> {
// if (typeof value !== "object" || value === null) {
// return false;
// }
// return transformStreamController_ in value;
// }
// export function initializeTransformStream<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>,
// startPromise: Promise<void>,
// writableHighWaterMark: number,
// writableSizeAlgorithm: QueuingStrategySizeCallback<InputType>,
// readableHighWaterMark: number,
// readableSizeAlgorithm: QueuingStrategySizeCallback<OutputType>
// ): void {
// const startAlgorithm = function(): Promise<void> {
// return startPromise;
// };
// const writeAlgorithm = function(chunk: InputType): Promise<void> {
// return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
// };
// const abortAlgorithm = function(reason: shared.ErrorResult): Promise<void> {
// return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
// };
// const closeAlgorithm = function(): Promise<void> {
// return transformStreamDefaultSinkCloseAlgorithm(stream);
// };
// stream[writable_] = createWritableStream<InputType>(
// startAlgorithm,
// writeAlgorithm,
// closeAlgorithm,
// abortAlgorithm,
// writableHighWaterMark,
// writableSizeAlgorithm
// );
// const pullAlgorithm = function(): Promise<void> {
// return transformStreamDefaultSourcePullAlgorithm(stream);
// };
// const cancelAlgorithm = function(
// reason: shared.ErrorResult
// ): Promise<undefined> {
// transformStreamErrorWritableAndUnblockWrite(stream, reason);
// return Promise.resolve(undefined);
// };
// stream[readable_] = createReadableStream(
// startAlgorithm,
// pullAlgorithm,
// cancelAlgorithm,
// readableHighWaterMark,
// readableSizeAlgorithm
// );
// stream[backpressure_] = undefined;
// stream[backpressureChangePromise_] = undefined;
// transformStreamSetBackpressure(stream, true);
// stream[transformStreamController_] = undefined!; // initialize slot for brand-check
// }
// export function transformStreamError<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>,
// error: shared.ErrorResult
// ): void {
// rs.readableStreamDefaultControllerError(
// stream[readable_][
// rs.readableStreamController_
// ] as rs.SDReadableStreamDefaultController<OutputType>,
// error
// );
// transformStreamErrorWritableAndUnblockWrite(stream, error);
// }
// export function transformStreamErrorWritableAndUnblockWrite<
// InputType,
// OutputType
// >(
// stream: TransformStream<InputType, OutputType>,
// error: shared.ErrorResult
// ): void {
// transformStreamDefaultControllerClearAlgorithms(
// stream[transformStreamController_]
// );
// ws.writableStreamDefaultControllerErrorIfNeeded(
// stream[writable_][ws.writableStreamController_]!,
// error
// );
// if (stream[backpressure_]) {
// transformStreamSetBackpressure(stream, false);
// }
// }
// export function transformStreamSetBackpressure<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>,
// backpressure: boolean
// ): void {
// // Assert: stream.[[backpressure]] is not backpressure.
// if (stream[backpressure_] !== undefined) {
// stream[backpressureChangePromise_]!.resolve(undefined);
// }
// stream[backpressureChangePromise_] = shared.createControlledPromise<void>();
// stream[backpressure_] = backpressure;
// }
// // ---- TransformStreamDefaultController
// export function isTransformStreamDefaultController(
// value: unknown
// ): value is TransformStreamDefaultController<any, any> {
// if (typeof value !== "object" || value === null) {
// return false;
// }
// return controlledTransformStream_ in value;
// }
// export function setUpTransformStreamDefaultController<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>,
// controller: TransformStreamDefaultController<InputType, OutputType>,
// transformAlgorithm: TransformAlgorithm<InputType>,
// flushAlgorithm: FlushAlgorithm
// ): void {
// // Assert: ! IsTransformStream(stream) is true.
// // Assert: stream.[[transformStreamController]] is undefined.
// controller[controlledTransformStream_] = stream;
// stream[transformStreamController_] = controller;
// controller[transformAlgorithm_] = transformAlgorithm;
// controller[flushAlgorithm_] = flushAlgorithm;
// }
// export function transformStreamDefaultControllerClearAlgorithms<
// InputType,
// OutputType
// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
// // Use ! assertions to override type check here, this way we don't
// // have to perform type checks/assertions everywhere else.
// controller[transformAlgorithm_] = undefined!;
// controller[flushAlgorithm_] = undefined!;
// }
// export function transformStreamDefaultControllerEnqueue<InputType, OutputType>(
// controller: TransformStreamDefaultController<InputType, OutputType>,
// chunk: OutputType
// ): void {
// const stream = controller[controlledTransformStream_];
// const readableController = stream[readable_][
// rs.readableStreamController_
// ] as rs.SDReadableStreamDefaultController<OutputType>;
// if (
// !rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
// ) {
// throw new TypeError();
// }
// try {
// rs.readableStreamDefaultControllerEnqueue(readableController, chunk);
// } catch (error) {
// transformStreamErrorWritableAndUnblockWrite(stream, error);
// throw stream[readable_][shared.storedError_];
// }
// const backpressure = rs.readableStreamDefaultControllerHasBackpressure(
// readableController
// );
// if (backpressure !== stream[backpressure_]) {
// // Assert: backpressure is true.
// transformStreamSetBackpressure(stream, true);
// }
// }
// export function transformStreamDefaultControllerError<InputType, OutputType>(
// controller: TransformStreamDefaultController<InputType, OutputType>,
// error: shared.ErrorResult
// ): void {
// transformStreamError(controller[controlledTransformStream_], error);
// }
// export function transformStreamDefaultControllerPerformTransform<
// InputType,
// OutputType
// >(
// controller: TransformStreamDefaultController<InputType, OutputType>,
// chunk: InputType
// ): Promise<void> {
// const transformPromise = controller[transformAlgorithm_](chunk);
// return transformPromise.catch(error => {
// transformStreamError(controller[controlledTransformStream_], error);
// throw error;
// });
// }
// export function transformStreamDefaultControllerTerminate<
// InputType,
// OutputType
// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
// const stream = controller[controlledTransformStream_];
// const readableController = stream[readable_][
// rs.readableStreamController_
// ] as rs.SDReadableStreamDefaultController<OutputType>;
// if (rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
// rs.readableStreamDefaultControllerClose(readableController);
// }
// const error = new TypeError("The transform stream has been terminated");
// transformStreamErrorWritableAndUnblockWrite(stream, error);
// }
// // ---- Transform Sinks
// export function transformStreamDefaultSinkWriteAlgorithm<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>,
// chunk: InputType
// ): Promise<void> {
// // Assert: stream.[[writable]].[[state]] is "writable".
// const controller = stream[transformStreamController_];
// if (stream[backpressure_]) {
// const backpressureChangePromise = stream[backpressureChangePromise_]!;
// // Assert: backpressureChangePromise is not undefined.
// return backpressureChangePromise.promise.then(_ => {
// const writable = stream[writable_];
// const state = writable[shared.state_];
// if (state === "erroring") {
// throw writable[shared.storedError_];
// }
// // Assert: state is "writable".
// return transformStreamDefaultControllerPerformTransform(
// controller,
// chunk
// );
// });
// }
// return transformStreamDefaultControllerPerformTransform(controller, chunk);
// }
// export function transformStreamDefaultSinkAbortAlgorithm<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>,
// reason: shared.ErrorResult
// ): Promise<void> {
// transformStreamError(stream, reason);
// return Promise.resolve(undefined);
// }
// export function transformStreamDefaultSinkCloseAlgorithm<InputType, OutputType>(
// stream: TransformStream<InputType, OutputType>
// ): Promise<void> {
// const readable = stream[readable_];
// const controller = stream[transformStreamController_];
// const flushPromise = controller[flushAlgorithm_]();
// transformStreamDefaultControllerClearAlgorithms(controller);
// return flushPromise.then(
// _ => {
// if (readable[shared.state_] === "errored") {
// throw readable[shared.storedError_];
// }
// const readableController = readable[
// rs.readableStreamController_
// ] as rs.SDReadableStreamDefaultController<OutputType>;
// if (
// rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
// ) {
// rs.readableStreamDefaultControllerClose(readableController);
// }
// },
// error => {
// transformStreamError(stream, error);
// throw readable[shared.storedError_];
// }
// );
// }
// // ---- Transform Sources
// export function transformStreamDefaultSourcePullAlgorithm<
// InputType,
// OutputType
// >(stream: TransformStream<InputType, OutputType>): Promise<void> {
// // Assert: stream.[[backpressure]] is true.
// // Assert: stream.[[backpressureChangePromise]] is not undefined.
// transformStreamSetBackpressure(stream, false);
// return stream[backpressureChangePromise_]!.promise;
// }

View file

@ -0,0 +1,58 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/transform-stream-default-controller - TransformStreamDefaultController class implementation
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// import * as rs from "./readable-internals.ts";
// import * as ts from "./transform-internals.ts";
// import { ErrorResult } from "./shared-internals.ts";
// export class TransformStreamDefaultController<InputType, OutputType>
// implements ts.TransformStreamDefaultController<InputType, OutputType> {
// [ts.controlledTransformStream_]: ts.TransformStream<InputType, OutputType>;
// [ts.flushAlgorithm_]: ts.FlushAlgorithm;
// [ts.transformAlgorithm_]: ts.TransformAlgorithm<InputType>;
// constructor() {
// throw new TypeError();
// }
// get desiredSize(): number | null {
// if (!ts.isTransformStreamDefaultController(this)) {
// throw new TypeError();
// }
// const readableController = this[ts.controlledTransformStream_][
// ts.readable_
// ][rs.readableStreamController_] as rs.SDReadableStreamDefaultController<
// OutputType
// >;
// return rs.readableStreamDefaultControllerGetDesiredSize(readableController);
// }
// enqueue(chunk: OutputType): void {
// if (!ts.isTransformStreamDefaultController(this)) {
// throw new TypeError();
// }
// ts.transformStreamDefaultControllerEnqueue(this, chunk);
// }
// error(reason: ErrorResult): void {
// if (!ts.isTransformStreamDefaultController(this)) {
// throw new TypeError();
// }
// ts.transformStreamDefaultControllerError(this, reason);
// }
// terminate(): void {
// if (!ts.isTransformStreamDefaultController(this)) {
// throw new TypeError();
// }
// ts.transformStreamDefaultControllerTerminate(this);
// }
// }

View file

@ -0,0 +1,147 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/transform-stream - TransformStream class implementation
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// /* eslint-disable @typescript-eslint/no-explicit-any */
// // TODO reenable this lint here
// import * as rs from "./readable-internals.ts";
// import * as ws from "./writable-internals.ts";
// import * as ts from "./transform-internals.ts";
// import * as shared from "./shared-internals.ts";
// import { TransformStreamDefaultController } from "./transform-stream-default-controller.ts";
// import { QueuingStrategy } from "../dom_types.ts";
// export class TransformStream<InputType, OutputType> {
// [ts.backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
// [ts.backpressureChangePromise_]: shared.ControlledPromise<void>; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
// [ts.readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
// [ts.transformStreamController_]: TransformStreamDefaultController<
// InputType,
// OutputType
// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
// [ts.writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
// constructor(
// transformer: ts.Transformer<InputType, OutputType> = {},
// writableStrategy: QueuingStrategy<InputType> = {},
// readableStrategy: QueuingStrategy<OutputType> = {}
// ) {
// const writableSizeFunction = writableStrategy.size;
// const writableHighWaterMark = writableStrategy.highWaterMark;
// const readableSizeFunction = readableStrategy.size;
// const readableHighWaterMark = readableStrategy.highWaterMark;
// const writableType = transformer.writableType;
// if (writableType !== undefined) {
// throw new RangeError(
// "The transformer's `writableType` field must be undefined"
// );
// }
// const writableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(
// writableSizeFunction
// );
// const writableHWM = shared.validateAndNormalizeHighWaterMark(
// writableHighWaterMark === undefined ? 1 : writableHighWaterMark
// );
// const readableType = transformer.readableType;
// if (readableType !== undefined) {
// throw new RangeError(
// "The transformer's `readableType` field must be undefined"
// );
// }
// const readableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(
// readableSizeFunction
// );
// const readableHWM = shared.validateAndNormalizeHighWaterMark(
// readableHighWaterMark === undefined ? 0 : readableHighWaterMark
// );
// const startPromise = shared.createControlledPromise<void>();
// ts.initializeTransformStream(
// this,
// startPromise.promise,
// writableHWM,
// writableSizeAlgorithm,
// readableHWM,
// readableSizeAlgorithm
// );
// setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
// const startResult = shared.invokeOrNoop(transformer, "start", [
// this[ts.transformStreamController_]
// ]);
// startPromise.resolve(startResult);
// }
// get readable(): rs.SDReadableStream<OutputType> {
// if (!ts.isTransformStream(this)) {
// throw new TypeError();
// }
// return this[ts.readable_];
// }
// get writable(): ws.WritableStream<InputType> {
// if (!ts.isTransformStream(this)) {
// throw new TypeError();
// }
// return this[ts.writable_];
// }
// }
// function setUpTransformStreamDefaultControllerFromTransformer<
// InputType,
// OutputType
// >(
// stream: TransformStream<InputType, OutputType>,
// transformer: ts.Transformer<InputType, OutputType>
// ): void {
// const controller = Object.create(
// TransformStreamDefaultController.prototype
// ) as TransformStreamDefaultController<InputType, OutputType>;
// let transformAlgorithm: ts.TransformAlgorithm<InputType>;
// const transformMethod = transformer.transform;
// if (transformMethod !== undefined) {
// if (typeof transformMethod !== "function") {
// throw new TypeError(
// "`transform` field of the transformer must be a function"
// );
// }
// transformAlgorithm = (chunk: InputType): Promise<any> =>
// shared.promiseCall(transformMethod, transformer, [chunk, controller]);
// } else {
// // use identity transform
// transformAlgorithm = function(chunk: InputType): Promise<void> {
// try {
// // OutputType and InputType are the same here
// ts.transformStreamDefaultControllerEnqueue(
// controller,
// (chunk as unknown) as OutputType
// );
// } catch (error) {
// return Promise.reject(error);
// }
// return Promise.resolve(undefined);
// };
// }
// const flushAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
// transformer,
// "flush",
// [controller]
// );
// ts.setUpTransformStreamDefaultController(
// stream,
// controller,
// transformAlgorithm,
// flushAlgorithm
// );
// }

View file

@ -0,0 +1,800 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/writable-internals - internal types and functions for writable streams
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// /* eslint-disable @typescript-eslint/no-explicit-any */
// // TODO reenable this lint here
// import * as shared from "./shared-internals.ts";
// import * as q from "./queue-mixin.ts";
// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
// export const backpressure_ = Symbol("backpressure_");
// export const closeRequest_ = Symbol("closeRequest_");
// export const inFlightWriteRequest_ = Symbol("inFlightWriteRequest_");
// export const inFlightCloseRequest_ = Symbol("inFlightCloseRequest_");
// export const pendingAbortRequest_ = Symbol("pendingAbortRequest_");
// export const writableStreamController_ = Symbol("writableStreamController_");
// export const writer_ = Symbol("writer_");
// export const writeRequests_ = Symbol("writeRequests_");
// export const abortAlgorithm_ = Symbol("abortAlgorithm_");
// export const closeAlgorithm_ = Symbol("closeAlgorithm_");
// export const controlledWritableStream_ = Symbol("controlledWritableStream_");
// export const started_ = Symbol("started_");
// export const strategyHWM_ = Symbol("strategyHWM_");
// export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
// export const writeAlgorithm_ = Symbol("writeAlgorithm_");
// export const ownerWritableStream_ = Symbol("ownerWritableStream_");
// export const closedPromise_ = Symbol("closedPromise_");
// export const readyPromise_ = Symbol("readyPromise_");
// export const errorSteps_ = Symbol("errorSteps_");
// export const abortSteps_ = Symbol("abortSteps_");
// export type StartFunction = (
// controller: WritableStreamController
// ) => void | PromiseLike<void>;
// export type StartAlgorithm = () => Promise<void> | void;
// export type WriteFunction<InputType> = (
// chunk: InputType,
// controller: WritableStreamController
// ) => void | PromiseLike<void>;
// export type WriteAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
// export type CloseAlgorithm = () => Promise<void>;
// export type AbortAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
// // ----
// export interface WritableStreamController {
// error(e?: shared.ErrorResult): void;
// [errorSteps_](): void;
// [abortSteps_](reason: shared.ErrorResult): Promise<void>;
// }
// export interface WriteRecord<InputType> {
// chunk: InputType;
// }
// export interface WritableStreamDefaultController<InputType>
// extends WritableStreamController,
// q.QueueContainer<WriteRecord<InputType> | "close"> {
// [abortAlgorithm_]: AbortAlgorithm; // A promise - returning algorithm, taking one argument(the abort reason), which communicates a requested abort to the underlying sink
// [closeAlgorithm_]: CloseAlgorithm; // A promise - returning algorithm which communicates a requested close to the underlying sink
// [controlledWritableStream_]: WritableStream<InputType>; // The WritableStream instance controlled
// [started_]: boolean; // A boolean flag indicating whether the underlying sink has finished starting
// [strategyHWM_]: number; // A number supplied by the creator of the stream as part of the streams queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink
// [strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>; // An algorithm to calculate the size of enqueued chunks, as part of the streams queuing strategy
// [writeAlgorithm_]: WriteAlgorithm<InputType>; // A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink
// }
// // ----
// export interface WritableStreamWriter<InputType> {
// readonly closed: Promise<void>;
// readonly desiredSize: number | null;
// readonly ready: Promise<void>;
// abort(reason: shared.ErrorResult): Promise<void>;
// close(): Promise<void>;
// releaseLock(): void;
// write(chunk: InputType): Promise<void>;
// }
// export interface WritableStreamDefaultWriter<InputType>
// extends WritableStreamWriter<InputType> {
// [ownerWritableStream_]: WritableStream<InputType> | undefined;
// [closedPromise_]: shared.ControlledPromise<void>;
// [readyPromise_]: shared.ControlledPromise<void>;
// }
// // ----
// export type WritableStreamState =
// | "writable"
// | "closed"
// | "erroring"
// | "errored";
// export interface WritableStreamSink<InputType> {
// start?: StartFunction;
// write?: WriteFunction<InputType>;
// close?(): void | PromiseLike<void>;
// abort?(reason?: shared.ErrorResult): void;
// type?: undefined; // unused, for future revisions
// }
// export interface AbortRequest {
// reason: shared.ErrorResult;
// wasAlreadyErroring: boolean;
// promise: Promise<void>;
// resolve(): void;
// reject(error: shared.ErrorResult): void;
// }
// export declare class WritableStream<InputType> {
// constructor(
// underlyingSink?: WritableStreamSink<InputType>,
// strategy?: QueuingStrategy<InputType>
// );
// readonly locked: boolean;
// abort(reason?: shared.ErrorResult): Promise<void>;
// getWriter(): WritableStreamWriter<InputType>;
// [shared.state_]: WritableStreamState;
// [backpressure_]: boolean;
// [closeRequest_]: shared.ControlledPromise<void> | undefined;
// [inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined;
// [inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined;
// [pendingAbortRequest_]: AbortRequest | undefined;
// [shared.storedError_]: shared.ErrorResult;
// [writableStreamController_]:
// | WritableStreamDefaultController<InputType>
// | undefined;
// [writer_]: WritableStreamDefaultWriter<InputType> | undefined;
// [writeRequests_]: Array<shared.ControlledPromise<void>>;
// }
// // ---- Stream
// export function initializeWritableStream<InputType>(
// stream: WritableStream<InputType>
// ): void {
// stream[shared.state_] = "writable";
// stream[shared.storedError_] = undefined;
// stream[writer_] = undefined;
// stream[writableStreamController_] = undefined;
// stream[inFlightWriteRequest_] = undefined;
// stream[closeRequest_] = undefined;
// stream[inFlightCloseRequest_] = undefined;
// stream[pendingAbortRequest_] = undefined;
// stream[writeRequests_] = [];
// stream[backpressure_] = false;
// }
// export function isWritableStream(value: unknown): value is WritableStream<any> {
// if (typeof value !== "object" || value === null) {
// return false;
// }
// return writableStreamController_ in value;
// }
// export function isWritableStreamLocked<InputType>(
// stream: WritableStream<InputType>
// ): boolean {
// return stream[writer_] !== undefined;
// }
// export function writableStreamAbort<InputType>(
// stream: WritableStream<InputType>,
// reason: shared.ErrorResult
// ): Promise<void> {
// const state = stream[shared.state_];
// if (state === "closed" || state === "errored") {
// return Promise.resolve(undefined);
// }
// let pending = stream[pendingAbortRequest_];
// if (pending !== undefined) {
// return pending.promise;
// }
// // Assert: state is "writable" or "erroring".
// let wasAlreadyErroring = false;
// if (state === "erroring") {
// wasAlreadyErroring = true;
// reason = undefined;
// }
// pending = {
// reason,
// wasAlreadyErroring
// } as AbortRequest;
// const promise = new Promise<void>((resolve, reject) => {
// pending!.resolve = resolve;
// pending!.reject = reject;
// });
// pending.promise = promise;
// stream[pendingAbortRequest_] = pending;
// if (!wasAlreadyErroring) {
// writableStreamStartErroring(stream, reason);
// }
// return promise;
// }
// export function writableStreamAddWriteRequest<InputType>(
// stream: WritableStream<InputType>
// ): Promise<void> {
// // Assert: !IsWritableStreamLocked(stream) is true.
// // Assert: stream.[[state]] is "writable".
// const writePromise = shared.createControlledPromise<void>();
// stream[writeRequests_].push(writePromise);
// return writePromise.promise;
// }
// export function writableStreamDealWithRejection<InputType>(
// stream: WritableStream<InputType>,
// error: shared.ErrorResult
// ): void {
// const state = stream[shared.state_];
// if (state === "writable") {
// writableStreamStartErroring(stream, error);
// return;
// }
// // Assert: state is "erroring"
// writableStreamFinishErroring(stream);
// }
// export function writableStreamStartErroring<InputType>(
// stream: WritableStream<InputType>,
// reason: shared.ErrorResult
// ): void {
// // Assert: stream.[[storedError]] is undefined.
// // Assert: stream.[[state]] is "writable".
// const controller = stream[writableStreamController_]!;
// // Assert: controller is not undefined.
// stream[shared.state_] = "erroring";
// stream[shared.storedError_] = reason;
// const writer = stream[writer_];
// if (writer !== undefined) {
// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
// }
// if (
// !writableStreamHasOperationMarkedInFlight(stream) &&
// controller[started_]
// ) {
// writableStreamFinishErroring(stream);
// }
// }
// export function writableStreamFinishErroring<InputType>(
// stream: WritableStream<InputType>
// ): void {
// // Assert: stream.[[state]] is "erroring".
// // Assert: writableStreamHasOperationMarkedInFlight(stream) is false.
// stream[shared.state_] = "errored";
// const controller = stream[writableStreamController_]!;
// controller[errorSteps_]();
// const storedError = stream[shared.storedError_];
// for (const writeRequest of stream[writeRequests_]) {
// writeRequest.reject(storedError);
// }
// stream[writeRequests_] = [];
// const abortRequest = stream[pendingAbortRequest_];
// if (abortRequest === undefined) {
// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
// return;
// }
// stream[pendingAbortRequest_] = undefined;
// if (abortRequest.wasAlreadyErroring) {
// abortRequest.reject(storedError);
// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
// return;
// }
// const promise = controller[abortSteps_](abortRequest.reason);
// promise.then(
// _ => {
// abortRequest.resolve();
// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
// },
// error => {
// abortRequest.reject(error);
// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
// }
// );
// }
// export function writableStreamFinishInFlightWrite<InputType>(
// stream: WritableStream<InputType>
// ): void {
// // Assert: stream.[[inFlightWriteRequest]] is not undefined.
// stream[inFlightWriteRequest_]!.resolve(undefined);
// stream[inFlightWriteRequest_] = undefined;
// }
// export function writableStreamFinishInFlightWriteWithError<InputType>(
// stream: WritableStream<InputType>,
// error: shared.ErrorResult
// ): void {
// // Assert: stream.[[inFlightWriteRequest]] is not undefined.
// stream[inFlightWriteRequest_]!.reject(error);
// stream[inFlightWriteRequest_] = undefined;
// // Assert: stream.[[state]] is "writable" or "erroring".
// writableStreamDealWithRejection(stream, error);
// }
// export function writableStreamFinishInFlightClose<InputType>(
// stream: WritableStream<InputType>
// ): void {
// // Assert: stream.[[inFlightCloseRequest]] is not undefined.
// stream[inFlightCloseRequest_]!.resolve(undefined);
// stream[inFlightCloseRequest_] = undefined;
// const state = stream[shared.state_];
// // Assert: stream.[[state]] is "writable" or "erroring".
// if (state === "erroring") {
// stream[shared.storedError_] = undefined;
// if (stream[pendingAbortRequest_] !== undefined) {
// stream[pendingAbortRequest_]!.resolve();
// stream[pendingAbortRequest_] = undefined;
// }
// }
// stream[shared.state_] = "closed";
// const writer = stream[writer_];
// if (writer !== undefined) {
// writer[closedPromise_].resolve(undefined);
// }
// // Assert: stream.[[pendingAbortRequest]] is undefined.
// // Assert: stream.[[storedError]] is undefined.
// }
// export function writableStreamFinishInFlightCloseWithError<InputType>(
// stream: WritableStream<InputType>,
// error: shared.ErrorResult
// ): void {
// // Assert: stream.[[inFlightCloseRequest]] is not undefined.
// stream[inFlightCloseRequest_]!.reject(error);
// stream[inFlightCloseRequest_] = undefined;
// // Assert: stream.[[state]] is "writable" or "erroring".
// if (stream[pendingAbortRequest_] !== undefined) {
// stream[pendingAbortRequest_]!.reject(error);
// stream[pendingAbortRequest_] = undefined;
// }
// writableStreamDealWithRejection(stream, error);
// }
// export function writableStreamCloseQueuedOrInFlight<InputType>(
// stream: WritableStream<InputType>
// ): boolean {
// return (
// stream[closeRequest_] !== undefined ||
// stream[inFlightCloseRequest_] !== undefined
// );
// }
// export function writableStreamHasOperationMarkedInFlight<InputType>(
// stream: WritableStream<InputType>
// ): boolean {
// return (
// stream[inFlightWriteRequest_] !== undefined ||
// stream[inFlightCloseRequest_] !== undefined
// );
// }
// export function writableStreamMarkCloseRequestInFlight<InputType>(
// stream: WritableStream<InputType>
// ): void {
// // Assert: stream.[[inFlightCloseRequest]] is undefined.
// // Assert: stream.[[closeRequest]] is not undefined.
// stream[inFlightCloseRequest_] = stream[closeRequest_];
// stream[closeRequest_] = undefined;
// }
// export function writableStreamMarkFirstWriteRequestInFlight<InputType>(
// stream: WritableStream<InputType>
// ): void {
// // Assert: stream.[[inFlightWriteRequest]] is undefined.
// // Assert: stream.[[writeRequests]] is not empty.
// const writeRequest = stream[writeRequests_].shift()!;
// stream[inFlightWriteRequest_] = writeRequest;
// }
// export function writableStreamRejectCloseAndClosedPromiseIfNeeded<InputType>(
// stream: WritableStream<InputType>
// ): void {
// // Assert: stream.[[state]] is "errored".
// const closeRequest = stream[closeRequest_];
// if (closeRequest !== undefined) {
// // Assert: stream.[[inFlightCloseRequest]] is undefined.
// closeRequest.reject(stream[shared.storedError_]);
// stream[closeRequest_] = undefined;
// }
// const writer = stream[writer_];
// if (writer !== undefined) {
// writer[closedPromise_].reject(stream[shared.storedError_]);
// writer[closedPromise_].promise.catch(() => {});
// }
// }
// export function writableStreamUpdateBackpressure<InputType>(
// stream: WritableStream<InputType>,
// backpressure: boolean
// ): void {
// // Assert: stream.[[state]] is "writable".
// // Assert: !WritableStreamCloseQueuedOrInFlight(stream) is false.
// const writer = stream[writer_];
// if (writer !== undefined && backpressure !== stream[backpressure_]) {
// if (backpressure) {
// writer[readyPromise_] = shared.createControlledPromise<void>();
// } else {
// writer[readyPromise_].resolve(undefined);
// }
// }
// stream[backpressure_] = backpressure;
// }
// // ---- Writers
// export function isWritableStreamDefaultWriter(
// value: unknown
// ): value is WritableStreamDefaultWriter<any> {
// if (typeof value !== "object" || value === null) {
// return false;
// }
// return ownerWritableStream_ in value;
// }
// export function writableStreamDefaultWriterAbort<InputType>(
// writer: WritableStreamDefaultWriter<InputType>,
// reason: shared.ErrorResult
// ): Promise<void> {
// const stream = writer[ownerWritableStream_]!;
// // Assert: stream is not undefined.
// return writableStreamAbort(stream, reason);
// }
// export function writableStreamDefaultWriterClose<InputType>(
// writer: WritableStreamDefaultWriter<InputType>
// ): Promise<void> {
// const stream = writer[ownerWritableStream_]!;
// // Assert: stream is not undefined.
// const state = stream[shared.state_];
// if (state === "closed" || state === "errored") {
// return Promise.reject(
// new TypeError("Writer stream is already closed or errored")
// );
// }
// // Assert: state is "writable" or "erroring".
// // Assert: writableStreamCloseQueuedOrInFlight(stream) is false.
// const closePromise = shared.createControlledPromise<void>();
// stream[closeRequest_] = closePromise;
// if (stream[backpressure_] && state === "writable") {
// writer[readyPromise_].resolve(undefined);
// }
// writableStreamDefaultControllerClose(stream[writableStreamController_]!);
// return closePromise.promise;
// }
// export function writableStreamDefaultWriterCloseWithErrorPropagation<InputType>(
// writer: WritableStreamDefaultWriter<InputType>
// ): Promise<void> {
// const stream = writer[ownerWritableStream_]!;
// // Assert: stream is not undefined.
// const state = stream[shared.state_];
// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
// return Promise.resolve(undefined);
// }
// if (state === "errored") {
// return Promise.reject(stream[shared.storedError_]);
// }
// // Assert: state is "writable" or "erroring".
// return writableStreamDefaultWriterClose(writer);
// }
// export function writableStreamDefaultWriterEnsureClosedPromiseRejected<
// InputType
// >(
// writer: WritableStreamDefaultWriter<InputType>,
// error: shared.ErrorResult
// ): void {
// const closedPromise = writer[closedPromise_];
// if (closedPromise.state === shared.ControlledPromiseState.Pending) {
// closedPromise.reject(error);
// } else {
// writer[closedPromise_] = shared.createControlledPromise<void>();
// writer[closedPromise_].reject(error);
// }
// writer[closedPromise_].promise.catch(() => {});
// }
// export function writableStreamDefaultWriterEnsureReadyPromiseRejected<
// InputType
// >(
// writer: WritableStreamDefaultWriter<InputType>,
// error: shared.ErrorResult
// ): void {
// const readyPromise = writer[readyPromise_];
// if (readyPromise.state === shared.ControlledPromiseState.Pending) {
// readyPromise.reject(error);
// } else {
// writer[readyPromise_] = shared.createControlledPromise<void>();
// writer[readyPromise_].reject(error);
// }
// writer[readyPromise_].promise.catch(() => {});
// }
// export function writableStreamDefaultWriterGetDesiredSize<InputType>(
// writer: WritableStreamDefaultWriter<InputType>
// ): number | null {
// const stream = writer[ownerWritableStream_]!;
// const state = stream[shared.state_];
// if (state === "errored" || state === "erroring") {
// return null;
// }
// if (state === "closed") {
// return 0;
// }
// return writableStreamDefaultControllerGetDesiredSize(
// stream[writableStreamController_]!
// );
// }
// export function writableStreamDefaultWriterRelease<InputType>(
// writer: WritableStreamDefaultWriter<InputType>
// ): void {
// const stream = writer[ownerWritableStream_]!;
// // Assert: stream is not undefined.
// // Assert: stream.[[writer]] is writer.
// const releasedError = new TypeError();
// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
// writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
// stream[writer_] = undefined;
// writer[ownerWritableStream_] = undefined;
// }
// export function writableStreamDefaultWriterWrite<InputType>(
// writer: WritableStreamDefaultWriter<InputType>,
// chunk: InputType
// ): Promise<void> {
// const stream = writer[ownerWritableStream_]!;
// // Assert: stream is not undefined.
// const controller = stream[writableStreamController_]!;
// const chunkSize = writableStreamDefaultControllerGetChunkSize(
// controller,
// chunk
// );
// if (writer[ownerWritableStream_] !== stream) {
// return Promise.reject(new TypeError());
// }
// const state = stream[shared.state_];
// if (state === "errored") {
// return Promise.reject(stream[shared.storedError_]);
// }
// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
// return Promise.reject(
// new TypeError("Cannot write to a closing or closed stream")
// );
// }
// if (state === "erroring") {
// return Promise.reject(stream[shared.storedError_]);
// }
// // Assert: state is "writable".
// const promise = writableStreamAddWriteRequest(stream);
// writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
// return promise;
// }
// // ---- Controller
// export function setUpWritableStreamDefaultController<InputType>(
// stream: WritableStream<InputType>,
// controller: WritableStreamDefaultController<InputType>,
// startAlgorithm: StartAlgorithm,
// writeAlgorithm: WriteAlgorithm<InputType>,
// closeAlgorithm: CloseAlgorithm,
// abortAlgorithm: AbortAlgorithm,
// highWaterMark: number,
// sizeAlgorithm: QueuingStrategySizeCallback<InputType>
// ): void {
// if (!isWritableStream(stream)) {
// throw new TypeError();
// }
// if (stream[writableStreamController_] !== undefined) {
// throw new TypeError();
// }
// controller[controlledWritableStream_] = stream;
// stream[writableStreamController_] = controller;
// q.resetQueue(controller);
// controller[started_] = false;
// controller[strategySizeAlgorithm_] = sizeAlgorithm;
// controller[strategyHWM_] = highWaterMark;
// controller[writeAlgorithm_] = writeAlgorithm;
// controller[closeAlgorithm_] = closeAlgorithm;
// controller[abortAlgorithm_] = abortAlgorithm;
// const backpressure = writableStreamDefaultControllerGetBackpressure(
// controller
// );
// writableStreamUpdateBackpressure(stream, backpressure);
// const startResult = startAlgorithm();
// Promise.resolve(startResult).then(
// _ => {
// // Assert: stream.[[state]] is "writable" or "erroring".
// controller[started_] = true;
// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
// },
// error => {
// // Assert: stream.[[state]] is "writable" or "erroring".
// controller[started_] = true;
// writableStreamDealWithRejection(stream, error);
// }
// );
// }
// export function isWritableStreamDefaultController(
// value: unknown
// ): value is WritableStreamDefaultController<any> {
// if (typeof value !== "object" || value === null) {
// return false;
// }
// return controlledWritableStream_ in value;
// }
// export function writableStreamDefaultControllerClearAlgorithms<InputType>(
// controller: WritableStreamDefaultController<InputType>
// ): void {
// // Use ! assertions to override type check here, this way we don't
// // have to perform type checks/assertions everywhere else.
// controller[writeAlgorithm_] = undefined!;
// controller[closeAlgorithm_] = undefined!;
// controller[abortAlgorithm_] = undefined!;
// controller[strategySizeAlgorithm_] = undefined!;
// }
// export function writableStreamDefaultControllerClose<InputType>(
// controller: WritableStreamDefaultController<InputType>
// ): void {
// q.enqueueValueWithSize(controller, "close", 0);
// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
// }
// export function writableStreamDefaultControllerGetChunkSize<InputType>(
// controller: WritableStreamDefaultController<InputType>,
// chunk: InputType
// ): number {
// let chunkSize: number;
// try {
// chunkSize = controller[strategySizeAlgorithm_](chunk);
// } catch (error) {
// writableStreamDefaultControllerErrorIfNeeded(controller, error);
// chunkSize = 1;
// }
// return chunkSize;
// }
// export function writableStreamDefaultControllerGetDesiredSize<InputType>(
// controller: WritableStreamDefaultController<InputType>
// ): number {
// return controller[strategyHWM_] - controller[q.queueTotalSize_];
// }
// export function writableStreamDefaultControllerWrite<InputType>(
// controller: WritableStreamDefaultController<InputType>,
// chunk: InputType,
// chunkSize: number
// ): void {
// try {
// q.enqueueValueWithSize(controller, { chunk }, chunkSize);
// } catch (error) {
// writableStreamDefaultControllerErrorIfNeeded(controller, error);
// return;
// }
// const stream = controller[controlledWritableStream_];
// if (
// !writableStreamCloseQueuedOrInFlight(stream) &&
// stream[shared.state_] === "writable"
// ) {
// const backpressure = writableStreamDefaultControllerGetBackpressure(
// controller
// );
// writableStreamUpdateBackpressure(stream, backpressure);
// }
// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
// }
// export function writableStreamDefaultControllerAdvanceQueueIfNeeded<InputType>(
// controller: WritableStreamDefaultController<InputType>
// ): void {
// if (!controller[started_]) {
// return;
// }
// const stream = controller[controlledWritableStream_];
// if (stream[inFlightWriteRequest_] !== undefined) {
// return;
// }
// const state = stream[shared.state_];
// if (state === "closed" || state === "errored") {
// return;
// }
// if (state === "erroring") {
// writableStreamFinishErroring(stream);
// return;
// }
// if (controller[q.queue_].length === 0) {
// return;
// }
// const writeRecord = q.peekQueueValue(controller);
// if (writeRecord === "close") {
// writableStreamDefaultControllerProcessClose(controller);
// } else {
// writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk);
// }
// }
// export function writableStreamDefaultControllerErrorIfNeeded<InputType>(
// controller: WritableStreamDefaultController<InputType>,
// error: shared.ErrorResult
// ): void {
// if (controller[controlledWritableStream_][shared.state_] === "writable") {
// writableStreamDefaultControllerError(controller, error);
// }
// }
// export function writableStreamDefaultControllerProcessClose<InputType>(
// controller: WritableStreamDefaultController<InputType>
// ): void {
// const stream = controller[controlledWritableStream_];
// writableStreamMarkCloseRequestInFlight(stream);
// q.dequeueValue(controller);
// // Assert: controller.[[queue]] is empty.
// const sinkClosePromise = controller[closeAlgorithm_]();
// writableStreamDefaultControllerClearAlgorithms(controller);
// sinkClosePromise.then(
// _ => {
// writableStreamFinishInFlightClose(stream);
// },
// error => {
// writableStreamFinishInFlightCloseWithError(stream, error);
// }
// );
// }
// export function writableStreamDefaultControllerProcessWrite<InputType>(
// controller: WritableStreamDefaultController<InputType>,
// chunk: InputType
// ): void {
// const stream = controller[controlledWritableStream_];
// writableStreamMarkFirstWriteRequestInFlight(stream);
// controller[writeAlgorithm_](chunk).then(
// _ => {
// writableStreamFinishInFlightWrite(stream);
// const state = stream[shared.state_];
// // Assert: state is "writable" or "erroring".
// q.dequeueValue(controller);
// if (
// !writableStreamCloseQueuedOrInFlight(stream) &&
// state === "writable"
// ) {
// const backpressure = writableStreamDefaultControllerGetBackpressure(
// controller
// );
// writableStreamUpdateBackpressure(stream, backpressure);
// }
// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
// },
// error => {
// if (stream[shared.state_] === "writable") {
// writableStreamDefaultControllerClearAlgorithms(controller);
// }
// writableStreamFinishInFlightWriteWithError(stream, error);
// }
// );
// }
// export function writableStreamDefaultControllerGetBackpressure<InputType>(
// controller: WritableStreamDefaultController<InputType>
// ): boolean {
// const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller);
// return desiredSize <= 0;
// }
// export function writableStreamDefaultControllerError<InputType>(
// controller: WritableStreamDefaultController<InputType>,
// error: shared.ErrorResult
// ): void {
// const stream = controller[controlledWritableStream_];
// // Assert: stream.[[state]] is "writable".
// writableStreamDefaultControllerClearAlgorithms(controller);
// writableStreamStartErroring(stream, error);
// }

View file

@ -0,0 +1,101 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/writable-stream-default-controller - WritableStreamDefaultController class implementation
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// /* eslint-disable @typescript-eslint/no-explicit-any */
// // TODO reenable this lint here
// import * as ws from "./writable-internals.ts";
// import * as shared from "./shared-internals.ts";
// import * as q from "./queue-mixin.ts";
// import { Queue } from "./queue.ts";
// import { QueuingStrategySizeCallback } from "../dom_types.ts";
// export class WritableStreamDefaultController<InputType>
// implements ws.WritableStreamDefaultController<InputType> {
// [ws.abortAlgorithm_]: ws.AbortAlgorithm;
// [ws.closeAlgorithm_]: ws.CloseAlgorithm;
// [ws.controlledWritableStream_]: ws.WritableStream<InputType>;
// [ws.started_]: boolean;
// [ws.strategyHWM_]: number;
// [ws.strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>;
// [ws.writeAlgorithm_]: ws.WriteAlgorithm<InputType>;
// [q.queue_]: Queue<q.QueueElement<ws.WriteRecord<InputType> | "close">>;
// [q.queueTotalSize_]: number;
// constructor() {
// throw new TypeError();
// }
// error(e?: shared.ErrorResult): void {
// if (!ws.isWritableStreamDefaultController(this)) {
// throw new TypeError();
// }
// const state = this[ws.controlledWritableStream_][shared.state_];
// if (state !== "writable") {
// return;
// }
// ws.writableStreamDefaultControllerError(this, e);
// }
// [ws.abortSteps_](reason: shared.ErrorResult): Promise<void> {
// const result = this[ws.abortAlgorithm_](reason);
// ws.writableStreamDefaultControllerClearAlgorithms(this);
// return result;
// }
// [ws.errorSteps_](): void {
// q.resetQueue(this);
// }
// }
// export function setUpWritableStreamDefaultControllerFromUnderlyingSink<
// InputType
// >(
// stream: ws.WritableStream<InputType>,
// underlyingSink: ws.WritableStreamSink<InputType>,
// highWaterMark: number,
// sizeAlgorithm: QueuingStrategySizeCallback<InputType>
// ): void {
// // Assert: underlyingSink is not undefined.
// const controller = Object.create(
// WritableStreamDefaultController.prototype
// ) as WritableStreamDefaultController<InputType>;
// const startAlgorithm = function(): any {
// return shared.invokeOrNoop(underlyingSink, "start", [controller]);
// };
// const writeAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
// underlyingSink,
// "write",
// [controller]
// );
// const closeAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
// underlyingSink,
// "close",
// []
// );
// const abortAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
// underlyingSink,
// "abort",
// []
// );
// ws.setUpWritableStreamDefaultController(
// stream,
// controller,
// startAlgorithm,
// writeAlgorithm,
// closeAlgorithm,
// abortAlgorithm,
// highWaterMark,
// sizeAlgorithm
// );
// }

View file

@ -0,0 +1,136 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/writable-stream-default-writer - WritableStreamDefaultWriter class implementation
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// import * as ws from "./writable-internals.ts";
// import * as shared from "./shared-internals.ts";
// export class WritableStreamDefaultWriter<InputType>
// implements ws.WritableStreamDefaultWriter<InputType> {
// [ws.ownerWritableStream_]: ws.WritableStream<InputType> | undefined;
// [ws.readyPromise_]: shared.ControlledPromise<void>;
// [ws.closedPromise_]: shared.ControlledPromise<void>;
// constructor(stream: ws.WritableStream<InputType>) {
// if (!ws.isWritableStream(stream)) {
// throw new TypeError();
// }
// if (ws.isWritableStreamLocked(stream)) {
// throw new TypeError("Stream is already locked");
// }
// this[ws.ownerWritableStream_] = stream;
// stream[ws.writer_] = this;
// const readyPromise = shared.createControlledPromise<void>();
// const closedPromise = shared.createControlledPromise<void>();
// this[ws.readyPromise_] = readyPromise;
// this[ws.closedPromise_] = closedPromise;
// const state = stream[shared.state_];
// if (state === "writable") {
// if (
// !ws.writableStreamCloseQueuedOrInFlight(stream) &&
// stream[ws.backpressure_]
// ) {
// // OK Set this.[[readyPromise]] to a new promise.
// } else {
// readyPromise.resolve(undefined);
// }
// // OK Set this.[[closedPromise]] to a new promise.
// } else if (state === "erroring") {
// readyPromise.reject(stream[shared.storedError_]);
// readyPromise.promise.catch(() => {});
// // OK Set this.[[closedPromise]] to a new promise.
// } else if (state === "closed") {
// readyPromise.resolve(undefined);
// closedPromise.resolve(undefined);
// } else {
// // Assert: state is "errored".
// const storedError = stream[shared.storedError_];
// readyPromise.reject(storedError);
// readyPromise.promise.catch(() => {});
// closedPromise.reject(storedError);
// closedPromise.promise.catch(() => {});
// }
// }
// abort(reason: shared.ErrorResult): Promise<void> {
// if (!ws.isWritableStreamDefaultWriter(this)) {
// return Promise.reject(new TypeError());
// }
// if (this[ws.ownerWritableStream_] === undefined) {
// return Promise.reject(
// new TypeError("Writer is not connected to a stream")
// );
// }
// return ws.writableStreamDefaultWriterAbort(this, reason);
// }
// close(): Promise<void> {
// if (!ws.isWritableStreamDefaultWriter(this)) {
// return Promise.reject(new TypeError());
// }
// const stream = this[ws.ownerWritableStream_];
// if (stream === undefined) {
// return Promise.reject(
// new TypeError("Writer is not connected to a stream")
// );
// }
// if (ws.writableStreamCloseQueuedOrInFlight(stream)) {
// return Promise.reject(new TypeError());
// }
// return ws.writableStreamDefaultWriterClose(this);
// }
// releaseLock(): void {
// const stream = this[ws.ownerWritableStream_];
// if (stream === undefined) {
// return;
// }
// // Assert: stream.[[writer]] is not undefined.
// ws.writableStreamDefaultWriterRelease(this);
// }
// write(chunk: InputType): Promise<void> {
// if (!ws.isWritableStreamDefaultWriter(this)) {
// return Promise.reject(new TypeError());
// }
// if (this[ws.ownerWritableStream_] === undefined) {
// return Promise.reject(
// new TypeError("Writer is not connected to a stream")
// );
// }
// return ws.writableStreamDefaultWriterWrite(this, chunk);
// }
// get closed(): Promise<void> {
// if (!ws.isWritableStreamDefaultWriter(this)) {
// return Promise.reject(new TypeError());
// }
// return this[ws.closedPromise_].promise;
// }
// get desiredSize(): number | null {
// if (!ws.isWritableStreamDefaultWriter(this)) {
// throw new TypeError();
// }
// if (this[ws.ownerWritableStream_] === undefined) {
// throw new TypeError("Writer is not connected to stream");
// }
// return ws.writableStreamDefaultWriterGetDesiredSize(this);
// }
// get ready(): Promise<void> {
// if (!ws.isWritableStreamDefaultWriter(this)) {
// return Promise.reject(new TypeError());
// }
// return this[ws.readyPromise_].promise;
// }
// }

View file

@ -0,0 +1,118 @@
// TODO reenable this code when we enable writableStreams and transport types
// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
// /**
// * streams/writable-stream - WritableStream class implementation
// * Part of Stardazed
// * (c) 2018-Present by Arthur Langereis - @zenmumbler
// * https://github.com/stardazed/sd-streams
// */
// import * as ws from "./writable-internals.ts";
// import * as shared from "./shared-internals.ts";
// import {
// WritableStreamDefaultController,
// setUpWritableStreamDefaultControllerFromUnderlyingSink
// } from "./writable-stream-default-controller.ts";
// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
// export class WritableStream<InputType> {
// [shared.state_]: ws.WritableStreamState;
// [shared.storedError_]: shared.ErrorResult;
// [ws.backpressure_]: boolean;
// [ws.closeRequest_]: shared.ControlledPromise<void> | undefined;
// [ws.inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined;
// [ws.inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined;
// [ws.pendingAbortRequest_]: ws.AbortRequest | undefined;
// [ws.writableStreamController_]:
// | ws.WritableStreamDefaultController<InputType>
// | undefined;
// [ws.writer_]: ws.WritableStreamDefaultWriter<InputType> | undefined;
// [ws.writeRequests_]: Array<shared.ControlledPromise<void>>;
// constructor(
// sink: ws.WritableStreamSink<InputType> = {},
// strategy: QueuingStrategy<InputType> = {}
// ) {
// ws.initializeWritableStream(this);
// const sizeFunc = strategy.size;
// const stratHWM = strategy.highWaterMark;
// if (sink.type !== undefined) {
// throw new RangeError("The type of an underlying sink must be undefined");
// }
// const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
// const highWaterMark = shared.validateAndNormalizeHighWaterMark(
// stratHWM === undefined ? 1 : stratHWM
// );
// setUpWritableStreamDefaultControllerFromUnderlyingSink(
// this,
// sink,
// highWaterMark,
// sizeAlgorithm
// );
// }
// get locked(): boolean {
// if (!ws.isWritableStream(this)) {
// throw new TypeError();
// }
// return ws.isWritableStreamLocked(this);
// }
// abort(reason?: shared.ErrorResult): Promise<void> {
// if (!ws.isWritableStream(this)) {
// return Promise.reject(new TypeError());
// }
// if (ws.isWritableStreamLocked(this)) {
// return Promise.reject(new TypeError("Cannot abort a locked stream"));
// }
// return ws.writableStreamAbort(this, reason);
// }
// getWriter(): ws.WritableStreamWriter<InputType> {
// if (!ws.isWritableStream(this)) {
// throw new TypeError();
// }
// return new WritableStreamDefaultWriter(this);
// }
// }
// export function createWritableStream<InputType>(
// startAlgorithm: ws.StartAlgorithm,
// writeAlgorithm: ws.WriteAlgorithm<InputType>,
// closeAlgorithm: ws.CloseAlgorithm,
// abortAlgorithm: ws.AbortAlgorithm,
// highWaterMark?: number,
// sizeAlgorithm?: QueuingStrategySizeCallback<InputType>
// ): WritableStream<InputType> {
// if (highWaterMark === undefined) {
// highWaterMark = 1;
// }
// if (sizeAlgorithm === undefined) {
// sizeAlgorithm = (): number => 1;
// }
// // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
// const stream = Object.create(WritableStream.prototype) as WritableStream<
// InputType
// >;
// ws.initializeWritableStream(stream);
// const controller = Object.create(
// WritableStreamDefaultController.prototype
// ) as WritableStreamDefaultController<InputType>;
// ws.setUpWritableStreamDefaultController(
// stream,
// controller,
// startAlgorithm,
// writeAlgorithm,
// closeAlgorithm,
// abortAlgorithm,
// highWaterMark,
// sizeAlgorithm
// );
// return stream;
// }

View file

@ -58,6 +58,9 @@ pub enum ErrorKind {
Diagnostic = 49, Diagnostic = 49,
JSError = 50, JSError = 50,
TypeError = 51, TypeError = 51,
/** TODO this is a DomException type, and should be moved out of here when possible */
DataCloneError = 52,
} }
// Warning! The values in this enum are duplicated in js/compiler.ts // Warning! The values in this enum are duplicated in js/compiler.ts