From 3ee0c36453a2591139b7e35882e04c1e706e9253 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 5 Jul 2021 12:18:41 +0200 Subject: [PATCH] refactor: introduce primordials for web/streams (#11251) --- core/00_primordials.js | 3 + core/internal.d.ts | 1 + extensions/web/06_streams.js | 191 +++++++++++++++++++---------------- 3 files changed, 109 insertions(+), 86 deletions(-) diff --git a/core/00_primordials.js b/core/00_primordials.js index 63b6730db2..c6132620e4 100644 --- a/core/00_primordials.js +++ b/core/00_primordials.js @@ -170,6 +170,9 @@ // Create copy of isNaN primordials[isNaN.name] = isNaN; + // Create copy of queueMicrotask + primordials["queueMicrotask"] = queueMicrotask; + // Create copies of URI handling functions [ decodeURI, diff --git a/core/internal.d.ts b/core/internal.d.ts index 8e52f4bf1e..e582eb359c 100644 --- a/core/internal.d.ts +++ b/core/internal.d.ts @@ -632,6 +632,7 @@ declare namespace __bootstrap { export const ObjectPrototypeToLocaleString: UncurryThis< typeof Object.prototype.toLocaleString >; + export const queueMicrotask: typeof globalThis.queueMicrotask; export const RangeError: typeof globalThis.RangeError; export const RangeErrorLength: typeof RangeError.length; export const RangeErrorName: typeof RangeError.name; diff --git a/extensions/web/06_streams.js b/extensions/web/06_streams.js index 48585e8be5..388b7b13cf 100644 --- a/extensions/web/06_streams.js +++ b/extensions/web/06_streams.js @@ -9,6 +9,33 @@ ((window) => { const webidl = window.__bootstrap.webidl; + // TODO(lucacasonato): get AbortSignal from __bootstrap. + const { + ArrayPrototypeMap, + ArrayPrototypePush, + ArrayPrototypeShift, + Error, + NumberIsInteger, + NumberIsNaN, + ObjectCreate, + ObjectDefineProperties, + ObjectDefineProperty, + ObjectGetPrototypeOf, + ObjectSetPrototypeOf, + Promise, + PromiseAll, + PromisePrototypeThen, + PromiseReject, + queueMicrotask, + RangeError, + SymbolAsyncIterator, + TypeError, + Uint8Array, + WeakMap, + WeakMapPrototypeGet, + WeakMapPrototypeHas, + WeakMapPrototypeSet, + } = globalThis.__bootstrap.primordials; const { DOMException } = window.__bootstrap.domException; class AssertionError extends Error { @@ -78,29 +105,13 @@ } } - const originalPromise = Promise; - const originalPromiseThen = Promise.prototype.then; - - /** - * @template T - * @template TResult1 - * @template TResult2 - * @param {Promise} promise - * @param {(value: T) => TResult1 | PromiseLike} onFulfilled - * @param {(reason: any) => TResult2 | PromiseLike=} onRejected - * @returns {Promise} - */ - function performPromiseThen(promise, onFulfilled, onRejected) { - return originalPromiseThen.call(promise, onFulfilled, onRejected); - } - /** * @template T * @param {T | PromiseLike} value * @returns {Promise} */ function resolvePromiseWith(value) { - return new originalPromise((resolve) => resolve(value)); + return new Promise((resolve) => resolve(value)); } /** @param {any} e */ @@ -114,7 +125,7 @@ /** @param {Promise} promise */ function setPromiseIsHandledToTrue(promise) { - performPromiseThen(promise, undefined, rethrowAssertionErrorRejection); + PromisePrototypeThen(promise, undefined, rethrowAssertionErrorRejection); } /** @@ -127,7 +138,7 @@ * @returns {Promise} */ function transformPromiseWith(promise, fulfillmentHandler, rejectionHandler) { - return performPromiseThen(promise, fulfillmentHandler, rejectionHandler); + return PromisePrototypeThen(promise, fulfillmentHandler, rejectionHandler); } /** @@ -162,8 +173,8 @@ * @returns {void} */ function uponPromise(promise, onFulfilled, onRejected) { - performPromiseThen( - performPromiseThen(promise, onFulfilled, onRejected), + PromisePrototypeThen( + PromisePrototypeThen(promise, onFulfilled, onRejected), undefined, rethrowAssertionErrorRejection, ); @@ -335,7 +346,7 @@ function dequeueValue(container) { assert(_queue in container && _queueTotalSize in container); assert(container[_queue].length); - const valueWithSize = container[_queue].shift(); + const valueWithSize = ArrayPrototypeShift(container[_queue]); container[_queueTotalSize] -= valueWithSize.size; if (container[_queueTotalSize] < 0) { container[_queueTotalSize] = 0; @@ -358,7 +369,7 @@ if (size === Infinity) { throw RangeError("chunk size is invalid"); } - container[_queue].push({ value, size }); + ArrayPrototypePush(container[_queue], { value, size }); container[_queueTotalSize] += size; } @@ -371,7 +382,7 @@ return defaultHWM; } const highWaterMark = strategy.highWaterMark; - if (Number.isNaN(highWaterMark) || highWaterMark < 0) { + if (NumberIsNaN(highWaterMark) || highWaterMark < 0) { throw RangeError( `Expected highWaterMark to be a positive number or Infinity, got "${highWaterMark}".`, ); @@ -492,7 +503,7 @@ if (typeof v !== "number") { return false; } - if (Number.isNaN(v)) { + if (NumberIsNaN(v)) { return false; } if (v < 0) { @@ -589,7 +600,8 @@ /** @type {Promise} */ const pullPromise = controller[_pullAlgorithm](controller); setPromiseIsHandledToTrue( - pullPromise.then( + PromisePrototypeThen( + pullPromise, () => { controller[_pulling] = false; if (controller[_pullAgain]) { @@ -707,7 +719,7 @@ byteOffset, byteLength, ) { - controller[_queue].push({ buffer, byteOffset, byteLength }); + ArrayPrototypePush(controller[_queue], { buffer, byteOffset, byteLength }); controller[_queueTotalSize] += byteLength; } @@ -787,7 +799,7 @@ function readableStreamAddReadRequest(stream, readRequest) { assert(isReadableStreamDefaultReader(stream[_reader])); assert(stream[_state] === "readable"); - stream[_reader][_readRequests].push(readRequest); + ArrayPrototypePush(stream[_reader][_readRequests], readRequest); } /** @@ -802,12 +814,12 @@ return resolvePromiseWith(undefined); } if (stream[_state] === "errored") { - return Promise.reject(stream[_storedError]); + return PromiseReject(stream[_storedError]); } readableStreamClose(stream); /** @type {Promise} */ const sourceCancelPromise = stream[_controller][_cancelSteps](reason); - return sourceCancelPromise.then(() => undefined); + return PromisePrototypeThen(sourceCancelPromise, () => undefined); } /** @@ -1064,7 +1076,7 @@ const reader = stream[_reader]; assert(reader[_readRequests].length); /** @type {ReadRequest} */ - const readRequest = reader[_readRequests].shift(); + const readRequest = ArrayPrototypeShift(reader[_readRequests]); if (done) { readRequest.closeSteps(); } else { @@ -1138,7 +1150,7 @@ /** @type {Array<() => Promise>} */ const actions = []; if (preventAbort === false) { - actions.push(() => { + ArrayPrototypePush(actions, () => { if (dest[_state] === "writable") { return writableStreamAbort(dest, error); } else { @@ -1147,7 +1159,7 @@ }); } if (preventCancel === false) { - actions.push(() => { + ArrayPrototypePush(actions, () => { if (source[_state] === "readable") { return readableStreamCancel(source, error); } else { @@ -1156,7 +1168,7 @@ }); } shutdownWithAction( - () => Promise.all(actions.map((action) => action())), + () => PromiseAll(ArrayPrototypeMap(actions, (action) => action())), true, error, ); @@ -1166,6 +1178,7 @@ abortAlgorithm(); return promise.promise; } + // TODO(lucacasonato): use the internal API to listen for abort. signal.addEventListener("abort", abortAlgorithm); } @@ -1370,6 +1383,7 @@ readableStreamReaderGenericRelease(reader); if (signal !== undefined) { + // TODO(lucacasonato): use the internal API to remove the listener. signal.removeEventListener("abort", abortAlgorithm); } if (isError) { @@ -1478,6 +1492,8 @@ const value1 = value; const value2 = value; + // TODO(lucacasonato): respect clonedForBranch2. + if (canceled1 === false) { readableStreamDefaultControllerEnqueue( /** @type {ReadableStreamDefaultController} */ (branch1[ @@ -1578,7 +1594,9 @@ ]), r, ); - cancelPromise.resolve(undefined); + if (canceled1 === false || canceled2 === false) { + cancelPromise.resolve(undefined); + } }); return [branch1, branch2]; @@ -1604,7 +1622,7 @@ ) { assert(stream[_controller] === undefined); if (autoAllocateChunkSize !== undefined) { - assert(Number.isInteger(autoAllocateChunkSize)); + assert(NumberIsInteger(autoAllocateChunkSize)); assert(autoAllocateChunkSize >= 0); } controller[_stream] = stream; @@ -1621,7 +1639,8 @@ const startResult = startAlgorithm(); const startPromise = resolvePromiseWith(startResult); setPromiseIsHandledToTrue( - startPromise.then( + PromisePrototypeThen( + startPromise, () => { controller[_started] = true; assert(controller[_pulling] === false); @@ -1877,7 +1896,7 @@ try { transformStreamDefaultControllerEnqueue(controller, chunk); } catch (e) { - return Promise.reject(e); + return PromiseReject(e); } return resolvePromiseWith(undefined); }; @@ -2333,7 +2352,7 @@ assert(stream[_state] === "writable"); /** @type {Deferred} */ const deferred = new Deferred(); - stream[_writeRequests].push(deferred); + ArrayPrototypePush(stream[_writeRequests], deferred); return deferred.promise; } @@ -2344,7 +2363,7 @@ function writableStreamClose(stream) { const state = stream[_state]; if (state === "closed" || state === "errored") { - return Promise.reject( + return PromiseReject( new TypeError("Writable stream is closed or errored."), ); } @@ -2599,7 +2618,7 @@ return resolvePromiseWith(undefined); } if (state === "errored") { - return Promise.reject(stream[_storedError]); + return PromiseReject(stream[_storedError]); } assert(state === "writable" || state === "erroring"); return writableStreamDefaultWriterClose(writer); @@ -2690,21 +2709,21 @@ chunk, ); if (stream !== writer[_stream]) { - return Promise.reject(new TypeError("Writer's stream is unexpected.")); + return PromiseReject(new TypeError("Writer's stream is unexpected.")); } const state = stream[_state]; if (state === "errored") { - return Promise.reject(stream[_storedError]); + return PromiseReject(stream[_storedError]); } if ( writableStreamCloseQueuedOrInFlight(stream) === true || state === "closed" ) { - return Promise.reject( + return PromiseReject( new TypeError("The stream is closing or is closed."), ); } if (state === "erroring") { - return Promise.reject(stream[_storedError]); + return PromiseReject(stream[_storedError]); } assert(state === "writable"); const promise = writableStreamAddWriteRequest(stream); @@ -2899,8 +2918,8 @@ * @returns {IteratorResult} */ function createIteratorResult(value, done) { - const result = Object.create(null); - Object.defineProperties(result, { + const result = ObjectCreate(null); + ObjectDefineProperties(result, { value: { value, writable: true, enumerable: true, configurable: true }, done: { value: done, @@ -2913,18 +2932,18 @@ } /** @type {AsyncIterator} */ - const asyncIteratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf(async function* () {}).prototype, + const asyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype, ); /** @type {AsyncIterator} */ - const readableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ + const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ /** @returns {Promise>} */ next() { /** @type {ReadableStreamDefaultReader} */ const reader = this[_reader]; if (reader[_stream] === undefined) { - return Promise.reject( + return PromiseReject( new TypeError( "Cannot get the next iteration result once the reader has been released.", ), @@ -2995,7 +3014,7 @@ get size() { webidl.assertBranded(this, ByteLengthQueuingStrategy); initializeByteLengthSizeFunction(this[_globalObject]); - return byteSizeFunctionWeakMap.get(this[_globalObject]); + return WeakMapPrototypeGet(byteSizeFunctionWeakMap, this[_globalObject]); } [Symbol.for("Deno.customInspect")](inspect) { @@ -3015,11 +3034,11 @@ const byteSizeFunctionWeakMap = new WeakMap(); function initializeByteLengthSizeFunction(globalObject) { - if (byteSizeFunctionWeakMap.has(globalObject)) { + if (WeakMapPrototypeHas(byteSizeFunctionWeakMap, globalObject)) { return; } const size = (chunk) => chunk.byteLength; - byteSizeFunctionWeakMap.set(globalObject, size); + WeakMapPrototypeSet(byteSizeFunctionWeakMap, globalObject, size); } class CountQueuingStrategy { @@ -3046,7 +3065,7 @@ get size() { webidl.assertBranded(this, CountQueuingStrategy); initializeCountSizeFunction(this[_globalObject]); - return countSizeFunctionWeakMap.get(this[_globalObject]); + return WeakMapPrototypeGet(countSizeFunctionWeakMap, this[_globalObject]); } [Symbol.for("Deno.customInspect")](inspect) { @@ -3067,11 +3086,11 @@ /** @param {typeof globalThis} globalObject */ function initializeCountSizeFunction(globalObject) { - if (countSizeFunctionWeakMap.has(globalObject)) { + if (WeakMapPrototypeHas(countSizeFunctionWeakMap, globalObject)) { return; } const size = () => 1; - countSizeFunctionWeakMap.set(globalObject, size); + WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size); } /** @template R */ @@ -3159,10 +3178,10 @@ reason = webidl.converters.any(reason); } } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (isReadableStreamLocked(this)) { - return Promise.reject( + return PromiseReject( new TypeError("Cannot cancel a locked ReadableStream."), ); } @@ -3175,7 +3194,7 @@ * @returns {AsyncIterableIterator} */ getIterator(options = {}) { - return this[Symbol.asyncIterator](options); + return this[SymbolAsyncIterator](options); } /** @@ -3254,16 +3273,16 @@ context: "Argument 2", }); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } const { preventClose, preventAbort, preventCancel, signal } = options; if (isReadableStreamLocked(this)) { - return Promise.reject( + return PromiseReject( new TypeError("ReadableStream is already locked."), ); } if (isWritableStreamLocked(destination)) { - return Promise.reject( + return PromiseReject( new TypeError("destination WritableStream is already locked."), ); } @@ -3296,7 +3315,7 @@ context: "Argument 1", }); /** @type {AsyncIterableIterator} */ - const iterator = Object.create(readableStreamAsyncIteratorPrototype); + const iterator = ObjectCreate(readableStreamAsyncIteratorPrototype); const reader = acquireReadableStreamDefaultReader(this); iterator[_reader] = reader; iterator[_preventCancel] = options.preventCancel; @@ -3313,9 +3332,9 @@ } // TODO(lucacasonato): should be moved to webidl crate - ReadableStream.prototype[Symbol.asyncIterator] = + ReadableStream.prototype[SymbolAsyncIterator] = ReadableStream.prototype.values; - Object.defineProperty(ReadableStream.prototype, Symbol.asyncIterator, { + ObjectDefineProperty(ReadableStream.prototype, SymbolAsyncIterator, { writable: true, enumerable: false, configurable: true, @@ -3353,10 +3372,10 @@ try { webidl.assertBranded(this, ReadableStreamDefaultReader); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (this[_stream] === undefined) { - return Promise.reject( + return PromiseReject( new TypeError("Reader has no associated stream."), ); } @@ -3396,7 +3415,7 @@ try { webidl.assertBranded(this, ReadableStreamDefaultReader); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } return this[_closedPromise].promise; } @@ -3412,11 +3431,11 @@ reason = webidl.converters.any(reason); } } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (this[_stream] === undefined) { - return Promise.reject( + return PromiseReject( new TypeError("Reader has no associated stream."), ); } @@ -3573,7 +3592,7 @@ assert(readableStreamHasDefaultReader(stream)); if (this[_queueTotalSize] > 0) { assert(readableStreamGetNumReadRequests(stream) === 0); - const entry = this[_queue].shift(); + const entry = ArrayPrototypeShift(this[_queue]); this[_queueTotalSize] -= entry.byteLength; readableByteStreamControllerHandleQueueDrain(this); const view = new Uint8Array( @@ -3978,13 +3997,13 @@ try { webidl.assertBranded(this, WritableStream); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (reason !== undefined) { reason = webidl.converters.any(reason); } if (isWritableStreamLocked(this)) { - return Promise.reject( + return PromiseReject( new TypeError( "The writable stream is locked, therefore cannot be aborted.", ), @@ -3998,17 +4017,17 @@ try { webidl.assertBranded(this, WritableStream); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (isWritableStreamLocked(this)) { - return Promise.reject( + return PromiseReject( new TypeError( "The writable stream is locked, therefore cannot be closed.", ), ); } if (writableStreamCloseQueuedOrInFlight(this) === true) { - return Promise.reject( + return PromiseReject( new TypeError("The writable stream is already closing."), ); } @@ -4062,7 +4081,7 @@ try { webidl.assertBranded(this, WritableStreamDefaultWriter); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } return this[_closedPromise].promise; } @@ -4083,7 +4102,7 @@ try { webidl.assertBranded(this, WritableStreamDefaultWriter); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } return this[_readyPromise].promise; } @@ -4096,13 +4115,13 @@ try { webidl.assertBranded(this, WritableStreamDefaultWriter); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (reason !== undefined) { reason = webidl.converters.any(reason); } if (this[_stream] === undefined) { - return Promise.reject( + return PromiseReject( new TypeError("A writable stream is not associated with the writer."), ); } @@ -4114,16 +4133,16 @@ try { webidl.assertBranded(this, WritableStreamDefaultWriter); } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } const stream = this[_stream]; if (stream === undefined) { - return Promise.reject( + return PromiseReject( new TypeError("A writable stream is not associated with the writer."), ); } if (writableStreamCloseQueuedOrInFlight(stream) === true) { - return Promise.reject( + return PromiseReject( new TypeError("The associated stream is already closing."), ); } @@ -4152,10 +4171,10 @@ chunk = webidl.converters.any(chunk); } } catch (err) { - return Promise.reject(err); + return PromiseReject(err); } if (this[_stream] === undefined) { - return Promise.reject( + return PromiseReject( new TypeError("A writable stream is not associate with the writer."), ); }