1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-14 19:37:59 -05:00
denoland-deno/ext/node/polyfills/_stream.mjs

748 lines
89 KiB
JavaScript
Raw Normal View History

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// deno-fmt-ignore-file
// deno-lint-ignore-file
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { AbortController } from "ext:deno_web/03_abort_signal.js";
import { Blob } from "ext:deno_web/09_file.js";
/* esm.sh - esbuild bundle(readable-stream@4.2.0) es2022 production */
const __process$ = { nextTick };import __buffer$ from "ext:deno_node/buffer.ts";import __string_decoder$ from "ext:deno_node/string_decoder.ts";import __events$ from "ext:deno_node/events.ts";var pi=Object.create;var Bt=Object.defineProperty;var wi=Object.getOwnPropertyDescriptor;var yi=Object.getOwnPropertyNames;var gi=Object.getPrototypeOf,Si=Object.prototype.hasOwnProperty;var E=(e=>typeof require<"u"?require:typeof Proxy<"u"?new Proxy(e,{get:(t,n)=>(typeof require<"u"?require:t)[n]}):e)(function(e){if(typeof require<"u")return require.apply(this,arguments);throw new Error('Dynamic require of "'+e+'" is not supported')});var g=(e,t)=>()=>(t||e((t={exports:{}}).exports,t),t.exports);var Ei=(e,t,n,r)=>{if(t&&typeof t=="object"||typeof t=="function")for(let i of yi(t))!Si.call(e,i)&&i!==n&&Bt(e,i,{get:()=>t[i],enumerable:!(r=wi(t,i))||r.enumerable});return e};var Ri=(e,t,n)=>(n=e!=null?pi(gi(e)):{},Ei(t||!e||!e.__esModule?Bt(n,"default",{value:e,enumerable:!0}):n,e));var m=g((Yf,Gt)=>{"use strict";Gt.exports={ArrayIsArray(e){return Array.isArray(e)},ArrayPrototypeIncludes(e,t){return e.includes(t)},ArrayPrototypeIndexOf(e,t){return e.indexOf(t)},ArrayPrototypeJoin(e,t){return e.join(t)},ArrayPrototypeMap(e,t){return e.map(t)},ArrayPrototypePop(e,t){return e.pop(t)},ArrayPrototypePush(e,t){return e.push(t)},ArrayPrototypeSlice(e,t,n){return e.slice(t,n)},Error,FunctionPrototypeCall(e,t,...n){return e.call(t,...n)},FunctionPrototypeSymbolHasInstance(e,t){return Function.prototype[Symbol.hasInstance].call(e,t)},MathFloor:Math.floor,Number,NumberIsInteger:Number.isInteger,NumberIsNaN:Number.isNaN,NumberMAX_SAFE_INTEGER:Number.MAX_SAFE_INTEGER,NumberMIN_SAFE_INTEGER:Number.MIN_SAFE_INTEGER,NumberParseInt:Number.parseInt,ObjectDefineProperties(e,t){return Object.defineProperties(e,t)},ObjectDefineProperty(e,t,n){return Object.defineProperty(e,t,n)},ObjectGetOwnPropertyDescriptor(e,t){return Object.getOwnPropertyDescriptor(e,t)},ObjectKeys(e){return Object.keys(e)},ObjectSetPrototypeOf(e,t){return Object.setPrototypeOf(e,t)},Promise,PromisePrototypeCatch(e,t){return e.catch(t)},PromisePrototypeThen(e,t,n){return e.then(t,n)},PromiseReject(e){return Promise.reject(e)},ReflectApply:Reflect.apply,RegExpPrototypeTest(e,t){return e.test(t)},SafeSet:Set,String,StringPrototypeSlice(e,t,n){return e.slice(t,n)},StringPrototypeToLowerCase(e){return e.toLowerCase()},StringPrototypeToUpperCase(e){return e.toUpperCase()},StringPrototypeTrim(e){return e.trim()},Symbol,SymbolAsyncIterator:Symbol.asyncIterator,SymbolHasInstance:Symbol.hasInstance,SymbolIterator:Symbol.iterator,TypedArrayPrototypeSet(e,t,n){return e.set(t,n)},Uint8Array}});var j=g((Kf,Je)=>{"use strict";var Ai=__buffer$,mi=Object.getPrototypeOf(async function(){}).constructor,Ht=Blob||Ai.Blob,Ti=typeof Ht<"u"?function(t){return t instanceof Ht}:function(t){return!1},Xe=class extends Error{constructor(t){if(!Array.isArray(t))throw new TypeError(`Expected input to be an Array, got ${typeof t}`);let n="";for(let r=0;r<t.length;r++)n+=` ${t[r].stack}
`;super(n),this.name="AggregateError",this.errors=t}};Je.exports={AggregateError:Xe,kEmptyObject:Object.freeze({}),once(e){let t=!1;return function(...n){t||(t=!0,e.apply(this,n))}},createDeferredPromise:function(){let e,t;return{promise:new Promise((r,i)=>{e=r,t=i}),resolve:e,reject:t}},promisify(e){return new Promise((t,n)=>{e((r,...i)=>r?n(r):t(...i))})},debuglog(){return function(){}},format(e,...t){return e.replace(/%([sdifj])/g,function(...[n,r]){let i=t.shift();return r==="f"?i.toFixed(6):r==="j"?JSON.stringify(i):r==="s"&&typeof i=="object"?`${i.constructor!==Object?i.constructor.name:""} {}`.trim():i.toString()})},inspect(e){switch(typeof e){case"string":if(e.includes("'"))if(e.includes('"')){if(!e.includes("`")&&!e.includes("${"))return`\`${e}\``}else return`"${e}"`;return`'${e}'`;case"number":return isNaN(e)?"NaN":Object.is(e,-0)?String(e):e;case"bigint":return`${String(e)}n`;case"boolean":case"undefined":return String(e);case"object":return"{}"}},types:{isAsyncFunction(e){return e instanceof mi},isArrayBufferView(e){return ArrayBuffer.isView(e)}},isBlob:Ti};Je.exports.promisify.custom=Symbol.for("nodejs.util.promisify.custom")});var O=g((zf,Kt)=>{"use strict";var{format:Ii,inspect:Re,AggregateError:Mi}=j(),Ni=globalThis.AggregateError||Mi,Di=Symbol("kIsNodeError"),Oi=["string","function","number","object","Function","Object","boolean","bigint","symbol"],qi=/^([A-Z][a-z0-9]*)+$/,xi="__node_internal_",Ae={};function X(e,t){if(!e)throw new Ae.ERR_INTERNAL_ASSERTION(t)}function Vt(e){let t="",n=e.length,r=e[0]==="-"?1:0;for(;n>=r+4;n-=3)t=`_${e.slice(n-3,n)}${t}`;return`${e.slice(0,n)}${t}`}function Li(e,t,n){if(typeof t=="function")return X(t.length<=n.length,`Code: ${e}; The provided arguments length (${n.length}) does not match the required ones (${t.length}).`),t(...n);let r=(t.match(/%[dfijoOs]/g)||[]).length;return X(r===n.length,`Code: ${e}; The provided arguments length (${n.length}) does not match the required ones (${r}).`),n.length===0?t:Ii(t,...n)}function N(e,t,n){n||(n=Error);class r extends n{constructor(...o){super(Li(e,t,o))}toString(){return`${this.name} [${e}]: ${this.message}`}}Object.defineProperties(r.prototype,{name:{value:n.name,writable:!0,enumerable:!1,configurable:!0},toString:{value(){return`${this.name} [${e}]: ${this.message}`},writable:!0,enumerable:!1,configurable:!0}}),r.prototype.code=e,r.prototype[Di]=!0,Ae[e]=r}function Yt(e){let t=xi+e.name;return Object.defineProperty(e,"name",{value:t}),e}function Pi(e,t){if(e&&t&&e!==t){if(Array.isArray(t.errors))return t.errors.push(e),t;let n=new Ni([t,e],t.message);return n.code=t.code,n}return e||t}var Qe=class extends Error{constructor(t="The operation was aborted",n=void 0){if(n!==void 0&&typeof n!="object")throw new Ae.ERR_INVALID_ARG_TYPE("options","Object",n);super(t,n),this.code="ABORT_ERR",this.name="AbortError"}};N("ERR_ASSERTION","%s",Error);N("ERR_INVALID_ARG_TYPE",(e,t,n)=>{X(typeof e=="string","'name' must be a string"),Array.isArray(t)||(t=[t]);let r="The ";e.endsWith(" argument")?r+=`${e} `:r+=`"${e}" ${e.includes(".")?"property":"argument"} `,r+="must be ";let i=[],o=[],l=[];for(let f of t)X(typeof f=="string","All expected entries have to be of type string"),Oi.includes(f)?i.push(f.toLowerCase()):qi.test(f)?o.push(f):(X(f!=="object",'The value "object" should be written as "Object"'),l.push(f));if(o.length>0){let f=i.indexOf("object");f!==-1&&(i.splice(i,f,1),o.push("Object"))}if(i.length>0){switch(i.length){case 1:r+=`of type ${i[0]}`;break;case 2:r+=`one of type ${i[0]} or ${i[1]}`;break;default:{let f=i.pop();r+=`one of type ${i.join(", ")}, or ${f}`}}(o.length>0||l.length>0)&&(r+=" or ")}if(o.length>0){switch(o.length){case 1:r+=`an instance of ${o[0]}`;break;case 2:r+=`an instance of ${o[0]} or ${o[1]}`;break;default:{let f=o.pop();r+=`an instance of ${o.join(", ")}, or ${f}`}}l.length>0&&(r+=" or ")}switch(l.length){case 0:break;case 1:l[0].toLowerCase()!==l[0]&&(r+="an "),r+=`${l[0]}`;break;case 2:r+=`one of ${l[0]} or ${l[1]}`;break;default:{let f=l.pop();r+=`one of ${l.join(", ")}, or ${f}`}}if(n==null)r+=`. Re
/* End esm.sh bundle */
// The following code implements Readable.fromWeb(), Writable.fromWeb(), and
// Duplex.fromWeb(). These functions are not properly implemented in the
// readable-stream module yet. This can be removed once the following upstream
// issue is resolved: https://github.com/nodejs/readable-stream/issues/482
import {
AbortError,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_STREAM_PREMATURE_CLOSE,
} from "ext:deno_node/internal/errors.ts";
import { destroy } from "ext:deno_node/internal/streams/destroy.mjs";
import finished from "ext:deno_node/internal/streams/end-of-stream.mjs";
import {
isDestroyed,
isReadable,
isReadableEnded,
isWritable,
isWritableEnded,
} from "ext:deno_node/internal/streams/utils.mjs";
import { createDeferredPromise, kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { validateBoolean, validateObject } from "ext:deno_node/internal/validators.mjs";
const process = __process$;
const { Buffer } = __buffer$;
const Readable = Au;
const Writable = mu;
const Duplex = Tu;
function isReadableStream(object) {
return object instanceof ReadableStream;
}
function isWritableStream(object) {
return object instanceof WritableStream;
}
Readable.fromWeb = function (
readableStream,
options = kEmptyObject,
) {
if (!isReadableStream(readableStream)) {
throw new ERR_INVALID_ARG_TYPE(
"readableStream",
"ReadableStream",
readableStream,
);
}
validateObject(options, "options");
const {
highWaterMark,
encoding,
objectMode = false,
signal,
} = options;
if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
}
validateBoolean(objectMode, "options.objectMode");
const reader = readableStream.getReader();
let closed = false;
const readable = new Readable({
objectMode,
highWaterMark,
encoding,
signal,
read() {
reader.read().then(
(chunk) => {
if (chunk.done) {
readable.push(null);
} else {
readable.push(chunk.value);
}
},
(error) => destroy.call(readable, error),
);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => {
throw error;
});
}
}
if (!closed) {
reader.cancel(error).then(done, done);
return;
}
done();
},
});
reader.closed.then(
() => {
closed = true;
if (!isReadableEnded(readable)) {
readable.push(null);
}
},
(error) => {
closed = true;
destroy.call(readable, error);
},
);
return readable;
};
Writable.fromWeb = function (
writableStream,
options = kEmptyObject,
) {
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE(
"writableStream",
"WritableStream",
writableStream,
);
}
validateObject(options, "options");
const {
highWaterMark,
decodeStrings = true,
objectMode = false,
signal,
} = options;
validateBoolean(objectMode, "options.objectMode");
validateBoolean(decodeStrings, "options.decodeStrings");
const writer = writableStream.getWriter();
let closed = false;
const writable = new Writable({
highWaterMark,
objectMode,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
error = error.filter((e) => e);
try {
callback(error.length === 0 ? undefined : error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy.call(writable, error));
}
}
writer.ready.then(
() =>
Promise.all(
chunks.map((data) => writer.write(data.chunk)),
).then(done, done),
done,
);
},
write(chunk, encoding, callback) {
if (typeof chunk === "string" && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(this, duplex, error);
}
}
writer.ready.then(
() => writer.write(chunk).then(done, done),
done,
);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => {
throw error;
});
}
}
if (!closed) {
if (error != null) {
writer.abort(error).then(done, done);
} else {
writer.close().then(done, done);
}
return;
}
done();
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy.call(writable, error));
}
}
if (!closed) {
writer.close().then(done, done);
}
},
});
writer.closed.then(
() => {
closed = true;
if (!isWritableEnded(writable)) {
destroy.call(writable, new ERR_STREAM_PREMATURE_CLOSE());
}
},
(error) => {
closed = true;
destroy.call(writable, error);
},
);
return writable;
};
Duplex.fromWeb = function (pair, options = kEmptyObject) {
validateObject(pair, "pair");
const {
readable: readableStream,
writable: writableStream,
} = pair;
if (!isReadableStream(readableStream)) {
throw new ERR_INVALID_ARG_TYPE(
"pair.readable",
"ReadableStream",
readableStream,
);
}
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE(
"pair.writable",
"WritableStream",
writableStream,
);
}
validateObject(options, "options");
const {
allowHalfOpen = false,
objectMode = false,
encoding,
decodeStrings = true,
highWaterMark,
signal,
} = options;
validateBoolean(objectMode, "options.objectMode");
if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
}
const writer = writableStream.getWriter();
const reader = readableStream.getReader();
let writableClosed = false;
let readableClosed = false;
const duplex = new Duplex({
allowHalfOpen,
highWaterMark,
objectMode,
encoding,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
error = error.filter((e) => e);
try {
callback(error.length === 0 ? undefined : error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(duplex, error));
}
}
writer.ready.then(
() =>
Promise.all(
chunks.map((data) => writer.write(data.chunk)),
).then(done, done),
done,
);
},
write(chunk, encoding, callback) {
if (typeof chunk === "string" && decodeStrings && !objectMode) {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(duplex, error);
}
}
writer.ready.then(
() => writer.write(chunk).then(done, done),
done,
);
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(duplex, error));
}
}
if (!writableClosed) {
writer.close().then(done, done);
}
},
read() {
reader.read().then(
(chunk) => {
if (chunk.done) {
duplex.push(null);
} else {
duplex.push(chunk.value);
}
},
(error) => destroy(duplex, error),
);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => {
throw error;
});
}
}
async function closeWriter() {
if (!writableClosed) {
await writer.abort(error);
}
}
async function closeReader() {
if (!readableClosed) {
await reader.cancel(error);
}
}
if (!writableClosed || !readableClosed) {
Promise.all([
closeWriter(),
closeReader(),
]).then(done, done);
return;
}
done();
},
});
writer.closed.then(
() => {
writableClosed = true;
if (!isWritableEnded(duplex)) {
destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
}
},
(error) => {
writableClosed = true;
readableClosed = true;
destroy(duplex, error);
},
);
reader.closed.then(
() => {
readableClosed = true;
if (!isReadableEnded(duplex)) {
duplex.push(null);
}
},
(error) => {
writableClosed = true;
readableClosed = true;
destroy(duplex, error);
},
);
return duplex;
};
// readable-stream attaches these to Readable, but Node.js core does not.
// Delete them here to better match Node.js core. These can be removed once
// https://github.com/nodejs/readable-stream/issues/485 is resolved.
delete Readable.Duplex;
delete Readable.PassThrough;
delete Readable.Readable;
delete Readable.Stream;
delete Readable.Transform;
delete Readable.Writable;
delete Readable._isUint8Array;
delete Readable._uint8ArrayToBuffer;
delete Readable.addAbortSignal;
delete Readable.compose;
delete Readable.destroy;
delete Readable.finished;
delete Readable.isDisturbed;
delete Readable.isErrored;
delete Readable.isReadable;
delete Readable.pipeline;
// The following code implements Readable.toWeb(), Writable.toWeb(), and
// Duplex.toWeb(). These functions are not properly implemented in the
// readable-stream module yet. This can be removed once the following upstream
// issue is resolved: https://github.com/nodejs/readable-stream/issues/482
function newReadableStreamFromStreamReadable(
streamReadable,
options = kEmptyObject,
) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
// we want it to pass this check but return a closed ReadableStream.
if (typeof streamReadable?._readableState !== "object") {
throw new ERR_INVALID_ARG_TYPE(
"streamReadable",
"stream.Readable",
streamReadable,
);
}
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
const readable = new ReadableStream();
readable.cancel();
return readable;
}
const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;
const evaluateStrategyOrFallback = (strategy) => {
// If there is a strategy available, use it
if (strategy) {
return strategy;
}
if (objectMode) {
// When running in objectMode explicitly but no strategy, we just fall
// back to CountQueuingStrategy
return new CountQueuingStrategy({ highWaterMark });
}
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
return { highWaterMark };
};
const strategy = evaluateStrategyOrFallback(options?.strategy);
let controller;
function onData(chunk) {
// Copy the Buffer to detach it from the pool.
if (Buffer.isBuffer(chunk) && !objectMode) {
chunk = new Uint8Array(chunk);
}
controller.enqueue(chunk);
if (controller.desiredSize <= 0) {
streamReadable.pause();
}
}
streamReadable.pause();
const cleanup = finished(streamReadable, (error) => {
if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
const err = new AbortError(undefined, { cause: error });
error = err;
}
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamReadable.on("error", () => {});
if (error) {
return controller.error(error);
}
controller.close();
});
streamReadable.on("data", onData);
return new ReadableStream({
start(c) {
controller = c;
},
pull() {
streamReadable.resume();
},
cancel(reason) {
destroy(streamReadable, reason);
},
}, strategy);
}
function newWritableStreamFromStreamWritable(streamWritable) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
// we want it to pass this check but return a closed WritableStream.
if (typeof streamWritable?._writableState !== "object") {
throw new ERR_INVALID_ARG_TYPE(
"streamWritable",
"stream.Writable",
streamWritable,
);
}
if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
const writable = new WritableStream();
writable.close();
return writable;
}
const highWaterMark = streamWritable.writableHighWaterMark;
const strategy = streamWritable.writableObjectMode
? new CountQueuingStrategy({ highWaterMark })
: { highWaterMark };
let controller;
let backpressurePromise;
let closed;
function onDrain() {
if (backpressurePromise !== undefined) {
backpressurePromise.resolve();
}
}
const cleanup = finished(streamWritable, (error) => {
if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
const err = new AbortError(undefined, { cause: error });
error = err;
}
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamWritable.on("error", () => {});
if (error != null) {
if (backpressurePromise !== undefined) {
backpressurePromise.reject(error);
}
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
if (closed !== undefined) {
closed.reject(error);
closed = undefined;
}
controller.error(error);
controller = undefined;
return;
}
if (closed !== undefined) {
closed.resolve();
closed = undefined;
return;
}
controller.error(new AbortError());
controller = undefined;
});
streamWritable.on("drain", onDrain);
return new WritableStream({
start(c) {
controller = c;
},
async write(chunk) {
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = createDeferredPromise();
return backpressurePromise.promise.finally(() => {
backpressurePromise = undefined;
});
}
},
abort(reason) {
destroy(streamWritable, reason);
},
close() {
if (closed === undefined && !isWritableEnded(streamWritable)) {
closed = createDeferredPromise();
streamWritable.end();
return closed.promise;
}
controller = undefined;
return Promise.resolve();
},
}, strategy);
}
function newReadableWritablePairFromDuplex(duplex) {
// Not using the internal/streams/utils isWritableNodeStream and
// isReadableNodestream utilities here because they will return false
// if the duplex was created with writable or readable options set to
// false. Instead, we'll check the readable and writable state after
// and return closed WritableStream or closed ReadableStream as
// necessary.
if (
typeof duplex?._writableState !== "object" ||
typeof duplex?._readableState !== "object"
) {
throw new ERR_INVALID_ARG_TYPE("duplex", "stream.Duplex", duplex);
}
if (isDestroyed(duplex)) {
const writable = new WritableStream();
const readable = new ReadableStream();
writable.close();
readable.cancel();
return { readable, writable };
}
const writable = isWritable(duplex)
? newWritableStreamFromStreamWritable(duplex)
: new WritableStream();
if (!isWritable(duplex)) {
writable.close();
}
const readable = isReadable(duplex)
? newReadableStreamFromStreamReadable(duplex)
: new ReadableStream();
if (!isReadable(duplex)) {
readable.cancel();
}
return { writable, readable };
}
Readable.toWeb = newReadableStreamFromStreamReadable;
Writable.toWeb = newWritableStreamFromStreamWritable;
Duplex.toWeb = newReadableWritablePairFromDuplex;