1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

feat(streams): reject pending reads when releasing reader (#13375)

This commit is contained in:
Leo Kettmeir 2022-01-14 17:34:54 +01:00 committed by GitHub
parent b720af994a
commit 659bbd731c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 205 additions and 101 deletions

View file

@ -279,6 +279,7 @@
const _pullAlgorithm = Symbol("[[pullAlgorithm]]");
const _pulling = Symbol("[[pulling]]");
const _pullSteps = Symbol("[[PullSteps]]");
const _releaseSteps = Symbol("[[ReleaseSteps]]");
const _queue = Symbol("[[queue]]");
const _queueTotalSize = Symbol("[[queueTotalSize]]");
const _readable = Symbol("[[readable]]");
@ -800,12 +801,19 @@
"The BYOB request's buffer has been detached and so cannot be filled with an enqueued chunk",
);
}
readableByteStreamControllerInvalidateBYOBRequest(controller);
firstPendingPullInto.buffer = transferArrayBuffer(
firstPendingPullInto.buffer,
);
if (firstPendingPullInto.readerType === "none") {
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
firstPendingPullInto,
);
}
}
readableByteStreamControllerInvalidateBYOBRequest(controller);
if (readableStreamHasDefaultReader(stream)) {
readableByteStreamControllerProcessReadRequestsUsingQueue(controller);
if (readableStreamGetNumReadRequests(stream) === 0) {
assert(controller[_pendingPullIntos].length === 0);
readableByteStreamControllerEnqueueChunkToQueue(
@ -866,6 +874,54 @@
controller[_queueTotalSize] += byteLength;
}
/**
* @param {ReadableByteStreamController} controller
* @param {ArrayBufferLike} buffer
* @param {number} byteOffset
* @param {number} byteLength
* @returns {void}
*/
function readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
buffer,
byteOffset,
byteLength,
) {
let cloneResult;
try {
cloneResult = buffer.slice(byteOffset, byteOffset + byteLength);
} catch (e) {
readableByteStreamControllerError(controller, e);
}
readableByteStreamControllerEnqueueChunkToQueue(
controller,
cloneResult,
0,
byteLength,
);
}
/**
* @param {ReadableByteStreamController} controller
* @param {PullIntoDescriptor} pullIntoDescriptor
* @returns {void}
*/
function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
pullIntoDescriptor,
) {
assert(pullIntoDescriptor.readerType === "none");
if (pullIntoDescriptor.bytesFilled > 0) {
readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
pullIntoDescriptor.buffer,
pullIntoDescriptor.byteOffset,
pullIntoDescriptor.bytesFilled,
);
}
readableByteStreamControllerShiftPendingPullInto(controller);
}
/**
* @param {ReadableByteStreamController} controller
* @returns {ReadableStreamBYOBRequest | null}
@ -1000,10 +1056,11 @@
readableStreamClose(stream);
const reader = stream[_reader];
if (reader !== undefined && isReadableStreamBYOBReader(reader)) {
for (const readIntoRequest of reader[_readIntoRequests]) {
const readIntoRequests = reader[_readIntoRequests];
reader[_readIntoRequests] = [];
for (const readIntoRequest of readIntoRequests) {
readIntoRequest.closeSteps(undefined);
}
reader[_readIntoRequests] = [];
}
/** @type {Promise<void>} */
const sourceCancelPromise = stream[_controller][_cancelSteps](reason);
@ -1026,10 +1083,10 @@
if (isReadableStreamDefaultReader(reader)) {
/** @type {Array<ReadRequest<R>>} */
const readRequests = reader[_readRequests];
reader[_readRequests] = [];
for (const readRequest of readRequests) {
readRequest.closeSteps();
}
reader[_readRequests] = [];
}
// This promise can be double resolved.
// See: https://github.com/whatwg/streams/issues/1100
@ -1224,6 +1281,29 @@
}
}
/**
* @param {ReadableStreamBYOBReader} reader
*/
function readableStreamBYOBReaderRelease(reader) {
readableStreamReaderGenericRelease(reader);
const e = new TypeError(
"There are pending read requests, so the reader cannot be released.",
);
readableStreamBYOBReaderErrorReadIntoRequests(reader, e);
}
/**
* @param {ReadableStreamBYOBReader} reader
* @param {any} e
*/
function readableStreamDefaultReaderErrorReadRequests(reader, e) {
const readRequests = reader[_readRequests];
reader[_readRequests] = [];
for (const readRequest of readRequests) {
readRequest.errorSteps(e);
}
}
/**
* @param {ReadableByteStreamController} controller
*/
@ -1250,6 +1330,25 @@
}
}
}
/**
* @param {ReadableByteStreamController} controller
*/
function readableByteStreamControllerProcessReadRequestsUsingQueue(
controller,
) {
const reader = controller[_stream][_reader];
assert(isReadableStreamDefaultReader(reader));
while (reader[_readRequests].length !== 0) {
if (controller[_queueTotalSize] === 0) {
return;
}
const readRequest = ArrayPrototypeShift(reader[_readRequests]);
readableByteStreamControllerFillReadRequestFromQueue(
controller,
readRequest,
);
}
}
/**
* @param {ReadableByteStreamController} controller
@ -1401,6 +1500,16 @@
bytesWritten,
pullIntoDescriptor,
);
if (pullIntoDescriptor.readerType === "none") {
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
pullIntoDescriptor,
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller,
);
return;
}
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
return;
}
@ -1410,16 +1519,11 @@
if (remainderSize > 0) {
const end = pullIntoDescriptor.byteOffset +
pullIntoDescriptor.bytesFilled;
// We dont have access to CloneArrayBuffer, so we use .slice(). End is non-inclusive, as the spec says.
const remainder = pullIntoDescriptor.buffer.slice(
end - remainderSize,
end,
);
readableByteStreamControllerEnqueueChunkToQueue(
readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
remainder,
0,
remainder.byteLength,
pullIntoDescriptor.buffer,
end - remainderSize,
remainderSize,
);
}
pullIntoDescriptor.bytesFilled -= remainderSize;
@ -1484,6 +1588,9 @@
firstDescriptor,
) {
assert(firstDescriptor.bytesFilled === 0);
if (firstDescriptor.readerType === "none") {
readableByteStreamControllerShiftPendingPullInto(controller);
}
const stream = controller[_stream];
if (readableStreamHasBYOBReader(stream)) {
while (readableStreamGetNumReadIntoRequests(stream) > 0) {
@ -1507,6 +1614,7 @@
pullIntoDescriptor,
) {
assert(stream[_state] !== "errored");
assert(pullIntoDescriptor.readerType !== "none");
let done = false;
if (stream[_state] === "closed") {
assert(pullIntoDescriptor.bytesFilled === 0);
@ -1650,6 +1758,27 @@
return ready;
}
/**
* @param {ReadableByteStreamController} controller
* @param {ReadRequest} readRequest
* @returns {void}
*/
function readableByteStreamControllerFillReadRequestFromQueue(
controller,
readRequest,
) {
assert(controller[_queueTotalSize] > 0);
const entry = ArrayPrototypeShift(controller[_queue]);
controller[_queueTotalSize] -= entry.byteLength;
readableByteStreamControllerHandleQueueDrain(controller);
const view = new Uint8Array(
entry.buffer,
entry.byteOffset,
entry.byteLength,
);
readRequest.chunkSteps(view);
}
/**
* @param {ReadableByteStreamController} controller
* @param {number} size
@ -1708,6 +1837,18 @@
}
}
/**
* @template R
* @param {ReadableStreamDefaultReader<R>} reader
*/
function readableStreamDefaultReaderRelease(reader) {
readableStreamReaderGenericRelease(reader);
const e = new TypeError(
"There are pending read requests, so the reader cannot be released.",
);
readableStreamDefaultReaderErrorReadRequests(reader, e);
}
/**
* @template R
* @param {ReadableStream<R>} stream
@ -1727,18 +1868,10 @@
closedPromise.reject(e);
setPromiseIsHandledToTrue(closedPromise.promise);
if (isReadableStreamDefaultReader(reader)) {
/** @type {Array<ReadRequest<R>>} */
const readRequests = reader[_readRequests];
for (const readRequest of readRequests) {
readRequest.errorSteps(e);
}
reader[_readRequests] = [];
readableStreamDefaultReaderErrorReadRequests(reader, e);
} else {
assert(isReadableStreamBYOBReader(reader));
for (const readIntoRequest of reader[_readIntoRequests]) {
readIntoRequest.errorSteps(e);
}
reader[_readIntoRequests] = [];
readableStreamBYOBReaderErrorReadIntoRequests(reader, e);
}
}
@ -2104,7 +2237,7 @@
*/
function finalize(isError, error) {
writableStreamDefaultWriterRelease(writer);
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderRelease(reader);
if (signal !== undefined) {
signal[remove](abortAlgorithm);
@ -2154,9 +2287,10 @@
* @param {ReadableStreamGenericReader<R> | ReadableStreamBYOBReader} reader
*/
function readableStreamReaderGenericRelease(reader) {
assert(reader[_stream] !== undefined);
assert(reader[_stream][_reader] === reader);
if (reader[_stream][_state] === "readable") {
const stream = reader[_stream];
assert(stream !== undefined);
assert(stream[_reader] === reader);
if (stream[_state] === "readable") {
reader[_closedPromise].reject(
new TypeError(
"Reader was released and can no longer be used to monitor the stream's closedness.",
@ -2171,10 +2305,23 @@
);
}
setPromiseIsHandledToTrue(reader[_closedPromise].promise);
reader[_stream][_reader] = undefined;
stream[_controller][_releaseSteps]();
stream[_reader] = undefined;
reader[_stream] = undefined;
}
/**
* @param {ReadableStreamBYOBReader} reader
* @param {any} e
*/
function readableStreamBYOBReaderErrorReadIntoRequests(reader, e) {
const readIntoRequests = reader[_readIntoRequests];
reader[_readIntoRequests] = [];
for (const readIntoRequest of readIntoRequests) {
readIntoRequest.errorSteps(e);
}
}
/**
* @template R
* @param {ReadableStream<R>} stream
@ -2381,7 +2528,7 @@
function pullWithDefaultReader() {
if (isReadableStreamBYOBReader(reader)) {
assert(reader[_readIntoRequests].length === 0);
readableStreamReaderGenericRelease(reader);
readableStreamBYOBReaderRelease(reader);
reader = acquireReadableStreamDefaultReader(stream);
forwardReaderError(reader);
}
@ -2446,7 +2593,7 @@
function pullWithBYOBReader(view, forBranch2) {
if (isReadableStreamDefaultReader(reader)) {
assert(reader[_readRequests].length === 0);
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderRelease(reader);
reader = acquireReadableStreamBYOBReader(stream);
forwardReaderError(reader);
}
@ -3982,11 +4129,11 @@
promise.resolve(createIteratorResult(chunk, false));
},
closeSteps() {
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderRelease(reader);
promise.resolve(createIteratorResult(undefined, true));
},
errorSteps(e) {
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderRelease(reader);
promise.reject(e);
},
};
@ -4006,11 +4153,11 @@
assert(reader[_readRequests].length === 0);
if (this[_preventCancel] === false) {
const result = readableStreamReaderGenericCancel(reader, arg);
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderRelease(reader);
await result;
return createIteratorResult(arg, true);
}
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderRelease(reader);
return createIteratorResult(undefined, true);
},
}, asyncIteratorPrototype);
@ -4417,12 +4564,7 @@
if (this[_stream] === undefined) {
return;
}
if (this[_readRequests].length) {
throw new TypeError(
"There are pending read requests, so the reader cannot be release.",
);
}
readableStreamReaderGenericRelease(this);
readableStreamDefaultReaderRelease(this);
}
get closed() {
@ -4544,12 +4686,7 @@
if (this[_stream] === undefined) {
return;
}
if (this[_readIntoRequests].length !== 0) {
throw new TypeError(
"There are pending read requests, so the reader cannot be released.",
);
}
readableStreamReaderGenericRelease(this);
readableStreamBYOBReaderRelease(this);
}
get closed() {
@ -4794,15 +4931,7 @@
assert(readableStreamHasDefaultReader(stream));
if (this[_queueTotalSize] > 0) {
assert(readableStreamGetNumReadRequests(stream) === 0);
const entry = ArrayPrototypeShift(this[_queue]);
this[_queueTotalSize] -= entry.byteLength;
readableByteStreamControllerHandleQueueDrain(this);
const view = new Uint8Array(
entry.buffer,
entry.byteOffset,
entry.byteLength,
);
readRequest.chunkSteps(view);
readableByteStreamControllerFillReadRequestFromQueue(this, readRequest);
return;
}
const autoAllocateChunkSize = this[_autoAllocateChunkSize];
@ -4830,6 +4959,15 @@
readableStreamAddReadRequest(stream, readRequest);
readableByteStreamControllerCallPullIfNeeded(this);
}
[_releaseSteps]() {
if (this[_pendingPullIntos].length !== 0) {
/** @type {PullIntoDescriptor} */
const firstPendingPullInto = this[_pendingPullIntos][0];
firstPendingPullInto.readerType = "none";
this[_pendingPullIntos] = [firstPendingPullInto];
}
}
}
webidl.configurePrototype(ReadableByteStreamController);
@ -4944,6 +5082,10 @@
readableStreamDefaultControllerCallPullIfNeeded(this);
}
}
[_releaseSteps]() {
return;
}
}
webidl.configurePrototype(ReadableStreamDefaultController);

View file

@ -33,7 +33,7 @@ interface PullIntoDescriptor {
elementSize: number;
// deno-lint-ignore no-explicit-any
viewConstructor: any;
readerType: "default" | "byob";
readerType: "default" | "byob" | "none";
}
interface ReadableByteStreamQueueEntry {

View file

@ -1430,42 +1430,12 @@
"construct-byob-request.any.html": true,
"construct-byob-request.any.worker.html": true,
"general.any.html": [
"ReadableStream with byte source: releaseLock() on ReadableStreamDefaultReader must reject pending read()",
"ReadableStream with byte source: releaseLock() on ReadableStreamBYOBReader must reject pending read()",
"ReadableStream with byte source: Respond to multiple pull() by separate enqueue()",
"pull() resolving should not resolve read()",
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request"
],
"general.any.worker.html": [
"ReadableStream with byte source: releaseLock() on ReadableStreamDefaultReader must reject pending read()",
"ReadableStream with byte source: releaseLock() on ReadableStreamBYOBReader must reject pending read()",
"ReadableStream with byte source: Respond to multiple pull() by separate enqueue()",
"pull() resolving should not resolve read()",
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request"
],
"non-transferable-buffers.any.html": false,
"non-transferable-buffers.any.worker.html": false,
@ -1487,12 +1457,8 @@
"constructor.any.worker.html": true,
"count-queuing-strategy-integration.any.html": true,
"count-queuing-strategy-integration.any.worker.html": true,
"default-reader.any.html": [
"Second reader can read chunks after first reader was released with pending read requests"
],
"default-reader.any.worker.html": [
"Second reader can read chunks after first reader was released with pending read requests"
],
"default-reader.any.html": true,
"default-reader.any.worker.html": true,
"floating-point-total-queue-size.any.html": true,
"floating-point-total-queue-size.any.worker.html": true,
"garbage-collection.any.html": true,
@ -1505,12 +1471,8 @@
"reentrant-strategies.any.worker.html": true,
"tee.any.html": false,
"tee.any.worker.html": false,
"templated.any.html": [
"ReadableStream (empty) reader: releasing the lock should reject all pending read requests"
],
"templated.any.worker.html": [
"ReadableStream (empty) reader: releasing the lock should reject all pending read requests"
]
"templated.any.html": true,
"templated.any.worker.html": true
},
"transform-streams": {
"backpressure.any.html": true,
@ -4189,4 +4151,4 @@
"Pattern: [{\"pathname\":\"*//*\"}] Inputs: [{\"pathname\":\"foo/bar\"}]"
]
}
}
}