2023-02-14 11:38:45 -05:00
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// deno-fmt-ignore-file
// deno-lint-ignore-file
refactor(core): include_js_files! 'dir' option doesn't change specifiers (#18019)
This commit changes "include_js_files!" macro from "deno_core"
in a way that "dir" option doesn't cause specifiers to be rewritten
to include it.
Example:
```
include_js_files! {
dir "js",
"hello.js",
}
```
The above definition required embedders to use:
`import ... from "internal:<ext_name>/js/hello.js"`.
But with this change, the "js" directory in which the files are stored
is an implementation detail, which for embedders results in:
`import ... from "internal:<ext_name>/hello.js"`.
The directory the files are stored in, is an implementation detail and
in some cases might result in a significant size difference for the
snapshot. As an example, in "deno_node" extension, we store the
source code in "polyfills" directory; which resulted in each specifier
to look like "internal:deno_node/polyfills/<module_name>", but with
this change it's "internal:deno_node/<module_name>".
Given that "deno_node" has over 100 files, many of them having
several import specifiers to the same extension, this change removes
10 characters from each import specifier.
2023-03-04 21:31:38 -05:00
import { nextTick } from "internal:deno_node/_next_tick.ts" ;
2023-02-22 09:30:58 -05:00
import { AbortController } from "internal:deno_web/03_abort_signal.js" ;
import { Blob } from "internal:deno_web/09_file.js" ;
2023-02-14 11:38:45 -05:00
/* esm.sh - esbuild bundle(readable-stream@4.2.0) es2022 production */
refactor(core): include_js_files! 'dir' option doesn't change specifiers (#18019)
This commit changes "include_js_files!" macro from "deno_core"
in a way that "dir" option doesn't cause specifiers to be rewritten
to include it.
Example:
```
include_js_files! {
dir "js",
"hello.js",
}
```
The above definition required embedders to use:
`import ... from "internal:<ext_name>/js/hello.js"`.
But with this change, the "js" directory in which the files are stored
is an implementation detail, which for embedders results in:
`import ... from "internal:<ext_name>/hello.js"`.
The directory the files are stored in, is an implementation detail and
in some cases might result in a significant size difference for the
snapshot. As an example, in "deno_node" extension, we store the
source code in "polyfills" directory; which resulted in each specifier
to look like "internal:deno_node/polyfills/<module_name>", but with
this change it's "internal:deno_node/<module_name>".
Given that "deno_node" has over 100 files, many of them having
several import specifiers to the same extension, this change removes
10 characters from each import specifier.
2023-03-04 21:31:38 -05:00
const _ _process$ = { nextTick } ; import _ _buffer$ from "internal:deno_node/buffer.ts" ; import _ _string _decoder$ from "internal:deno_node/string_decoder.ts" ; import _ _events$ from "internal:deno_node/events.ts" ; var pi = Object . create ; var Bt = Object . defineProperty ; var wi = Object . getOwnPropertyDescriptor ; var yi = Object . getOwnPropertyNames ; var gi = Object . getPrototypeOf , Si = Object . prototype . hasOwnProperty ; var E = ( e => typeof require < "u" ? require : typeof Proxy < "u" ? new Proxy ( e , { get : ( t , n ) => ( typeof require < "u" ? require : t ) [ n ] } ) : e ) ( function ( e ) { if ( typeof require < "u" ) return require . apply ( this , arguments ) ; throw new Error ( 'Dynamic require of "' + e + '" is not supported' ) } ) ; var g = ( e , t ) => ( ) => ( t || e ( ( t = { exports : { } } ) . exports , t ) , t . exports ) ; var Ei = ( e , t , n , r ) => { if ( t && typeof t == "object" || typeof t == "function" ) for ( let i of yi ( t ) ) ! Si . call ( e , i ) && i !== n && Bt ( e , i , { get : ( ) => t [ i ] , enumerable : ! ( r = wi ( t , i ) ) || r . enumerable } ) ; return e } ; var Ri = ( e , t , n ) => ( n = e != null ? pi ( gi ( e ) ) : { } , Ei ( t || ! e || ! e . _ _esModule ? Bt ( n , "default" , { value : e , enumerable : ! 0 } ) : n , e ) ) ; var m = g ( ( Yf , Gt ) => { "use strict" ; Gt . exports = { ArrayIsArray ( e ) { return Array . isArray ( e ) } , ArrayPrototypeIncludes ( e , t ) { return e . includes ( t ) } , ArrayPrototypeIndexOf ( e , t ) { return e . indexOf ( t ) } , ArrayPrototypeJoin ( e , t ) { return e . join ( t ) } , ArrayPrototypeMap ( e , t ) { return e . map ( t ) } , ArrayPrototypePop ( e , t ) { return e . pop ( t ) } , ArrayPrototypePush ( e , t ) { return e . push ( t ) } , ArrayPrototypeSlice ( e , t , n ) { return e . slice ( t , n ) } , Error , FunctionPrototypeCall ( e , t , ... n ) { return e . call ( t , ... n ) } , FunctionPrototypeSymbolHasInstance ( e , t ) { return Function . prototype [ Symbol . hasInstance ] . call ( e , t ) } , MathFloor : Math . floor , Number , NumberIsInteger : Number . isInteger , NumberIsNaN : Number . isNaN , NumberMAX _SAFE _INTEGER : Number . MAX _SAFE _INTEGER , NumberMIN _SAFE _INTEGER : Number . MIN _SAFE _INTEGER , NumberParseInt : Number . parseInt , ObjectDefineProperties ( e , t ) { return Object . defineProperties ( e , t ) } , ObjectDefineProperty ( e , t , n ) { return Object . defineProperty ( e , t , n ) } , ObjectGetOwnPropertyDescriptor ( e , t ) { return Object . getOwnPropertyDescriptor ( e , t ) } , ObjectKeys ( e ) { return Object . keys ( e ) } , ObjectSetPrototypeOf ( e , t ) { return Object . setPrototypeOf ( e , t ) } , Promise , PromisePrototypeCatch ( e , t ) { return e . catch ( t ) } , PromisePrototypeThen ( e , t , n ) { return e . then ( t , n ) } , PromiseReject ( e ) { return Promise . reject ( e ) } , ReflectApply : Reflect . apply , RegExpPrototypeTest ( e , t ) { return e . test ( t ) } , SafeSet : Set , String , StringPrototypeSlice ( e , t , n ) { return e . slice ( t , n ) } , StringPrototypeToLowerCase ( e ) { return e . toLowerCase ( ) } , StringPrototypeToUpperCase ( e ) { return e . toUpperCase ( ) } , StringPrototypeTrim ( e ) { return e . trim ( ) } , Symbol , SymbolAsyncIterator : Symbol . asyncIterator , SymbolHasInstance : Symbol . hasInstance , SymbolIterator : Symbol . iterator , TypedArrayPrototypeSet ( e , t , n ) { return e . set ( t , n ) } , Uint8Array } } ) ; var j = g ( ( Kf , Je ) => { "use strict" ; var Ai = _ _buffer$ , mi = Object . getPrototypeOf ( async function ( ) { } ) . constructor , Ht = Blob || Ai . Blob , Ti = typeof Ht < "u" ? function ( t ) { return t instanceof Ht } : function ( t ) { return ! 1 } , Xe = class extends Error { constructor ( t ) { if ( ! Array . isArray ( t ) ) throw new TypeError ( ` Expected input to be an Array, got ${ typeof t } ` ) ; let n = "" ; for ( let r = 0 ; r < t . length ; r ++ ) n += ` ${ t [ r ] . stack }
2023-02-22 09:30:58 -05:00
` ;super(n),this.name="AggregateError",this.errors=t}};Je.exports={AggregateError:Xe,kEmptyObject:Object.freeze({}),once(e){let t=!1;return function(...n){t||(t=!0,e.apply(this,n))}},createDeferredPromise:function(){let e,t;return{promise:new Promise((r,i)=>{e=r,t=i}),resolve:e,reject:t}},promisify(e){return new Promise((t,n)=>{e((r,...i)=>r?n(r):t(...i))})},debuglog(){return function(){}},format(e,...t){return e.replace(/%([sdifj])/g,function(...[n,r]){let i=t.shift();return r==="f"?i.toFixed(6):r==="j"?JSON.stringify(i):r==="s"&&typeof i=="object"? ` $ { i . constructor !== Object ? i . constructor . name : "" } { } ` .trim():i.toString()})},inspect(e){switch(typeof e){case"string":if(e.includes("'"))if(e.includes('"')){if(!e.includes(" ` ")&&!e.includes(" $ { "))return`\`${e}\``}else return`" $ { e } "`;return`'${e}'`;case" number ":return isNaN(e)?" NaN ":Object.is(e,-0)?String(e):e;case" bigint ":return`${String(e)}n`;case" boolean ":case" undefined ":return String(e);case" object ":return" { } "}},types:{isAsyncFunction(e){return e instanceof mi},isArrayBufferView(e){return ArrayBuffer.isView(e)}},isBlob:Ti};Je.exports.promisify.custom=Symbol.for(" nodejs . util . promisify . custom ")});var O=g((zf,Kt)=>{" use strict ";var{format:Ii,inspect:Re,AggregateError:Mi}=j(),Ni=globalThis.AggregateError||Mi,Di=Symbol(" kIsNodeError "),Oi=[" string "," function "," number "," object "," Function "," Object "," boolean "," bigint "," symbol "],qi=/^([A-Z][a-z0-9]*)+$/,xi=" _ _node _internal _ ",Ae={};function X(e,t){if(!e)throw new Ae.ERR_INTERNAL_ASSERTION(t)}function Vt(e){let t=" ",n=e.length,r=e[0]===" - "?1:0;for(;n>=r+4;n-=3)t=`_${e.slice(n-3,n)}${t}`;return`${e.slice(0,n)}${t}`}function Li(e,t,n){if(typeof t==" function ")return X(t.length<=n.length,`Code: ${e}; The provided arguments length (${n.length}) does not match the required ones (${t.length}).`),t(...n);let r=(t.match(/%[dfijoOs]/g)||[]).length;return X(r===n.length,`Code: ${e}; The provided arguments length (${n.length}) does not match the required ones (${r}).`),n.length===0?t:Ii(t,...n)}function N(e,t,n){n||(n=Error);class r extends n{constructor(...o){super(Li(e,t,o))}toString(){return`${this.name} [${e}]: ${this.message}`}}Object.defineProperties(r.prototype,{name:{value:n.name,writable:!0,enumerable:!1,configurable:!0},toString:{value(){return`${this.name} [${e}]: ${this.message}`},writable:!0,enumerable:!1,configurable:!0}}),r.prototype.code=e,r.prototype[Di]=!0,Ae[e]=r}function Yt(e){let t=xi+e.name;return Object.defineProperty(e," name ",{value:t}),e}function Pi(e,t){if(e&&t&&e!==t){if(Array.isArray(t.errors))return t.errors.push(e),t;let n=new Ni([t,e],t.message);return n.code=t.code,n}return e||t}var Qe=class extends Error{constructor(t=" The operation was aborted ",n=void 0){if(n!==void 0&&typeof n!=" object ")throw new Ae.ERR_INVALID_ARG_TYPE(" options "," Object ",n);super(t,n),this.code=" ABORT _ERR ",this.name=" AbortError "}};N(" ERR _ASSERTION "," % s ",Error);N(" ERR _INVALID _ARG _TYPE ",(e,t,n)=>{X(typeof e==" string "," 'name' must be a string "),Array.isArray(t)||(t=[t]);let r=" The ";e.endsWith(" argument ")?r+=`${e} `:r+=`" $ { e } " ${e.includes(" . ")?" property ":" argument "} `,r+=" must be ";let i=[],o=[],l=[];for(let f of t)X(typeof f==" string "," All expected entries have to be of type string "),Oi.includes(f)?i.push(f.toLowerCase()):qi.test(f)?o.push(f):(X(f!==" object ",'The value " object " should be written as " Object "'),l.push(f));if(o.length>0){let f=i.indexOf(" object ");f!==-1&&(i.splice(i,f,1),o.push(" Object "))}if(i.length>0){switch(i.length){case 1:r+=`of type ${i[0]}`;break;case 2:r+=`one of type ${i[0]} or ${i[1]}`;break;default:{let f=i.pop();r+=`one of type ${i.join(" , ")}, or ${f}`}}(o.length>0||l.length>0)&&(r+=" or ")}if(o.length>0){switch(o.length){case 1:r+=`an instance of ${o[0]}`;break;case 2:r+=`an instance of ${o[0]} or ${o[1]}`;break;default:{let f=o.pop();r+=`an instance of ${o.join(" , ")}, or ${f}`}}l.length>0&&(r+=" or ")}switch(l.length){case 0:break;case 1:l[0].toLowerCase()!==l[0]&&(r+=" an "),r+=`${l[0]}`;break;case 2:r+=`one of ${l[0]} or ${l[1]}`;break;default:{let f=l.pop();r+=`one of ${l.join(" , " ) } , or $ { f } ` }}if(n==null)r+= ` . Re
2023-02-14 11:38:45 -05:00
/* End esm.sh bundle */
// The following code implements Readable.fromWeb(), Writable.fromWeb(), and
// Duplex.fromWeb(). These functions are not properly implemented in the
// readable-stream module yet. This can be removed once the following upstream
// issue is resolved: https://github.com/nodejs/readable-stream/issues/482
import {
AbortError ,
ERR _INVALID _ARG _TYPE ,
ERR _INVALID _ARG _VALUE ,
ERR _STREAM _PREMATURE _CLOSE ,
refactor(core): include_js_files! 'dir' option doesn't change specifiers (#18019)
This commit changes "include_js_files!" macro from "deno_core"
in a way that "dir" option doesn't cause specifiers to be rewritten
to include it.
Example:
```
include_js_files! {
dir "js",
"hello.js",
}
```
The above definition required embedders to use:
`import ... from "internal:<ext_name>/js/hello.js"`.
But with this change, the "js" directory in which the files are stored
is an implementation detail, which for embedders results in:
`import ... from "internal:<ext_name>/hello.js"`.
The directory the files are stored in, is an implementation detail and
in some cases might result in a significant size difference for the
snapshot. As an example, in "deno_node" extension, we store the
source code in "polyfills" directory; which resulted in each specifier
to look like "internal:deno_node/polyfills/<module_name>", but with
this change it's "internal:deno_node/<module_name>".
Given that "deno_node" has over 100 files, many of them having
several import specifiers to the same extension, this change removes
10 characters from each import specifier.
2023-03-04 21:31:38 -05:00
} from "internal:deno_node/internal/errors.ts" ;
import { destroy } from "internal:deno_node/internal/streams/destroy.mjs" ;
import finished from "internal:deno_node/internal/streams/end-of-stream.mjs" ;
2023-02-14 11:38:45 -05:00
import {
isDestroyed ,
isReadable ,
isReadableEnded ,
isWritable ,
isWritableEnded ,
refactor(core): include_js_files! 'dir' option doesn't change specifiers (#18019)
This commit changes "include_js_files!" macro from "deno_core"
in a way that "dir" option doesn't cause specifiers to be rewritten
to include it.
Example:
```
include_js_files! {
dir "js",
"hello.js",
}
```
The above definition required embedders to use:
`import ... from "internal:<ext_name>/js/hello.js"`.
But with this change, the "js" directory in which the files are stored
is an implementation detail, which for embedders results in:
`import ... from "internal:<ext_name>/hello.js"`.
The directory the files are stored in, is an implementation detail and
in some cases might result in a significant size difference for the
snapshot. As an example, in "deno_node" extension, we store the
source code in "polyfills" directory; which resulted in each specifier
to look like "internal:deno_node/polyfills/<module_name>", but with
this change it's "internal:deno_node/<module_name>".
Given that "deno_node" has over 100 files, many of them having
several import specifiers to the same extension, this change removes
10 characters from each import specifier.
2023-03-04 21:31:38 -05:00
} from "internal:deno_node/internal/streams/utils.mjs" ;
import { createDeferredPromise , kEmptyObject } from "internal:deno_node/internal/util.mjs" ;
import { validateBoolean , validateObject } from "internal:deno_node/internal/validators.mjs" ;
2023-02-14 11:38:45 -05:00
const process = _ _process$ ;
const { Buffer } = _ _buffer$ ;
const Readable = Au ;
const Writable = mu ;
const Duplex = Tu ;
function isReadableStream ( object ) {
return object instanceof ReadableStream ;
}
function isWritableStream ( object ) {
return object instanceof WritableStream ;
}
Readable . fromWeb = function (
readableStream ,
options = kEmptyObject ,
) {
if ( ! isReadableStream ( readableStream ) ) {
throw new ERR _INVALID _ARG _TYPE (
"readableStream" ,
"ReadableStream" ,
readableStream ,
) ;
}
validateObject ( options , "options" ) ;
const {
highWaterMark ,
encoding ,
objectMode = false ,
signal ,
} = options ;
if ( encoding !== undefined && ! Buffer . isEncoding ( encoding ) ) {
throw new ERR _INVALID _ARG _VALUE ( encoding , "options.encoding" ) ;
}
validateBoolean ( objectMode , "options.objectMode" ) ;
const reader = readableStream . getReader ( ) ;
let closed = false ;
const readable = new Readable ( {
objectMode ,
highWaterMark ,
encoding ,
signal ,
read ( ) {
reader . read ( ) . then (
( chunk ) => {
if ( chunk . done ) {
readable . push ( null ) ;
} else {
readable . push ( chunk . value ) ;
}
} ,
( error ) => destroy . call ( readable , error ) ,
) ;
} ,
destroy ( error , callback ) {
function done ( ) {
try {
callback ( error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => {
throw error ;
} ) ;
}
}
if ( ! closed ) {
reader . cancel ( error ) . then ( done , done ) ;
return ;
}
done ( ) ;
} ,
} ) ;
reader . closed . then (
( ) => {
closed = true ;
if ( ! isReadableEnded ( readable ) ) {
readable . push ( null ) ;
}
} ,
( error ) => {
closed = true ;
destroy . call ( readable , error ) ;
} ,
) ;
return readable ;
} ;
Writable . fromWeb = function (
writableStream ,
options = kEmptyObject ,
) {
if ( ! isWritableStream ( writableStream ) ) {
throw new ERR _INVALID _ARG _TYPE (
"writableStream" ,
"WritableStream" ,
writableStream ,
) ;
}
validateObject ( options , "options" ) ;
const {
highWaterMark ,
decodeStrings = true ,
objectMode = false ,
signal ,
} = options ;
validateBoolean ( objectMode , "options.objectMode" ) ;
validateBoolean ( decodeStrings , "options.decodeStrings" ) ;
const writer = writableStream . getWriter ( ) ;
let closed = false ;
const writable = new Writable ( {
highWaterMark ,
objectMode ,
decodeStrings ,
signal ,
writev ( chunks , callback ) {
function done ( error ) {
error = error . filter ( ( e ) => e ) ;
try {
callback ( error . length === 0 ? undefined : error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => destroy . call ( writable , error ) ) ;
}
}
writer . ready . then (
( ) =>
Promise . all (
chunks . map ( ( data ) => writer . write ( data . chunk ) ) ,
) . then ( done , done ) ,
done ,
) ;
} ,
write ( chunk , encoding , callback ) {
if ( typeof chunk === "string" && decodeStrings && ! objectMode ) {
chunk = Buffer . from ( chunk , encoding ) ;
chunk = new Uint8Array (
chunk . buffer ,
chunk . byteOffset ,
chunk . byteLength ,
) ;
}
function done ( error ) {
try {
callback ( error ) ;
} catch ( error ) {
destroy ( this , duplex , error ) ;
}
}
writer . ready . then (
( ) => writer . write ( chunk ) . then ( done , done ) ,
done ,
) ;
} ,
destroy ( error , callback ) {
function done ( ) {
try {
callback ( error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => {
throw error ;
} ) ;
}
}
if ( ! closed ) {
if ( error != null ) {
writer . abort ( error ) . then ( done , done ) ;
} else {
writer . close ( ) . then ( done , done ) ;
}
return ;
}
done ( ) ;
} ,
final ( callback ) {
function done ( error ) {
try {
callback ( error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => destroy . call ( writable , error ) ) ;
}
}
if ( ! closed ) {
writer . close ( ) . then ( done , done ) ;
}
} ,
} ) ;
writer . closed . then (
( ) => {
closed = true ;
if ( ! isWritableEnded ( writable ) ) {
destroy . call ( writable , new ERR _STREAM _PREMATURE _CLOSE ( ) ) ;
}
} ,
( error ) => {
closed = true ;
destroy . call ( writable , error ) ;
} ,
) ;
return writable ;
} ;
Duplex . fromWeb = function ( pair , options = kEmptyObject ) {
validateObject ( pair , "pair" ) ;
const {
readable : readableStream ,
writable : writableStream ,
} = pair ;
if ( ! isReadableStream ( readableStream ) ) {
throw new ERR _INVALID _ARG _TYPE (
"pair.readable" ,
"ReadableStream" ,
readableStream ,
) ;
}
if ( ! isWritableStream ( writableStream ) ) {
throw new ERR _INVALID _ARG _TYPE (
"pair.writable" ,
"WritableStream" ,
writableStream ,
) ;
}
validateObject ( options , "options" ) ;
const {
allowHalfOpen = false ,
objectMode = false ,
encoding ,
decodeStrings = true ,
highWaterMark ,
signal ,
} = options ;
validateBoolean ( objectMode , "options.objectMode" ) ;
if ( encoding !== undefined && ! Buffer . isEncoding ( encoding ) ) {
throw new ERR _INVALID _ARG _VALUE ( encoding , "options.encoding" ) ;
}
const writer = writableStream . getWriter ( ) ;
const reader = readableStream . getReader ( ) ;
let writableClosed = false ;
let readableClosed = false ;
const duplex = new Duplex ( {
allowHalfOpen ,
highWaterMark ,
objectMode ,
encoding ,
decodeStrings ,
signal ,
writev ( chunks , callback ) {
function done ( error ) {
error = error . filter ( ( e ) => e ) ;
try {
callback ( error . length === 0 ? undefined : error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => destroy ( duplex , error ) ) ;
}
}
writer . ready . then (
( ) =>
Promise . all (
chunks . map ( ( data ) => writer . write ( data . chunk ) ) ,
) . then ( done , done ) ,
done ,
) ;
} ,
write ( chunk , encoding , callback ) {
if ( typeof chunk === "string" && decodeStrings && ! objectMode ) {
chunk = Buffer . from ( chunk , encoding ) ;
chunk = new Uint8Array (
chunk . buffer ,
chunk . byteOffset ,
chunk . byteLength ,
) ;
}
function done ( error ) {
try {
callback ( error ) ;
} catch ( error ) {
destroy ( duplex , error ) ;
}
}
writer . ready . then (
( ) => writer . write ( chunk ) . then ( done , done ) ,
done ,
) ;
} ,
final ( callback ) {
function done ( error ) {
try {
callback ( error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => destroy ( duplex , error ) ) ;
}
}
if ( ! writableClosed ) {
writer . close ( ) . then ( done , done ) ;
}
} ,
read ( ) {
reader . read ( ) . then (
( chunk ) => {
if ( chunk . done ) {
duplex . push ( null ) ;
} else {
duplex . push ( chunk . value ) ;
}
} ,
( error ) => destroy ( duplex , error ) ,
) ;
} ,
destroy ( error , callback ) {
function done ( ) {
try {
callback ( error ) ;
} catch ( error ) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process . nextTick ( ( ) => {
throw error ;
} ) ;
}
}
async function closeWriter ( ) {
if ( ! writableClosed ) {
await writer . abort ( error ) ;
}
}
async function closeReader ( ) {
if ( ! readableClosed ) {
await reader . cancel ( error ) ;
}
}
if ( ! writableClosed || ! readableClosed ) {
Promise . all ( [
closeWriter ( ) ,
closeReader ( ) ,
] ) . then ( done , done ) ;
return ;
}
done ( ) ;
} ,
} ) ;
writer . closed . then (
( ) => {
writableClosed = true ;
if ( ! isWritableEnded ( duplex ) ) {
destroy ( duplex , new ERR _STREAM _PREMATURE _CLOSE ( ) ) ;
}
} ,
( error ) => {
writableClosed = true ;
readableClosed = true ;
destroy ( duplex , error ) ;
} ,
) ;
reader . closed . then (
( ) => {
readableClosed = true ;
if ( ! isReadableEnded ( duplex ) ) {
duplex . push ( null ) ;
}
} ,
( error ) => {
writableClosed = true ;
readableClosed = true ;
destroy ( duplex , error ) ;
} ,
) ;
return duplex ;
} ;
// readable-stream attaches these to Readable, but Node.js core does not.
// Delete them here to better match Node.js core. These can be removed once
// https://github.com/nodejs/readable-stream/issues/485 is resolved.
delete Readable . Duplex ;
delete Readable . PassThrough ;
delete Readable . Readable ;
delete Readable . Stream ;
delete Readable . Transform ;
delete Readable . Writable ;
delete Readable . _isUint8Array ;
delete Readable . _uint8ArrayToBuffer ;
delete Readable . addAbortSignal ;
delete Readable . compose ;
delete Readable . destroy ;
delete Readable . finished ;
delete Readable . isDisturbed ;
delete Readable . isErrored ;
delete Readable . isReadable ;
delete Readable . pipeline ;
// The following code implements Readable.toWeb(), Writable.toWeb(), and
// Duplex.toWeb(). These functions are not properly implemented in the
// readable-stream module yet. This can be removed once the following upstream
// issue is resolved: https://github.com/nodejs/readable-stream/issues/482
function newReadableStreamFromStreamReadable (
streamReadable ,
options = kEmptyObject ,
) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
// we want it to pass this check but return a closed ReadableStream.
if ( typeof streamReadable ? . _readableState !== "object" ) {
throw new ERR _INVALID _ARG _TYPE (
"streamReadable" ,
"stream.Readable" ,
streamReadable ,
) ;
}
if ( isDestroyed ( streamReadable ) || ! isReadable ( streamReadable ) ) {
const readable = new ReadableStream ( ) ;
readable . cancel ( ) ;
return readable ;
}
const objectMode = streamReadable . readableObjectMode ;
const highWaterMark = streamReadable . readableHighWaterMark ;
const evaluateStrategyOrFallback = ( strategy ) => {
// If there is a strategy available, use it
if ( strategy ) {
return strategy ;
}
if ( objectMode ) {
// When running in objectMode explicitly but no strategy, we just fall
// back to CountQueuingStrategy
return new CountQueuingStrategy ( { highWaterMark } ) ;
}
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
return { highWaterMark } ;
} ;
const strategy = evaluateStrategyOrFallback ( options ? . strategy ) ;
let controller ;
function onData ( chunk ) {
// Copy the Buffer to detach it from the pool.
if ( Buffer . isBuffer ( chunk ) && ! objectMode ) {
chunk = new Uint8Array ( chunk ) ;
}
controller . enqueue ( chunk ) ;
if ( controller . desiredSize <= 0 ) {
streamReadable . pause ( ) ;
}
}
streamReadable . pause ( ) ;
const cleanup = finished ( streamReadable , ( error ) => {
if ( error ? . code === "ERR_STREAM_PREMATURE_CLOSE" ) {
const err = new AbortError ( undefined , { cause : error } ) ;
error = err ;
}
cleanup ( ) ;
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamReadable . on ( "error" , ( ) => { } ) ;
if ( error ) {
return controller . error ( error ) ;
}
controller . close ( ) ;
} ) ;
streamReadable . on ( "data" , onData ) ;
return new ReadableStream ( {
start ( c ) {
controller = c ;
} ,
pull ( ) {
streamReadable . resume ( ) ;
} ,
cancel ( reason ) {
destroy ( streamReadable , reason ) ;
} ,
} , strategy ) ;
}
function newWritableStreamFromStreamWritable ( streamWritable ) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
// we want it to pass this check but return a closed WritableStream.
if ( typeof streamWritable ? . _writableState !== "object" ) {
throw new ERR _INVALID _ARG _TYPE (
"streamWritable" ,
"stream.Writable" ,
streamWritable ,
) ;
}
if ( isDestroyed ( streamWritable ) || ! isWritable ( streamWritable ) ) {
const writable = new WritableStream ( ) ;
writable . close ( ) ;
return writable ;
}
const highWaterMark = streamWritable . writableHighWaterMark ;
const strategy = streamWritable . writableObjectMode
? new CountQueuingStrategy ( { highWaterMark } )
: { highWaterMark } ;
let controller ;
let backpressurePromise ;
let closed ;
function onDrain ( ) {
if ( backpressurePromise !== undefined ) {
backpressurePromise . resolve ( ) ;
}
}
const cleanup = finished ( streamWritable , ( error ) => {
if ( error ? . code === "ERR_STREAM_PREMATURE_CLOSE" ) {
const err = new AbortError ( undefined , { cause : error } ) ;
error = err ;
}
cleanup ( ) ;
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamWritable . on ( "error" , ( ) => { } ) ;
if ( error != null ) {
if ( backpressurePromise !== undefined ) {
backpressurePromise . reject ( error ) ;
}
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
if ( closed !== undefined ) {
closed . reject ( error ) ;
closed = undefined ;
}
controller . error ( error ) ;
controller = undefined ;
return ;
}
if ( closed !== undefined ) {
closed . resolve ( ) ;
closed = undefined ;
return ;
}
controller . error ( new AbortError ( ) ) ;
controller = undefined ;
} ) ;
streamWritable . on ( "drain" , onDrain ) ;
return new WritableStream ( {
start ( c ) {
controller = c ;
} ,
async write ( chunk ) {
if ( streamWritable . writableNeedDrain || ! streamWritable . write ( chunk ) ) {
backpressurePromise = createDeferredPromise ( ) ;
return backpressurePromise . promise . finally ( ( ) => {
backpressurePromise = undefined ;
} ) ;
}
} ,
abort ( reason ) {
destroy ( streamWritable , reason ) ;
} ,
close ( ) {
if ( closed === undefined && ! isWritableEnded ( streamWritable ) ) {
closed = createDeferredPromise ( ) ;
streamWritable . end ( ) ;
return closed . promise ;
}
controller = undefined ;
return Promise . resolve ( ) ;
} ,
} , strategy ) ;
}
function newReadableWritablePairFromDuplex ( duplex ) {
// Not using the internal/streams/utils isWritableNodeStream and
// isReadableNodestream utilities here because they will return false
// if the duplex was created with writable or readable options set to
// false. Instead, we'll check the readable and writable state after
// and return closed WritableStream or closed ReadableStream as
// necessary.
if (
typeof duplex ? . _writableState !== "object" ||
typeof duplex ? . _readableState !== "object"
) {
throw new ERR _INVALID _ARG _TYPE ( "duplex" , "stream.Duplex" , duplex ) ;
}
if ( isDestroyed ( duplex ) ) {
const writable = new WritableStream ( ) ;
const readable = new ReadableStream ( ) ;
writable . close ( ) ;
readable . cancel ( ) ;
return { readable , writable } ;
}
const writable = isWritable ( duplex )
? newWritableStreamFromStreamWritable ( duplex )
: new WritableStream ( ) ;
if ( ! isWritable ( duplex ) ) {
writable . close ( ) ;
}
const readable = isReadable ( duplex )
? newReadableStreamFromStreamReadable ( duplex )
: new ReadableStream ( ) ;
if ( ! isReadable ( duplex ) ) {
readable . cancel ( ) ;
}
return { writable , readable } ;
}
Readable . toWeb = newReadableStreamFromStreamReadable ;
Writable . toWeb = newWritableStreamFromStreamWritable ;
Duplex . toWeb = newReadableWritablePairFromDuplex ;