mirror of
https://github.com/denoland/deno.git
synced 2024-11-27 16:10:57 -05:00
747 lines
89 KiB
JavaScript
747 lines
89 KiB
JavaScript
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||
|
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
|
||
|
// deno-fmt-ignore-file
|
||
|
// deno-lint-ignore-file
|
||
|
import { nextTick } from "internal:deno_node/polyfills/_next_tick.ts";
|
||
|
import { stdio } from "internal:deno_node/polyfills/_process/stdio.mjs";
|
||
|
|
||
|
/* esm.sh - esbuild bundle(readable-stream@4.2.0) es2022 production */
|
||
|
const __process$ = { nextTick, stdio };import __buffer$ from "internal:deno_node/polyfills/buffer.ts";import __string_decoder$ from "internal:deno_node/polyfills/string_decoder.ts";import __events$ from "internal:deno_node/polyfills/events.ts";var pi=Object.create;var Bt=Object.defineProperty;var wi=Object.getOwnPropertyDescriptor;var yi=Object.getOwnPropertyNames;var gi=Object.getPrototypeOf,Si=Object.prototype.hasOwnProperty;var E=(e=>typeof require<"u"?require:typeof Proxy<"u"?new Proxy(e,{get:(t,n)=>(typeof require<"u"?require:t)[n]}):e)(function(e){if(typeof require<"u")return require.apply(this,arguments);throw new Error('Dynamic require of "'+e+'" is not supported')});var g=(e,t)=>()=>(t||e((t={exports:{}}).exports,t),t.exports);var Ei=(e,t,n,r)=>{if(t&&typeof t=="object"||typeof t=="function")for(let i of yi(t))!Si.call(e,i)&&i!==n&&Bt(e,i,{get:()=>t[i],enumerable:!(r=wi(t,i))||r.enumerable});return e};var Ri=(e,t,n)=>(n=e!=null?pi(gi(e)):{},Ei(t||!e||!e.__esModule?Bt(n,"default",{value:e,enumerable:!0}):n,e));var m=g((Yf,Gt)=>{"use strict";Gt.exports={ArrayIsArray(e){return Array.isArray(e)},ArrayPrototypeIncludes(e,t){return e.includes(t)},ArrayPrototypeIndexOf(e,t){return e.indexOf(t)},ArrayPrototypeJoin(e,t){return e.join(t)},ArrayPrototypeMap(e,t){return e.map(t)},ArrayPrototypePop(e,t){return e.pop(t)},ArrayPrototypePush(e,t){return e.push(t)},ArrayPrototypeSlice(e,t,n){return e.slice(t,n)},Error,FunctionPrototypeCall(e,t,...n){return e.call(t,...n)},FunctionPrototypeSymbolHasInstance(e,t){return Function.prototype[Symbol.hasInstance].call(e,t)},MathFloor:Math.floor,Number,NumberIsInteger:Number.isInteger,NumberIsNaN:Number.isNaN,NumberMAX_SAFE_INTEGER:Number.MAX_SAFE_INTEGER,NumberMIN_SAFE_INTEGER:Number.MIN_SAFE_INTEGER,NumberParseInt:Number.parseInt,ObjectDefineProperties(e,t){return Object.defineProperties(e,t)},ObjectDefineProperty(e,t,n){return Object.defineProperty(e,t,n)},ObjectGetOwnPropertyDescriptor(e,t){return 
Object.getOwnPropertyDescriptor(e,t)},ObjectKeys(e){return Object.keys(e)},ObjectSetPrototypeOf(e,t){return Object.setPrototypeOf(e,t)},Promise,PromisePrototypeCatch(e,t){return e.catch(t)},PromisePrototypeThen(e,t,n){return e.then(t,n)},PromiseReject(e){return Promise.reject(e)},ReflectApply:Reflect.apply,RegExpPrototypeTest(e,t){return e.test(t)},SafeSet:Set,String,StringPrototypeSlice(e,t,n){return e.slice(t,n)},StringPrototypeToLowerCase(e){return e.toLowerCase()},StringPrototypeToUpperCase(e){return e.toUpperCase()},StringPrototypeTrim(e){return e.trim()},Symbol,SymbolAsyncIterator:Symbol.asyncIterator,SymbolHasInstance:Symbol.hasInstance,SymbolIterator:Symbol.iterator,TypedArrayPrototypeSet(e,t,n){return e.set(t,n)},Uint8Array}});var j=g((Kf,Je)=>{"use strict";var Ai=__buffer$,mi=Object.getPrototypeOf(async function(){}).constructor,Ht=globalThis.Blob||Ai.Blob,Ti=typeof Ht<"u"?function(t){return t instanceof Ht}:function(t){return!1},Xe=class extends Error{constructor(t){if(!Array.isArray(t))throw new TypeError(`Expected input to be an Array, got ${typeof t}`);let n="";for(let r=0;r<t.length;r++)n+=` ${t[r].stack}
|
||
|
`;super(n),this.name="AggregateError",this.errors=t}};Je.exports={AggregateError:Xe,kEmptyObject:Object.freeze({}),once(e){let t=!1;return function(...n){t||(t=!0,e.apply(this,n))}},createDeferredPromise:function(){let e,t;return{promise:new Promise((r,i)=>{e=r,t=i}),resolve:e,reject:t}},promisify(e){return new Promise((t,n)=>{e((r,...i)=>r?n(r):t(...i))})},debuglog(){return function(){}},format(e,...t){return e.replace(/%([sdifj])/g,function(...[n,r]){let i=t.shift();return r==="f"?i.toFixed(6):r==="j"?JSON.stringify(i):r==="s"&&typeof i=="object"?`${i.constructor!==Object?i.constructor.name:""} {}`.trim():i.toString()})},inspect(e){switch(typeof e){case"string":if(e.includes("'"))if(e.includes('"')){if(!e.includes("`")&&!e.includes("${"))return`\`${e}\``}else return`"${e}"`;return`'${e}'`;case"number":return isNaN(e)?"NaN":Object.is(e,-0)?String(e):e;case"bigint":return`${String(e)}n`;case"boolean":case"undefined":return String(e);case"object":return"{}"}},types:{isAsyncFunction(e){return e instanceof mi},isArrayBufferView(e){return ArrayBuffer.isView(e)}},isBlob:Ti};Je.exports.promisify.custom=Symbol.for("nodejs.util.promisify.custom")});var O=g((zf,Kt)=>{"use strict";var{format:Ii,inspect:Re,AggregateError:Mi}=j(),Ni=globalThis.AggregateError||Mi,Di=Symbol("kIsNodeError"),Oi=["string","function","number","object","Function","Object","boolean","bigint","symbol"],qi=/^([A-Z][a-z0-9]*)+$/,xi="__node_internal_",Ae={};function X(e,t){if(!e)throw new Ae.ERR_INTERNAL_ASSERTION(t)}function Vt(e){let t="",n=e.length,r=e[0]==="-"?1:0;for(;n>=r+4;n-=3)t=`_${e.slice(n-3,n)}${t}`;return`${e.slice(0,n)}${t}`}function Li(e,t,n){if(typeof t=="function")return X(t.length<=n.length,`Code: ${e}; The provided arguments length (${n.length}) does not match the required ones (${t.length}).`),t(...n);let r=(t.match(/%[dfijoOs]/g)||[]).length;return X(r===n.length,`Code: ${e}; The provided arguments length (${n.length}) does not match the required ones 
(${r}).`),n.length===0?t:Ii(t,...n)}function N(e,t,n){n||(n=Error);class r extends n{constructor(...o){super(Li(e,t,o))}toString(){return`${this.name} [${e}]: ${this.message}`}}Object.defineProperties(r.prototype,{name:{value:n.name,writable:!0,enumerable:!1,configurable:!0},toString:{value(){return`${this.name} [${e}]: ${this.message}`},writable:!0,enumerable:!1,configurable:!0}}),r.prototype.code=e,r.prototype[Di]=!0,Ae[e]=r}function Yt(e){let t=xi+e.name;return Object.defineProperty(e,"name",{value:t}),e}function Pi(e,t){if(e&&t&&e!==t){if(Array.isArray(t.errors))return t.errors.push(e),t;let n=new Ni([t,e],t.message);return n.code=t.code,n}return e||t}var Qe=class extends Error{constructor(t="The operation was aborted",n=void 0){if(n!==void 0&&typeof n!="object")throw new Ae.ERR_INVALID_ARG_TYPE("options","Object",n);super(t,n),this.code="ABORT_ERR",this.name="AbortError"}};N("ERR_ASSERTION","%s",Error);N("ERR_INVALID_ARG_TYPE",(e,t,n)=>{X(typeof e=="string","'name' must be a string"),Array.isArray(t)||(t=[t]);let r="The ";e.endsWith(" argument")?r+=`${e} `:r+=`"${e}" ${e.includes(".")?"property":"argument"} `,r+="must be ";let i=[],o=[],l=[];for(let f of t)X(typeof f=="string","All expected entries have to be of type string"),Oi.includes(f)?i.push(f.toLowerCase()):qi.test(f)?o.push(f):(X(f!=="object",'The value "object" should be written as "Object"'),l.push(f));if(o.length>0){let f=i.indexOf("object");f!==-1&&(i.splice(i,f,1),o.push("Object"))}if(i.length>0){switch(i.length){case 1:r+=`of type ${i[0]}`;break;case 2:r+=`one of type ${i[0]} or ${i[1]}`;break;default:{let f=i.pop();r+=`one of type ${i.join(", ")}, or ${f}`}}(o.length>0||l.length>0)&&(r+=" or ")}if(o.length>0){switch(o.length){case 1:r+=`an instance of ${o[0]}`;break;case 2:r+=`an instance of ${o[0]} or ${o[1]}`;break;default:{let f=o.pop();r+=`an instance of ${o.join(", ")}, or ${f}`}}l.length>0&&(r+=" or ")}switch(l.length){case 0:break;case 1:l[0].toLowerCase()!==l[0]&&(r+="an 
"),r+=`${l[0]}`;break;case 2:r+=`one of ${l[0]} or ${l[1]}`;break;default:{let f=l.pop();r+=`one of ${l.join(", ")}, or ${f}`}}if(n==null)r+=`. Re
|
||
|
/* End esm.sh bundle */
|
||
|
|
||
|
// The following code implements Readable.fromWeb(), Writable.fromWeb(), and
|
||
|
// Duplex.fromWeb(). These functions are not properly implemented in the
|
||
|
// readable-stream module yet. This can be removed once the following upstream
|
||
|
// issue is resolved: https://github.com/nodejs/readable-stream/issues/482
|
||
|
|
||
|
import {
|
||
|
AbortError,
|
||
|
ERR_INVALID_ARG_TYPE,
|
||
|
ERR_INVALID_ARG_VALUE,
|
||
|
ERR_STREAM_PREMATURE_CLOSE,
|
||
|
} from "internal:deno_node/polyfills/internal/errors.ts";
|
||
|
import { destroy } from "internal:deno_node/polyfills/internal/streams/destroy.mjs";
|
||
|
import finished from "internal:deno_node/polyfills/internal/streams/end-of-stream.mjs";
|
||
|
import {
|
||
|
isDestroyed,
|
||
|
isReadable,
|
||
|
isReadableEnded,
|
||
|
isWritable,
|
||
|
isWritableEnded,
|
||
|
} from "internal:deno_node/polyfills/internal/streams/utils.mjs";
|
||
|
import { createDeferredPromise, kEmptyObject } from "internal:deno_node/polyfills/internal/util.mjs";
|
||
|
import { validateBoolean, validateObject } from "internal:deno_node/polyfills/internal/validators.mjs";
|
||
|
|
||
|
// Local aliases for the polyfilled process shim and Buffer class, plus
// readable names for the minified stream constructors taken from the
// bundle above (`Au`, `mu`, `Tu`).
const process = __process$;
const Buffer = __buffer$.Buffer;
const Readable = Au;
const Writable = mu;
const Duplex = Tu;
|
||
|
|
||
|
/**
 * Reports whether `object` is a WHATWG `ReadableStream`.
 *
 * @param {unknown} object Value to test.
 * @returns {boolean} `true` when `object instanceof ReadableStream`.
 */
function isReadableStream(object) {
  return object instanceof ReadableStream;
}
|
||
|
|
||
|
/**
 * Reports whether `object` is a WHATWG `WritableStream`.
 *
 * @param {unknown} object Value to test.
 * @returns {boolean} `true` when `object instanceof WritableStream`.
 */
function isWritableStream(object) {
  return object instanceof WritableStream;
}
|
||
|
|
||
|
/**
 * Wraps a WHATWG `ReadableStream` in a Node.js-style `Readable`.
 *
 * A reader lock is taken on `readableStream`; every `read()` request on the
 * returned stream pulls one chunk from that reader and pushes it.
 *
 * @param {ReadableStream} readableStream Web stream to read from.
 * @param {object} [options] Checked with `validateObject`.
 * @param {number} [options.highWaterMark] Forwarded to the `Readable` constructor.
 * @param {string} [options.encoding] Must satisfy `Buffer.isEncoding` when given.
 * @param {boolean} [options.objectMode=false] Checked with `validateBoolean`.
 * @param {*} [options.signal] Forwarded to the `Readable` constructor.
 * @returns {Readable} The adapting stream.
 * @throws {ERR_INVALID_ARG_TYPE} When `readableStream` is not a `ReadableStream`.
 * @throws {ERR_INVALID_ARG_VALUE} When `options.encoding` is not a known encoding.
 */
Readable.fromWeb = function (
  readableStream,
  options = kEmptyObject,
) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "readableStream",
      "ReadableStream",
      readableStream,
    );
  }

  validateObject(options, "options");
  const { highWaterMark, encoding, objectMode = false, signal } = options;

  const encodingIsUnknown = encoding !== undefined &&
    !Buffer.isEncoding(encoding);
  if (encodingIsUnknown) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }
  validateBoolean(objectMode, "options.objectMode");

  const webReader = readableStream.getReader();
  // Flipped once `webReader.closed` settles, either way.
  let readerClosed = false;

  const nodeReadable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      webReader.read().then(
        (result) => {
          // `done` marks the end of the web stream: signal EOF with null.
          nodeReadable.push(result.done ? null : result.value);
        },
        (error) => destroy.call(nodeReadable, error),
      );
    },

    destroy(error, callback) {
      const finish = () => {
        try {
          callback(error);
        } catch (thrown) {
          // Re-throw on the next tick: we may be running inside a promise
          // reaction, and throwing there would only surface as an unhandled
          // rejection.
          process.nextTick(() => {
            throw thrown;
          });
        }
      };

      if (readerClosed) {
        finish();
        return;
      }

      // The reader is still open: cancel it first, then report back
      // regardless of whether the cancellation succeeded.
      webReader.cancel(error).then(finish, finish);
    },
  });

  webReader.closed.then(
    () => {
      readerClosed = true;
      if (isReadableEnded(nodeReadable)) {
        return;
      }
      nodeReadable.push(null);
    },
    (error) => {
      readerClosed = true;
      destroy.call(nodeReadable, error);
    },
  );

  return nodeReadable;
};
|
||
|
|
||
|
/**
 * Wraps a WHATWG `WritableStream` in a Node.js-style `Writable`.
 *
 * A writer lock is taken on `writableStream`; chunks written to the returned
 * stream are forwarded to that writer once `writer.ready` resolves.
 *
 * @param {WritableStream} writableStream Web stream to write into.
 * @param {object} [options] Checked with `validateObject`.
 * @param {number} [options.highWaterMark] Forwarded to the `Writable` constructor.
 * @param {boolean} [options.decodeStrings=true] When true (and not in object
 *   mode), string chunks are converted to `Uint8Array` before being written.
 * @param {boolean} [options.objectMode=false] Checked with `validateBoolean`.
 * @param {*} [options.signal] Forwarded to the `Writable` constructor.
 * @returns {Writable} The adapting stream.
 * @throws {ERR_INVALID_ARG_TYPE} When `writableStream` is not a `WritableStream`.
 */
Writable.fromWeb = function (
  writableStream,
  options = kEmptyObject,
) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "writableStream",
      "WritableStream",
      writableStream,
    );
  }

  validateObject(options, "options");
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, "options.objectMode");
  validateBoolean(decodeStrings, "options.decodeStrings");

  const writer = writableStream.getWriter();
  // Set once `writer.closed` settles (fulfilled or rejected).
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(result) {
        // `done` serves as both the fulfilment and the rejection handler.
        // Promise.all() fulfils with an array of per-chunk results, whereas
        // a rejection (of writer.ready or of any writer.write()) delivers a
        // single reason. Only the array form can be filtered; the original
        // code called `.filter` on both and threw on rejections.
        let failure = result;
        if (Array.isArray(result)) {
          const errors = result.filter((e) => e);
          failure = errors.length === 0 ? undefined : errors;
        }
        try {
          callback(failure);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }

      writer.ready.then(
        () =>
          Promise.all(
            chunks.map((data) => writer.write(data.chunk)),
          ).then(done, done),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        // Encode the string, then expose the same bytes as a plain
        // Uint8Array view for the web writer.
        chunk = Buffer.from(chunk, encoding);
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // Previously `destroy(this, duplex, error)`: `duplex` does not
          // exist in this scope. Destroy the adapter stream instead, using
          // the same `this`-bound form as the other handlers here.
          destroy.call(writable, error);
        }
      }

      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      if (!closed) {
        // Abort on error, close cleanly otherwise; report back either way.
        if (error != null) {
          writer.abort(error).then(done, done);
        } else {
          writer.close().then(done, done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(writable, error));
        }
      }

      // NOTE(review): when the writer has already closed, `callback` is
      // never invoked here; presumably the `writer.closed` handler below has
      // destroyed the stream by then - confirm.
      if (!closed) {
        writer.close().then(done, done);
      }
    },
  });

  writer.closed.then(
    () => {
      closed = true;
      // The web stream closed before the Node.js side was ended.
      if (!isWritableEnded(writable)) {
        destroy.call(writable, new ERR_STREAM_PREMATURE_CLOSE());
      }
    },
    (error) => {
      closed = true;
      destroy.call(writable, error);
    },
  );

  return writable;
};
|
||
|
|
||
|
/**
 * Wraps a WHATWG `{ readable, writable }` pair in a Node.js-style `Duplex`.
 *
 * Locks are taken on both web streams (`getWriter()` / `getReader()`); the
 * Duplex's writable side forwards to the writer and its readable side pulls
 * from the reader.
 *
 * NOTE(review): `destroy` is invoked as `destroy.call(duplex, error)`
 * throughout, on the assumption that the imported helper is the `this`-bound
 * `destroy(err, cb)` form, as the `destroy.call(...)` call sites in
 * `Readable.fromWeb` / `Writable.fromWeb` imply. The previous
 * `destroy(duplex, error)` calls passed the stream as the error argument.
 *
 * @param {{ readable: ReadableStream, writable: WritableStream }} pair
 * @param {object} [options] Checked with `validateObject`.
 * @param {boolean} [options.allowHalfOpen=false] Forwarded to the `Duplex` constructor.
 * @param {boolean} [options.objectMode=false] Checked with `validateBoolean`.
 * @param {string} [options.encoding] Must satisfy `Buffer.isEncoding` when given.
 * @param {boolean} [options.decodeStrings=true] When true (and not in object
 *   mode), string chunks are converted to `Uint8Array` before being written.
 * @param {number} [options.highWaterMark] Forwarded to the `Duplex` constructor.
 * @param {*} [options.signal] Forwarded to the `Duplex` constructor.
 * @returns {Duplex} The adapting stream.
 * @throws {ERR_INVALID_ARG_TYPE} When `pair.readable` or `pair.writable` has
 *   the wrong type.
 * @throws {ERR_INVALID_ARG_VALUE} When `options.encoding` is not a known encoding.
 */
Duplex.fromWeb = function (pair, options = kEmptyObject) {
  validateObject(pair, "pair");
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;

  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.readable",
      "ReadableStream",
      readableStream,
    );
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      "pair.writable",
      "WritableStream",
      writableStream,
    );
  }

  validateObject(options, "options");
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;

  validateBoolean(objectMode, "options.objectMode");
  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
  }

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  // Set once the corresponding `closed` promise settles.
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(result) {
        // `done` serves as both the fulfilment and the rejection handler.
        // Promise.all() fulfils with an array of per-chunk results, whereas
        // a rejection (of writer.ready or of any writer.write()) delivers a
        // single reason. Only the array form can be filtered; the original
        // code called `.filter` on both and threw on rejections.
        let failure = result;
        if (Array.isArray(result)) {
          const errors = result.filter((e) => e);
          failure = errors.length === 0 ? undefined : errors;
        }
        try {
          callback(failure);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(duplex, error));
        }
      }

      writer.ready.then(
        () =>
          Promise.all(
            chunks.map((data) => writer.write(data.chunk)),
          ).then(done, done),
        done,
      );
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === "string" && decodeStrings && !objectMode) {
        // Encode the string, then expose the same bytes as a plain
        // Uint8Array view for the web writer.
        chunk = Buffer.from(chunk, encoding);
        chunk = new Uint8Array(
          chunk.buffer,
          chunk.byteOffset,
          chunk.byteLength,
        );
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy.call(duplex, error);
        }
      }

      writer.ready.then(
        () => writer.write(chunk).then(done, done),
        done,
      );
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy.call(duplex, error));
        }
      }

      if (!writableClosed) {
        writer.close().then(done, done);
      }
    },

    read() {
      reader.read().then(
        (chunk) => {
          if (chunk.done) {
            // End of the web stream: signal EOF.
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy.call(duplex, error),
      );
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => {
            throw error;
          });
        }
      }

      // Abort the writer unless it has already closed.
      async function closeWriter() {
        if (!writableClosed) {
          await writer.abort(error);
        }
      }

      // Cancel the reader unless it has already closed.
      async function closeReader() {
        if (!readableClosed) {
          await reader.cancel(error);
        }
      }

      if (!writableClosed || !readableClosed) {
        // Tear down whichever side is still open, then report back
        // regardless of the outcome.
        Promise.all([
          closeWriter(),
          closeReader(),
        ]).then(done, done);
        return;
      }

      done();
    },
  });

  writer.closed.then(
    () => {
      writableClosed = true;
      // The web writable closed before the Node.js side was ended.
      if (!isWritableEnded(duplex)) {
        destroy.call(duplex, new ERR_STREAM_PREMATURE_CLOSE());
      }
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy.call(duplex, error);
    },
  );

  reader.closed.then(
    () => {
      readableClosed = true;
      if (!isReadableEnded(duplex)) {
        duplex.push(null);
      }
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy.call(duplex, error);
    },
  );

  return duplex;
};
|
||
|
|
||
|
// readable-stream attaches these to Readable, but Node.js core does not.
// Delete them here to better match Node.js core. These can be removed once
// https://github.com/nodejs/readable-stream/issues/485 is resolved.
for (
  const extraKey of [
    "Duplex",
    "PassThrough",
    "Readable",
    "Stream",
    "Transform",
    "Writable",
    "_isUint8Array",
    "_uint8ArrayToBuffer",
    "addAbortSignal",
    "compose",
    "destroy",
    "finished",
    "isDisturbed",
    "isErrored",
    "isReadable",
    "pipeline",
  ]
) {
  delete Readable[extraKey];
}
|
||
|
|
||
|
// The following code implements Readable.toWeb(), Writable.toWeb(), and
|
||
|
// Duplex.toWeb(). These functions are not properly implemented in the
|
||
|
// readable-stream module yet. This can be removed once the following upstream
|
||
|
// issue is resolved: https://github.com/nodejs/readable-stream/issues/482
|
||
|
/**
 * Creates a WHATWG `ReadableStream` fed by a Node.js-style readable stream.
 *
 * The Node.js stream is kept paused; it is resumed from the web stream's
 * `pull()` and paused again from the "data" handler once the controller's
 * `desiredSize` drops to zero or below.
 *
 * @param {object} streamReadable Object carrying a `_readableState`.
 * @param {object} [options]
 * @param {object} [options.strategy] Queuing strategy passed to the
 *   `ReadableStream` constructor. When absent, a `CountQueuingStrategy`
 *   (object mode) or a bare `{ highWaterMark }` is derived from the stream.
 * @returns {ReadableStream} A cancelled stream when `streamReadable` is
 *   destroyed or not readable, otherwise the adapting stream.
 * @throws {ERR_INVALID_ARG_TYPE} When `streamReadable._readableState` is not
 *   an object.
 */
function newReadableStreamFromStreamReadable(
  streamReadable,
  options = kEmptyObject,
) {
  // Not using the internal/streams/utils isReadableNodeStream utility
  // here because it will return false if streamReadable is a Duplex
  // whose readable option is false. For a Duplex that is not readable,
  // we want it to pass this check but return a closed ReadableStream.
  if (typeof streamReadable?._readableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamReadable",
      "stream.Readable",
      streamReadable,
    );
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const readable = new ReadableStream();
    readable.cancel();
    return readable;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  const evaluateStrategyOrFallback = (strategy) => {
    // If there is a strategy available, use it
    if (strategy) {
      return strategy;
    }

    if (objectMode) {
      // When running in objectMode explicitly but no strategy, we just fall
      // back to CountQueuingStrategy
      return new CountQueuingStrategy({ highWaterMark });
    }

    // When not running in objectMode explicitly, we just fall
    // back to a minimal strategy that just specifies the highWaterMark
    // and no size algorithm. Using a ByteLengthQueuingStrategy here
    // is unnecessary.
    return { highWaterMark };
  };

  const strategy = evaluateStrategyOrFallback(options?.strategy);

  // Assigned by `start()` when the ReadableStream below is constructed.
  let controller;

  function onData(chunk) {
    // Copy the Buffer to detach it from the pool.
    if (Buffer.isBuffer(chunk) && !objectMode) {
      chunk = new Uint8Array(chunk);
    }
    controller.enqueue(chunk);
    // Queue is full: stop the source until the consumer pulls again.
    if (controller.desiredSize <= 0) {
      streamReadable.pause();
    }
  }

  streamReadable.pause();

  const cleanup = finished(streamReadable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamReadable.on("error", () => {});
    if (error) {
      return controller.error(error);
    }
    controller.close();
  });

  streamReadable.on("data", onData);

  return new ReadableStream({
    start(c) {
      controller = c;
    },

    pull() {
      streamReadable.resume();
    },

    cancel(reason) {
      // NOTE(review): assumes the imported `destroy` is the `this`-bound
      // `destroy(err, cb)` form, as the `destroy.call(...)` call sites
      // elsewhere in this file imply. The previous `destroy(stream, reason)`
      // call passed the stream as the error argument.
      destroy.call(streamReadable, reason);
    },
  }, strategy);
}
|
||
|
|
||
|
/**
 * Creates a WHATWG `WritableStream` that writes into a Node.js-style
 * writable stream.
 *
 * Backpressure is bridged with a deferred promise: when the Node.js stream
 * needs draining, `write()` returns a promise that the "drain" handler
 * resolves (or that the `finished` callback rejects on error).
 *
 * @param {object} streamWritable Object carrying a `_writableState`.
 * @returns {WritableStream} A closed stream when `streamWritable` is
 *   destroyed or not writable, otherwise the adapting stream.
 * @throws {ERR_INVALID_ARG_TYPE} When `streamWritable._writableState` is not
 *   an object.
 */
function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  if (typeof streamWritable?._writableState !== "object") {
    throw new ERR_INVALID_ARG_TYPE(
      "streamWritable",
      "stream.Writable",
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy = streamWritable.writableObjectMode
    ? new CountQueuingStrategy({ highWaterMark })
    : { highWaterMark };

  // Assigned by `start()`; reset to undefined once the stream is
  // closed or errored.
  let controller;
  // Deferred promise pending while a write waits for "drain".
  let backpressurePromise;
  // Deferred promise pending while `close()` waits for the stream to finish.
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined) {
      backpressurePromise.resolve();
    }
  }

  const cleanup = finished(streamWritable, (error) => {
    if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on("error", () => {});
    if (error != null) {
      if (backpressurePromise !== undefined) {
        backpressurePromise.reject(error);
      }
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      // `close()` may already have cleared the controller, so guard the
      // dereference instead of throwing a TypeError from this callback.
      controller?.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    // The Node.js stream finished without `close()` having been requested.
    // Same guard as above: `close()` may have cleared the controller.
    controller?.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on("drain", onDrain);

  return new WritableStream({
    start(c) {
      controller = c;
    },

    async write(chunk) {
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        // Hold this write open until "drain" fires or the stream fails.
        backpressurePromise = createDeferredPromise();
        return backpressurePromise.promise.finally(() => {
          backpressurePromise = undefined;
        });
      }
    },

    abort(reason) {
      // NOTE(review): assumes the imported `destroy` is the `this`-bound
      // `destroy(err, cb)` form, as the `destroy.call(...)` call sites
      // elsewhere in this file imply. The previous `destroy(stream, reason)`
      // call passed the stream as the error argument.
      destroy.call(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = createDeferredPromise();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return Promise.resolve();
    },
  }, strategy);
}
|
||
|
|
||
|
/**
 * Builds a WHATWG `{ writable, readable }` pair backed by a Node.js-style
 * duplex stream.
 *
 * A side that is not writable / not readable (or both sides, when the duplex
 * is already destroyed) is represented by a fresh web stream that is
 * immediately closed / cancelled.
 *
 * @param {object} duplex Object carrying both `_writableState` and
 *   `_readableState`.
 * @returns {{ writable: WritableStream, readable: ReadableStream }}
 * @throws {ERR_INVALID_ARG_TYPE} When either state property is not an object.
 */
function newReadableWritablePairFromDuplex(duplex) {
  // The isWritableNodeStream / isReadableNodeStream helpers from
  // internal/streams/utils are deliberately avoided: they report false for a
  // duplex created with `writable: false` or `readable: false`. Only the
  // presence of both state objects is required here; each side's actual
  // usability is examined further down.
  const looksLikeDuplex = typeof duplex?._writableState === "object" &&
    typeof duplex?._readableState === "object";
  if (!looksLikeDuplex) {
    throw new ERR_INVALID_ARG_TYPE("duplex", "stream.Duplex", duplex);
  }

  if (isDestroyed(duplex)) {
    const writable = new WritableStream();
    const readable = new ReadableStream();
    writable.close();
    readable.cancel();
    return { readable, writable };
  }

  let writable;
  if (isWritable(duplex)) {
    writable = newWritableStreamFromStreamWritable(duplex);
  } else {
    // Non-writable side: hand back an already-closed placeholder.
    writable = new WritableStream();
    writable.close();
  }

  let readable;
  if (isReadable(duplex)) {
    readable = newReadableStreamFromStreamReadable(duplex);
  } else {
    // Non-readable side: hand back an already-cancelled placeholder.
    readable = new ReadableStream();
    readable.cancel();
  }

  return { writable, readable };
}
|
||
|
|
||
|
// Expose the Node.js -> web stream adapters as static `toWeb()` helpers on
// the corresponding stream constructors.
Duplex.toWeb = newReadableWritablePairFromDuplex;
Writable.toWeb = newWritableStreamFromStreamWritable;
Readable.toWeb = newReadableStreamFromStreamReadable;
|