// Copyright Node.js contributors. All rights reserved. MIT License. import { Buffer } from "../buffer.ts"; import Duplex from "./duplex.ts"; import finished from "./end_of_stream.ts"; import { assert, assertEquals, assertStrictEquals, assertThrows, } from "../../testing/asserts.ts"; import { deferred, delay } from "../../async/mod.ts"; Deno.test("Duplex stream works normally", () => { const stream = new Duplex({ objectMode: true }); assert(stream._readableState.objectMode); assert(stream._writableState.objectMode); assert(stream.allowHalfOpen); assertEquals(stream.listenerCount("end"), 0); let written: { val: number }; let read: { val: number }; stream._write = (obj, _, cb) => { written = obj; cb(); }; stream._read = () => {}; stream.on("data", (obj) => { read = obj; }); stream.push({ val: 1 }); stream.end({ val: 2 }); stream.on("finish", () => { assertEquals(read.val, 1); assertEquals(written.val, 2); }); }); Deno.test("Duplex stream gets constructed correctly", () => { const d1 = new Duplex({ objectMode: true, highWaterMark: 100, }); assertEquals(d1.readableObjectMode, true); assertEquals(d1.readableHighWaterMark, 100); assertEquals(d1.writableObjectMode, true); assertEquals(d1.writableHighWaterMark, 100); const d2 = new Duplex({ readableObjectMode: false, readableHighWaterMark: 10, writableObjectMode: true, writableHighWaterMark: 100, }); assertEquals(d2.writableObjectMode, true); assertEquals(d2.writableHighWaterMark, 100); assertEquals(d2.readableObjectMode, false); assertEquals(d2.readableHighWaterMark, 10); }); Deno.test("Duplex stream can be paused", () => { const readable = new Duplex(); // _read is a noop, here. readable._read = () => {}; // Default state of a stream is not "paused" assert(!readable.isPaused()); // Make the stream start flowing... readable.on("data", () => {}); // still not paused. assert(!readable.isPaused()); readable.pause(); assert(readable.isPaused()); readable.resume(); assert(!readable.isPaused()); }); Deno.test("Duplex stream sets enconding correctly", () => { const readable = new Duplex({ read() {}, }); readable.setEncoding("utf8"); readable.push(new TextEncoder().encode("DEF")); readable.unshift(new TextEncoder().encode("ABC")); assertStrictEquals(readable.read(), "ABCDEF"); }); Deno.test("Duplex stream sets encoding correctly", () => { const readable = new Duplex({ read() {}, }); readable.setEncoding("utf8"); readable.push(new TextEncoder().encode("DEF")); readable.unshift(new TextEncoder().encode("ABC")); assertStrictEquals(readable.read(), "ABCDEF"); }); Deno.test("Duplex stream holds up a big push", async () => { let readExecuted = 0; const readExecutedExpected = 3; const readExpectedExecutions = deferred(); let endExecuted = 0; const endExecutedExpected = 1; const endExpectedExecutions = deferred(); const str = "asdfasdfasdfasdfasdf"; const r = new Duplex({ highWaterMark: 5, encoding: "utf8", }); let reads = 0; function _read() { if (reads === 0) { setTimeout(() => { r.push(str); }, 1); reads++; } else if (reads === 1) { const ret = r.push(str); assertEquals(ret, false); reads++; } else { r.push(null); } } r._read = () => { readExecuted++; if (readExecuted == readExecutedExpected) { readExpectedExecutions.resolve(); } _read(); }; r.on("end", () => { endExecuted++; if (endExecuted == endExecutedExpected) { endExpectedExecutions.resolve(); } }); // Push some data in to start. // We've never gotten any read event at this point. const ret = r.push(str); assert(!ret); let chunk = r.read(); assertEquals(chunk, str); chunk = r.read(); assertEquals(chunk, null); r.once("readable", () => { // This time, we'll get *all* the remaining data, because // it's been added synchronously, as the read WOULD take // us below the hwm, and so it triggered a _read() again, // which synchronously added more, which we then return. chunk = r.read(); assertEquals(chunk, str + str); chunk = r.read(); assertEquals(chunk, null); }); const readTimeout = setTimeout( () => readExpectedExecutions.reject(), 1000, ); const endTimeout = setTimeout( () => endExpectedExecutions.reject(), 1000, ); await readExpectedExecutions; await endExpectedExecutions; clearTimeout(readTimeout); clearTimeout(endTimeout); assertEquals(readExecuted, readExecutedExpected); assertEquals(endExecuted, endExecutedExpected); }); Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => { let readableExecuted = 0; const readableExecutedExpected = 1; const readableExpectedExecutions = deferred(); const r = new Duplex({ highWaterMark: 3, }); r._read = () => { throw new Error("_read must not be called"); }; r.push(Buffer.from("blerg")); setTimeout(function () { assert(!r._readableState.reading); r.on("readable", () => { readableExecuted++; if (readableExecuted == readableExecutedExpected) { readableExpectedExecutions.resolve(); } }); }, 1); const readableTimeout = setTimeout( () => readableExpectedExecutions.reject(), 1000, ); await readableExpectedExecutions; clearTimeout(readableTimeout); assertEquals(readableExecuted, readableExecutedExpected); }); Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => { let readableExecuted = 0; const readableExecutedExpected = 1; const readableExpectedExecutions = deferred(); let readExecuted = 0; const readExecutedExpected = 1; const readExpectedExecutions = deferred(); const r = new Duplex({ highWaterMark: 3, }); r._read = () => { readExecuted++; if (readExecuted == readExecutedExpected) { readExpectedExecutions.resolve(); } }; r.push(Buffer.from("bl")); setTimeout(function () { assert(r._readableState.reading); r.on("readable", () => { readableExecuted++; if (readableExecuted == readableExecutedExpected) { readableExpectedExecutions.resolve(); } }); }, 1); const readableTimeout = setTimeout( () => readableExpectedExecutions.reject(), 1000, ); const readTimeout = setTimeout( () => readExpectedExecutions.reject(), 1000, ); await readableExpectedExecutions; await readExpectedExecutions; clearTimeout(readableTimeout); clearTimeout(readTimeout); assertEquals(readableExecuted, readableExecutedExpected); assertEquals(readExecuted, readExecutedExpected); }); Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => { let readableExecuted = 0; const readableExecutedExpected = 1; const readableExpectedExecutions = deferred(); const r = new Duplex({ highWaterMark: 30, }); r._read = () => { throw new Error("Must not be executed"); }; r.push(Buffer.from("blerg")); //This ends the stream and triggers end r.push(null); setTimeout(function () { // Assert we're testing what we think we are assert(!r._readableState.reading); r.on("readable", () => { readableExecuted++; if (readableExecuted == readableExecutedExpected) { readableExpectedExecutions.resolve(); } }); }, 1); const readableTimeout = setTimeout( () => readableExpectedExecutions.reject(), 1000, ); await readableExpectedExecutions; clearTimeout(readableTimeout); assertEquals(readableExecuted, readableExecutedExpected); }); Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => { let endExecuted = 0; const endExecutedExpected = 1; const endExpectedExecutions = deferred(); const underlyingData = ["", "x", "y", "", "z"]; const expected = underlyingData.filter((data) => data); const result: unknown[] = []; const r = new Duplex({ encoding: "utf8", }); r._read = function () { queueMicrotask(() => { if (!underlyingData.length) { this.push(null); } else { this.push(underlyingData.shift()); } }); }; r.on("readable", () => { const data = r.read(); if (data !== null) result.push(data); }); r.on("end", () => { endExecuted++; if (endExecuted == endExecutedExpected) { endExpectedExecutions.resolve(); } assertEquals(result, expected); }); const endTimeout = setTimeout( () => endExpectedExecutions.reject(), 1000, ); await endExpectedExecutions; clearTimeout(endTimeout); assertEquals(endExecuted, endExecutedExpected); }); Deno.test("Duplex stream: listeners can be removed", () => { const r = new Duplex(); r._read = () => {}; r.on("data", () => {}); r.removeAllListeners("data"); assertEquals(r.eventNames().length, 0); }); Deno.test("Duplex stream writes correctly", async () => { let callback: undefined | ((error?: Error | null | undefined) => void); let writeExecuted = 0; const writeExecutedExpected = 1; const writeExpectedExecutions = deferred(); let writevExecuted = 0; const writevExecutedExpected = 1; const writevExpectedExecutions = deferred(); const writable = new Duplex({ write: (chunk, encoding, cb) => { writeExecuted++; if (writeExecuted == writeExecutedExpected) { writeExpectedExecutions.resolve(); } assert(chunk instanceof Buffer); assertStrictEquals(encoding, "buffer"); assertStrictEquals(String(chunk), "ABC"); callback = cb; }, writev: (chunks) => { writevExecuted++; if (writevExecuted == writevExecutedExpected) { writevExpectedExecutions.resolve(); } assertStrictEquals(chunks.length, 2); assertStrictEquals(chunks[0].encoding, "buffer"); assertStrictEquals(chunks[1].encoding, "buffer"); assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI"); }, }); writable.write(new TextEncoder().encode("ABC")); writable.write(new TextEncoder().encode("DEF")); writable.end(new TextEncoder().encode("GHI")); callback?.(); const writeTimeout = setTimeout( () => writeExpectedExecutions.reject(), 1000, ); const writevTimeout = setTimeout( () => writevExpectedExecutions.reject(), 1000, ); await writeExpectedExecutions; await writevExpectedExecutions; clearTimeout(writeTimeout); clearTimeout(writevTimeout); assertEquals(writeExecuted, writeExecutedExpected); assertEquals(writevExecuted, writevExecutedExpected); }); Deno.test("Duplex stream writes Uint8Array in object mode", async () => { let writeExecuted = 0; const writeExecutedExpected = 1; const writeExpectedExecutions = deferred(); const ABC = new TextEncoder().encode("ABC"); const writable = new Duplex({ objectMode: true, write: (chunk, encoding, cb) => { writeExecuted++; if (writeExecuted == writeExecutedExpected) { writeExpectedExecutions.resolve(); } assert(!(chunk instanceof Buffer)); assert(chunk instanceof Uint8Array); assertEquals(chunk, ABC); assertEquals(encoding, "utf8"); cb(); }, }); writable.end(ABC); const writeTimeout = setTimeout( () => writeExpectedExecutions.reject(), 1000, ); await writeExpectedExecutions; clearTimeout(writeTimeout); assertEquals(writeExecuted, writeExecutedExpected); }); Deno.test("Duplex stream throws on unexpected close", async () => { let finishedExecuted = 0; const finishedExecutedExpected = 1; const finishedExpectedExecutions = deferred(); const writable = new Duplex({ write: () => {}, }); writable.writable = false; writable.destroy(); finished(writable, (err) => { finishedExecuted++; if (finishedExecuted == finishedExecutedExpected) { finishedExpectedExecutions.resolve(); } assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE"); }); const finishedTimeout = setTimeout( () => finishedExpectedExecutions.reject(), 1000, ); await finishedExpectedExecutions; clearTimeout(finishedTimeout); assertEquals(finishedExecuted, finishedExecutedExpected); }); Deno.test("Duplex stream finishes correctly after error", async () => { let errorExecuted = 0; const errorExecutedExpected = 1; const errorExpectedExecutions = deferred(); let finishedExecuted = 0; const finishedExecutedExpected = 1; const finishedExpectedExecutions = deferred(); const w = new Duplex({ write(_chunk, _encoding, cb) { cb(new Error()); }, autoDestroy: false, }); w.write("asd"); w.on("error", () => { errorExecuted++; if (errorExecuted == errorExecutedExpected) { errorExpectedExecutions.resolve(); } finished(w, () => { finishedExecuted++; if (finishedExecuted == finishedExecutedExpected) { finishedExpectedExecutions.resolve(); } }); }); const errorTimeout = setTimeout( () => errorExpectedExecutions.reject(), 1000, ); const finishedTimeout = setTimeout( () => finishedExpectedExecutions.reject(), 1000, ); await finishedExpectedExecutions; await errorExpectedExecutions; clearTimeout(finishedTimeout); clearTimeout(errorTimeout); assertEquals(finishedExecuted, finishedExecutedExpected); assertEquals(errorExecuted, errorExecutedExpected); }); Deno.test("Duplex stream fails on 'write' null value", () => { const writable = new Duplex(); assertThrows(() => writable.write(null)); }); Deno.test("Duplex stream is destroyed correctly", async () => { let closeExecuted = 0; const closeExecutedExpected = 1; const closeExpectedExecutions = deferred(); const unexpectedExecution = deferred(); const duplex = new Duplex({ write(_chunk, _enc, cb) { cb(); }, read() {}, }); duplex.resume(); function never() { unexpectedExecution.reject(); } duplex.on("end", never); duplex.on("finish", never); duplex.on("close", () => { closeExecuted++; if (closeExecuted == closeExecutedExpected) { closeExpectedExecutions.resolve(); } }); duplex.destroy(); assertEquals(duplex.destroyed, true); const closeTimeout = setTimeout( () => closeExpectedExecutions.reject(), 1000, ); await Promise.race([ unexpectedExecution, delay(100), ]); await closeExpectedExecutions; clearTimeout(closeTimeout); assertEquals(closeExecuted, closeExecutedExpected); }); Deno.test("Duplex stream errors correctly on destroy", async () => { let errorExecuted = 0; const errorExecutedExpected = 1; const errorExpectedExecutions = deferred(); const unexpectedExecution = deferred(); const duplex = new Duplex({ write(_chunk, _enc, cb) { cb(); }, read() {}, }); duplex.resume(); const expected = new Error("kaboom"); function never() { unexpectedExecution.reject(); } duplex.on("end", never); duplex.on("finish", never); duplex.on("error", (err) => { errorExecuted++; if (errorExecuted == errorExecutedExpected) { errorExpectedExecutions.resolve(); } assertStrictEquals(err, expected); }); duplex.destroy(expected); assertEquals(duplex.destroyed, true); const errorTimeout = setTimeout( () => errorExpectedExecutions.reject(), 1000, ); await Promise.race([ unexpectedExecution, delay(100), ]); await errorExpectedExecutions; clearTimeout(errorTimeout); assertEquals(errorExecuted, errorExecutedExpected); }); Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => { const unexpectedExecution = deferred(); const duplex = new Duplex({ read() {}, }); assertEquals(duplex.allowHalfOpen, true); duplex.on("finish", () => unexpectedExecution.reject()); assertEquals(duplex.listenerCount("end"), 0); duplex.resume(); duplex.push(null); await Promise.race([ unexpectedExecution, delay(100), ]); }); Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => { let finishExecuted = 0; const finishExecutedExpected = 1; const finishExpectedExecutions = deferred(); const duplex = new Duplex({ read() {}, allowHalfOpen: false, }); assertEquals(duplex.allowHalfOpen, false); duplex.on("finish", () => { finishExecuted++; if (finishExecuted == finishExecutedExpected) { finishExpectedExecutions.resolve(); } }); assertEquals(duplex.listenerCount("end"), 0); duplex.resume(); duplex.push(null); const finishTimeout = setTimeout( () => finishExpectedExecutions.reject(), 1000, ); await finishExpectedExecutions; clearTimeout(finishTimeout); assertEquals(finishExecuted, finishExecutedExpected); }); Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => { const unexpectedExecution = deferred(); const duplex = new Duplex({ read() {}, allowHalfOpen: false, }); assertEquals(duplex.allowHalfOpen, false); duplex._writableState.ended = true; duplex.on("finish", () => unexpectedExecution.reject()); assertEquals(duplex.listenerCount("end"), 0); duplex.resume(); duplex.push(null); await Promise.race([ unexpectedExecution, delay(100), ]); });