mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 16:42:21 -05:00
fix(ext/web/streams): fix ReadableStream asyncIterator (#16276)
This commit is contained in:
parent
fab10ea3e0
commit
49b5ac947f
2 changed files with 84 additions and 38 deletions
|
@ -32,6 +32,7 @@
|
|||
ObjectDefineProperties,
|
||||
ObjectDefineProperty,
|
||||
ObjectGetPrototypeOf,
|
||||
ObjectPrototype,
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
ObjectSetPrototypeOf,
|
||||
Promise,
|
||||
|
@ -4424,7 +4425,7 @@
|
|||
* @returns {IteratorResult<T>}
|
||||
*/
|
||||
function createIteratorResult(value, done) {
|
||||
const result = ObjectCreate(null);
|
||||
const result = ObjectCreate(ObjectPrototype);
|
||||
ObjectDefineProperties(result, {
|
||||
value: { value, writable: true, enumerable: true, configurable: true },
|
||||
done: {
|
||||
|
@ -4442,57 +4443,99 @@
|
|||
ObjectGetPrototypeOf(async function* () {}).prototype,
|
||||
);
|
||||
|
||||
const _iteratorNext = Symbol("[[iteratorNext]]");
|
||||
const _iteratorFinished = Symbol("[[iteratorFinished]]");
|
||||
|
||||
/** @type {AsyncIterator<unknown>} */
|
||||
const readableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
|
||||
/** @returns {Promise<IteratorResult<unknown>>} */
|
||||
next() {
|
||||
/** @type {ReadableStreamDefaultReader} */
|
||||
const reader = this[_reader];
|
||||
if (reader[_stream] === undefined) {
|
||||
return PromiseReject(
|
||||
new TypeError(
|
||||
"Cannot get the next iteration result once the reader has been released.",
|
||||
),
|
||||
);
|
||||
function nextSteps() {
|
||||
if (reader[_iteratorFinished]) {
|
||||
return PromiseResolve(createIteratorResult(undefined, true));
|
||||
}
|
||||
|
||||
if (reader[_stream] === undefined) {
|
||||
return PromiseReject(
|
||||
new TypeError(
|
||||
"Cannot get the next iteration result once the reader has been released.",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
/** @type {Deferred<IteratorResult<any>>} */
|
||||
const promise = new Deferred();
|
||||
/** @type {ReadRequest} */
|
||||
const readRequest = {
|
||||
chunkSteps(chunk) {
|
||||
promise.resolve(createIteratorResult(chunk, false));
|
||||
},
|
||||
closeSteps() {
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
promise.resolve(createIteratorResult(undefined, true));
|
||||
},
|
||||
errorSteps(e) {
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
promise.reject(e);
|
||||
},
|
||||
};
|
||||
|
||||
readableStreamDefaultReaderRead(reader, readRequest);
|
||||
return PromisePrototypeThen(promise.promise, (result) => {
|
||||
reader[_iteratorNext] = null;
|
||||
if (result.done === true) {
|
||||
reader[_iteratorFinished] = true;
|
||||
return createIteratorResult(undefined, true);
|
||||
}
|
||||
return result;
|
||||
}, (reason) => {
|
||||
reader[_iteratorNext] = null;
|
||||
reader[_iteratorFinished] = true;
|
||||
throw reason;
|
||||
});
|
||||
}
|
||||
/** @type {Deferred<IteratorResult<any>>} */
|
||||
const promise = new Deferred();
|
||||
/** @type {ReadRequest} */
|
||||
const readRequest = {
|
||||
chunkSteps(chunk) {
|
||||
promise.resolve(createIteratorResult(chunk, false));
|
||||
},
|
||||
closeSteps() {
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
promise.resolve(createIteratorResult(undefined, true));
|
||||
},
|
||||
errorSteps(e) {
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
promise.reject(e);
|
||||
},
|
||||
};
|
||||
readableStreamDefaultReaderRead(reader, readRequest);
|
||||
return promise.promise;
|
||||
|
||||
reader[_iteratorNext] = reader[_iteratorNext]
|
||||
? PromisePrototypeThen(reader[_iteratorNext], nextSteps, nextSteps)
|
||||
: nextSteps();
|
||||
|
||||
return reader[_iteratorNext];
|
||||
},
|
||||
/**
|
||||
* @param {unknown} arg
|
||||
* @returns {Promise<IteratorResult<unknown>>}
|
||||
*/
|
||||
async return(arg) {
|
||||
return(arg) {
|
||||
/** @type {ReadableStreamDefaultReader} */
|
||||
const reader = this[_reader];
|
||||
if (reader[_stream] === undefined) {
|
||||
return createIteratorResult(undefined, true);
|
||||
}
|
||||
assert(reader[_readRequests].length === 0);
|
||||
if (this[_preventCancel] === false) {
|
||||
const result = readableStreamReaderGenericCancel(reader, arg);
|
||||
const returnSteps = () => {
|
||||
if (reader[_iteratorFinished]) {
|
||||
return PromiseResolve(createIteratorResult(arg, true));
|
||||
}
|
||||
reader[_iteratorFinished] = true;
|
||||
|
||||
if (reader[_stream] === undefined) {
|
||||
return PromiseResolve(createIteratorResult(undefined, true));
|
||||
}
|
||||
assert(reader[_readRequests].length === 0);
|
||||
if (this[_preventCancel] === false) {
|
||||
const result = readableStreamReaderGenericCancel(reader, arg);
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
return result;
|
||||
}
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
await result;
|
||||
return createIteratorResult(arg, true);
|
||||
}
|
||||
readableStreamDefaultReaderRelease(reader);
|
||||
return createIteratorResult(undefined, true);
|
||||
return PromiseResolve(createIteratorResult(undefined, true));
|
||||
};
|
||||
|
||||
const returnPromise = reader[_iteratorNext]
|
||||
? PromisePrototypeThen(reader[_iteratorNext], returnSteps, returnSteps)
|
||||
: returnSteps();
|
||||
return PromisePrototypeThen(
|
||||
returnPromise,
|
||||
() => createIteratorResult(arg, true),
|
||||
);
|
||||
},
|
||||
}, asyncIteratorPrototype);
|
||||
|
||||
|
|
|
@ -1311,7 +1311,10 @@
|
|||
"respond-after-enqueue.any.worker.html": true
|
||||
},
|
||||
"readable-streams": {
|
||||
"async-iterator.any.html": false,
|
||||
"async-iterator.any.html": [
|
||||
"next() that succeeds; return()",
|
||||
"next() that succeeds; return() [no awaiting]"
|
||||
],
|
||||
"bad-strategies.any.html": true,
|
||||
"bad-strategies.any.worker.html": true,
|
||||
"bad-underlying-sources.any.html": true,
|
||||
|
|
Loading…
Reference in a new issue