// Copyright Node.js contributors. All rights reserved. MIT License. import { once } from "../_utils.ts"; import { destroyer as implDestroyer } from "./destroy.ts"; import eos from "./end_of_stream.ts"; import createReadableStreamAsyncIterator from "./async_iterator.ts"; import * as events from "../events.ts"; import PassThrough from "./passthrough.ts"; import { ERR_INVALID_ARG_TYPE, ERR_INVALID_CALLBACK, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, NodeErrorAbstraction, } from "../_errors.ts"; import type Duplex from "./duplex.ts"; import type Readable from "./readable.ts"; import type Stream from "./stream.ts"; import type Transform from "./transform.ts"; import type Writable from "./writable.ts"; type Streams = Duplex | Readable | Writable; // deno-lint-ignore no-explicit-any type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void; type TransformCallback = // deno-lint-ignore no-explicit-any | ((value?: any) => AsyncGenerator) // deno-lint-ignore no-explicit-any | ((value?: any) => Promise); /** * This type represents an array that contains a data source, * many Transform Streams, a writable stream destination * and end in an optional callback * */ type DataSource = // deno-lint-ignore no-explicit-any | (() => AsyncGenerator) | // deno-lint-ignore no-explicit-any AsyncIterable | Duplex | // deno-lint-ignore no-explicit-any Iterable | // deno-lint-ignore no-explicit-any (() => Generator) | Readable; type Transformers = Duplex | Transform | TransformCallback | Writable; export type PipelineArguments = [ DataSource, ...Array, ]; function destroyer( stream: Streams, reading: boolean, writing: boolean, callback: EndCallback, ) { callback = once(callback); let finished = false; stream.on("close", () => { finished = true; }); eos(stream, { readable: reading, writable: writing }, (err) => { finished = !err; // deno-lint-ignore no-explicit-any const rState = (stream as any)?._readableState; if ( err && err.code === "ERR_STREAM_PREMATURE_CLOSE" && reading && (rState?.ended && !rState?.errored && !rState?.errorEmitted) ) { stream .once("end", callback) .once("error", callback); } else { callback(err); } }); return (err: NodeErrorAbstraction) => { if (finished) return; finished = true; implDestroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED("pipe")); }; } function popCallback(streams: PipelineArguments): EndCallback { if (typeof streams[streams.length - 1] !== "function") { throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]); } return streams.pop() as EndCallback; } // function isPromise(obj) { // return !!(obj && typeof obj.then === "function"); // } // deno-lint-ignore no-explicit-any function isReadable(obj: any): obj is Stream { return !!(obj && typeof obj.pipe === "function"); } // deno-lint-ignore no-explicit-any function isWritable(obj: any) { return !!(obj && typeof obj.write === "function"); } // deno-lint-ignore no-explicit-any function isStream(obj: any) { return isReadable(obj) || isWritable(obj); } // deno-lint-ignore no-explicit-any function isIterable(obj: any, isAsync?: boolean) { if (!obj) return false; if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function"; if (isAsync === false) return typeof obj[Symbol.iterator] === "function"; return typeof obj[Symbol.asyncIterator] === "function" || typeof obj[Symbol.iterator] === "function"; } // deno-lint-ignore no-explicit-any function makeAsyncIterable(val: Readable | Iterable | AsyncIterable) { if (isIterable(val)) { return val; } else if (isReadable(val)) { return fromReadable(val as Readable); } throw new ERR_INVALID_ARG_TYPE( "val", ["Readable", "Iterable", "AsyncIterable"], val, ); } async function* fromReadable(val: Readable) { yield* createReadableStreamAsyncIterator(val); } async function pump( // deno-lint-ignore no-explicit-any iterable: Iterable, writable: Duplex | Writable, finish: (err?: NodeErrorAbstraction | null) => void, ) { let error; try { for await (const chunk of iterable) { if (!writable.write(chunk)) { if (writable.destroyed) return; await events.once(writable, "drain"); } } writable.end(); } catch (err) { error = err; } finally { finish(error); } } export default function pipeline(...args: PipelineArguments) { const callback: EndCallback = once(popCallback(args)); let streams: [DataSource, ...Transformers[]]; if (args.length > 1) { streams = args as [DataSource, ...Transformers[]]; } else { throw new ERR_MISSING_ARGS("streams"); } let error: NodeErrorAbstraction; // deno-lint-ignore no-explicit-any let value: any; const destroys: Array<(err: NodeErrorAbstraction) => void> = []; let finishCount = 0; function finish(err?: NodeErrorAbstraction | null) { const final = --finishCount === 0; if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) { error = err; } if (!error && !final) { return; } while (destroys.length) { (destroys.shift() as (err: NodeErrorAbstraction) => void)(error); } if (final) { callback(error, value); } } // TODO(Soremwar) // Simplify the hell out of this // deno-lint-ignore no-explicit-any let ret: any; for (let i = 0; i < streams.length; i++) { const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; if (isStream(stream)) { finishCount++; destroys.push(destroyer(stream as Streams, reading, writing, finish)); } if (i === 0) { if (typeof stream === "function") { ret = stream(); if (!isIterable(ret)) { throw new ERR_INVALID_RETURN_VALUE( "Iterable, AsyncIterable or Stream", "source", ret, ); } } else if (isIterable(stream) || isReadable(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( "source", ["Stream", "Iterable", "AsyncIterable", "Function"], stream, ); } } else if (typeof stream === "function") { ret = makeAsyncIterable(ret); ret = stream(ret); if (reading) { if (!isIterable(ret, true)) { throw new ERR_INVALID_RETURN_VALUE( "AsyncIterable", `transform[${i - 1}]`, ret, ); } } else { // If the last argument to pipeline is not a stream // we must create a proxy stream so that pipeline(...) // always returns a stream which can be further // composed through `.pipe(stream)`. const pt = new PassThrough({ objectMode: true, }); if (ret instanceof Promise) { ret .then((val) => { value = val; pt.end(val); }, (err) => { pt.destroy(err); }); } else if (isIterable(ret, true)) { finishCount++; pump(ret, pt, finish); } else { throw new ERR_INVALID_RETURN_VALUE( "AsyncIterable or Promise", "destination", ret, ); } ret = pt; finishCount++; destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { if (isReadable(ret)) { ret.pipe(stream as Readable); // TODO(Soremwar) // Reimplement after stdout and stderr are implemented // if (stream === process.stdout || stream === process.stderr) { // ret.on("end", () => stream.end()); // } } else { ret = makeAsyncIterable(ret); finishCount++; pump(ret, stream as Writable, finish); } ret = stream; } else { const name = reading ? `transform[${i - 1}]` : "destination"; throw new ERR_INVALID_ARG_TYPE( name, ["Stream", "Function"], ret, ); } } return ret as unknown as Readable; }