1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-18 20:04:03 -05:00
denoland-deno/std/node/_stream/duplex_test.ts

698 lines
17 KiB
TypeScript

// Copyright Node.js contributors. All rights reserved. MIT License.
import { Buffer } from "../buffer.ts";
import Duplex from "./duplex.ts";
import finished from "./end_of_stream.ts";
import {
assert,
assertEquals,
assertStrictEquals,
assertThrows,
} from "../../testing/asserts.ts";
import { deferred, delay } from "../../async/mod.ts";
Deno.test("Duplex stream works normally", () => {
const stream = new Duplex({ objectMode: true });
assert(stream._readableState.objectMode);
assert(stream._writableState.objectMode);
assert(stream.allowHalfOpen);
assertEquals(stream.listenerCount("end"), 0);
let written: { val: number };
let read: { val: number };
stream._write = (obj, _, cb) => {
written = obj;
cb();
};
stream._read = () => {};
stream.on("data", (obj) => {
read = obj;
});
stream.push({ val: 1 });
stream.end({ val: 2 });
stream.on("finish", () => {
assertEquals(read.val, 1);
assertEquals(written.val, 2);
});
});
Deno.test("Duplex stream gets constructed correctly", () => {
const d1 = new Duplex({
objectMode: true,
highWaterMark: 100,
});
assertEquals(d1.readableObjectMode, true);
assertEquals(d1.readableHighWaterMark, 100);
assertEquals(d1.writableObjectMode, true);
assertEquals(d1.writableHighWaterMark, 100);
const d2 = new Duplex({
readableObjectMode: false,
readableHighWaterMark: 10,
writableObjectMode: true,
writableHighWaterMark: 100,
});
assertEquals(d2.writableObjectMode, true);
assertEquals(d2.writableHighWaterMark, 100);
assertEquals(d2.readableObjectMode, false);
assertEquals(d2.readableHighWaterMark, 10);
});
Deno.test("Duplex stream can be paused", () => {
const readable = new Duplex();
// _read is a noop, here.
readable._read = () => {};
// Default state of a stream is not "paused"
assert(!readable.isPaused());
// Make the stream start flowing...
readable.on("data", () => {});
// still not paused.
assert(!readable.isPaused());
readable.pause();
assert(readable.isPaused());
readable.resume();
assert(!readable.isPaused());
});
Deno.test("Duplex stream sets enconding correctly", () => {
const readable = new Duplex({
read() {},
});
readable.setEncoding("utf8");
readable.push(new TextEncoder().encode("DEF"));
readable.unshift(new TextEncoder().encode("ABC"));
assertStrictEquals(readable.read(), "ABCDEF");
});
Deno.test("Duplex stream sets encoding correctly", () => {
const readable = new Duplex({
read() {},
});
readable.setEncoding("utf8");
readable.push(new TextEncoder().encode("DEF"));
readable.unshift(new TextEncoder().encode("ABC"));
assertStrictEquals(readable.read(), "ABCDEF");
});
Deno.test("Duplex stream holds up a big push", async () => {
let readExecuted = 0;
const readExecutedExpected = 3;
const readExpectedExecutions = deferred();
let endExecuted = 0;
const endExecutedExpected = 1;
const endExpectedExecutions = deferred();
const str = "asdfasdfasdfasdfasdf";
const r = new Duplex({
highWaterMark: 5,
encoding: "utf8",
});
let reads = 0;
function _read() {
if (reads === 0) {
setTimeout(() => {
r.push(str);
}, 1);
reads++;
} else if (reads === 1) {
const ret = r.push(str);
assertEquals(ret, false);
reads++;
} else {
r.push(null);
}
}
r._read = () => {
readExecuted++;
if (readExecuted == readExecutedExpected) {
readExpectedExecutions.resolve();
}
_read();
};
r.on("end", () => {
endExecuted++;
if (endExecuted == endExecutedExpected) {
endExpectedExecutions.resolve();
}
});
// Push some data in to start.
// We've never gotten any read event at this point.
const ret = r.push(str);
assert(!ret);
let chunk = r.read();
assertEquals(chunk, str);
chunk = r.read();
assertEquals(chunk, null);
r.once("readable", () => {
// This time, we'll get *all* the remaining data, because
// it's been added synchronously, as the read WOULD take
// us below the hwm, and so it triggered a _read() again,
// which synchronously added more, which we then return.
chunk = r.read();
assertEquals(chunk, str + str);
chunk = r.read();
assertEquals(chunk, null);
});
const readTimeout = setTimeout(
() => readExpectedExecutions.reject(),
1000,
);
const endTimeout = setTimeout(
() => endExpectedExecutions.reject(),
1000,
);
await readExpectedExecutions;
await endExpectedExecutions;
clearTimeout(readTimeout);
clearTimeout(endTimeout);
assertEquals(readExecuted, readExecutedExpected);
assertEquals(endExecuted, endExecutedExpected);
});
Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
let readableExecuted = 0;
const readableExecutedExpected = 1;
const readableExpectedExecutions = deferred();
const r = new Duplex({
highWaterMark: 3,
});
r._read = () => {
throw new Error("_read must not be called");
};
r.push(Buffer.from("blerg"));
setTimeout(function () {
assert(!r._readableState.reading);
r.on("readable", () => {
readableExecuted++;
if (readableExecuted == readableExecutedExpected) {
readableExpectedExecutions.resolve();
}
});
}, 1);
const readableTimeout = setTimeout(
() => readableExpectedExecutions.reject(),
1000,
);
await readableExpectedExecutions;
clearTimeout(readableTimeout);
assertEquals(readableExecuted, readableExecutedExpected);
});
Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
let readableExecuted = 0;
const readableExecutedExpected = 1;
const readableExpectedExecutions = deferred();
let readExecuted = 0;
const readExecutedExpected = 1;
const readExpectedExecutions = deferred();
const r = new Duplex({
highWaterMark: 3,
});
r._read = () => {
readExecuted++;
if (readExecuted == readExecutedExpected) {
readExpectedExecutions.resolve();
}
};
r.push(Buffer.from("bl"));
setTimeout(function () {
assert(r._readableState.reading);
r.on("readable", () => {
readableExecuted++;
if (readableExecuted == readableExecutedExpected) {
readableExpectedExecutions.resolve();
}
});
}, 1);
const readableTimeout = setTimeout(
() => readableExpectedExecutions.reject(),
1000,
);
const readTimeout = setTimeout(
() => readExpectedExecutions.reject(),
1000,
);
await readableExpectedExecutions;
await readExpectedExecutions;
clearTimeout(readableTimeout);
clearTimeout(readTimeout);
assertEquals(readableExecuted, readableExecutedExpected);
assertEquals(readExecuted, readExecutedExpected);
});
Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
let readableExecuted = 0;
const readableExecutedExpected = 1;
const readableExpectedExecutions = deferred();
const r = new Duplex({
highWaterMark: 30,
});
r._read = () => {
throw new Error("Must not be executed");
};
r.push(Buffer.from("blerg"));
//This ends the stream and triggers end
r.push(null);
setTimeout(function () {
// Assert we're testing what we think we are
assert(!r._readableState.reading);
r.on("readable", () => {
readableExecuted++;
if (readableExecuted == readableExecutedExpected) {
readableExpectedExecutions.resolve();
}
});
}, 1);
const readableTimeout = setTimeout(
() => readableExpectedExecutions.reject(),
1000,
);
await readableExpectedExecutions;
clearTimeout(readableTimeout);
assertEquals(readableExecuted, readableExecutedExpected);
});
Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
let endExecuted = 0;
const endExecutedExpected = 1;
const endExpectedExecutions = deferred();
const underlyingData = ["", "x", "y", "", "z"];
const expected = underlyingData.filter((data) => data);
const result: unknown[] = [];
const r = new Duplex({
encoding: "utf8",
});
r._read = function () {
queueMicrotask(() => {
if (!underlyingData.length) {
this.push(null);
} else {
this.push(underlyingData.shift());
}
});
};
r.on("readable", () => {
const data = r.read();
if (data !== null) result.push(data);
});
r.on("end", () => {
endExecuted++;
if (endExecuted == endExecutedExpected) {
endExpectedExecutions.resolve();
}
assertEquals(result, expected);
});
const endTimeout = setTimeout(
() => endExpectedExecutions.reject(),
1000,
);
await endExpectedExecutions;
clearTimeout(endTimeout);
assertEquals(endExecuted, endExecutedExpected);
});
Deno.test("Duplex stream: listeners can be removed", () => {
const r = new Duplex();
r._read = () => {};
r.on("data", () => {});
r.removeAllListeners("data");
assertEquals(r.eventNames().length, 0);
});
Deno.test("Duplex stream writes correctly", async () => {
let callback: undefined | ((error?: Error | null | undefined) => void);
let writeExecuted = 0;
const writeExecutedExpected = 1;
const writeExpectedExecutions = deferred();
let writevExecuted = 0;
const writevExecutedExpected = 1;
const writevExpectedExecutions = deferred();
const writable = new Duplex({
write: (chunk, encoding, cb) => {
writeExecuted++;
if (writeExecuted == writeExecutedExpected) {
writeExpectedExecutions.resolve();
}
assert(chunk instanceof Buffer);
assertStrictEquals(encoding, "buffer");
assertStrictEquals(String(chunk), "ABC");
callback = cb;
},
writev: (chunks) => {
writevExecuted++;
if (writevExecuted == writevExecutedExpected) {
writevExpectedExecutions.resolve();
}
assertStrictEquals(chunks.length, 2);
assertStrictEquals(chunks[0].encoding, "buffer");
assertStrictEquals(chunks[1].encoding, "buffer");
assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
},
});
writable.write(new TextEncoder().encode("ABC"));
writable.write(new TextEncoder().encode("DEF"));
writable.end(new TextEncoder().encode("GHI"));
callback?.();
const writeTimeout = setTimeout(
() => writeExpectedExecutions.reject(),
1000,
);
const writevTimeout = setTimeout(
() => writevExpectedExecutions.reject(),
1000,
);
await writeExpectedExecutions;
await writevExpectedExecutions;
clearTimeout(writeTimeout);
clearTimeout(writevTimeout);
assertEquals(writeExecuted, writeExecutedExpected);
assertEquals(writevExecuted, writevExecutedExpected);
});
Deno.test("Duplex stream writes Uint8Array in object mode", async () => {
let writeExecuted = 0;
const writeExecutedExpected = 1;
const writeExpectedExecutions = deferred();
const ABC = new TextEncoder().encode("ABC");
const writable = new Duplex({
objectMode: true,
write: (chunk, encoding, cb) => {
writeExecuted++;
if (writeExecuted == writeExecutedExpected) {
writeExpectedExecutions.resolve();
}
assert(!(chunk instanceof Buffer));
assert(chunk instanceof Uint8Array);
assertEquals(chunk, ABC);
assertEquals(encoding, "utf8");
cb();
},
});
writable.end(ABC);
const writeTimeout = setTimeout(
() => writeExpectedExecutions.reject(),
1000,
);
await writeExpectedExecutions;
clearTimeout(writeTimeout);
assertEquals(writeExecuted, writeExecutedExpected);
});
Deno.test("Duplex stream throws on unexpected close", async () => {
let finishedExecuted = 0;
const finishedExecutedExpected = 1;
const finishedExpectedExecutions = deferred();
const writable = new Duplex({
write: () => {},
});
writable.writable = false;
writable.destroy();
finished(writable, (err) => {
finishedExecuted++;
if (finishedExecuted == finishedExecutedExpected) {
finishedExpectedExecutions.resolve();
}
assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
});
const finishedTimeout = setTimeout(
() => finishedExpectedExecutions.reject(),
1000,
);
await finishedExpectedExecutions;
clearTimeout(finishedTimeout);
assertEquals(finishedExecuted, finishedExecutedExpected);
});
Deno.test("Duplex stream finishes correctly after error", async () => {
let errorExecuted = 0;
const errorExecutedExpected = 1;
const errorExpectedExecutions = deferred();
let finishedExecuted = 0;
const finishedExecutedExpected = 1;
const finishedExpectedExecutions = deferred();
const w = new Duplex({
write(_chunk, _encoding, cb) {
cb(new Error());
},
autoDestroy: false,
});
w.write("asd");
w.on("error", () => {
errorExecuted++;
if (errorExecuted == errorExecutedExpected) {
errorExpectedExecutions.resolve();
}
finished(w, () => {
finishedExecuted++;
if (finishedExecuted == finishedExecutedExpected) {
finishedExpectedExecutions.resolve();
}
});
});
const errorTimeout = setTimeout(
() => errorExpectedExecutions.reject(),
1000,
);
const finishedTimeout = setTimeout(
() => finishedExpectedExecutions.reject(),
1000,
);
await finishedExpectedExecutions;
await errorExpectedExecutions;
clearTimeout(finishedTimeout);
clearTimeout(errorTimeout);
assertEquals(finishedExecuted, finishedExecutedExpected);
assertEquals(errorExecuted, errorExecutedExpected);
});
Deno.test("Duplex stream fails on 'write' null value", () => {
const writable = new Duplex();
assertThrows(() => writable.write(null));
});
Deno.test("Duplex stream is destroyed correctly", async () => {
let closeExecuted = 0;
const closeExecutedExpected = 1;
const closeExpectedExecutions = deferred();
const unexpectedExecution = deferred();
const duplex = new Duplex({
write(_chunk, _enc, cb) {
cb();
},
read() {},
});
duplex.resume();
function never() {
unexpectedExecution.reject();
}
duplex.on("end", never);
duplex.on("finish", never);
duplex.on("close", () => {
closeExecuted++;
if (closeExecuted == closeExecutedExpected) {
closeExpectedExecutions.resolve();
}
});
duplex.destroy();
assertEquals(duplex.destroyed, true);
const closeTimeout = setTimeout(
() => closeExpectedExecutions.reject(),
1000,
);
await Promise.race([
unexpectedExecution,
delay(100),
]);
await closeExpectedExecutions;
clearTimeout(closeTimeout);
assertEquals(closeExecuted, closeExecutedExpected);
});
Deno.test("Duplex stream errors correctly on destroy", async () => {
let errorExecuted = 0;
const errorExecutedExpected = 1;
const errorExpectedExecutions = deferred();
const unexpectedExecution = deferred();
const duplex = new Duplex({
write(_chunk, _enc, cb) {
cb();
},
read() {},
});
duplex.resume();
const expected = new Error("kaboom");
function never() {
unexpectedExecution.reject();
}
duplex.on("end", never);
duplex.on("finish", never);
duplex.on("error", (err) => {
errorExecuted++;
if (errorExecuted == errorExecutedExpected) {
errorExpectedExecutions.resolve();
}
assertStrictEquals(err, expected);
});
duplex.destroy(expected);
assertEquals(duplex.destroyed, true);
const errorTimeout = setTimeout(
() => errorExpectedExecutions.reject(),
1000,
);
await Promise.race([
unexpectedExecution,
delay(100),
]);
await errorExpectedExecutions;
clearTimeout(errorTimeout);
assertEquals(errorExecuted, errorExecutedExpected);
});
Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => {
const unexpectedExecution = deferred();
const duplex = new Duplex({
read() {},
});
assertEquals(duplex.allowHalfOpen, true);
duplex.on("finish", () => unexpectedExecution.reject());
assertEquals(duplex.listenerCount("end"), 0);
duplex.resume();
duplex.push(null);
await Promise.race([
unexpectedExecution,
delay(100),
]);
});
Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => {
let finishExecuted = 0;
const finishExecutedExpected = 1;
const finishExpectedExecutions = deferred();
const duplex = new Duplex({
read() {},
allowHalfOpen: false,
});
assertEquals(duplex.allowHalfOpen, false);
duplex.on("finish", () => {
finishExecuted++;
if (finishExecuted == finishExecutedExpected) {
finishExpectedExecutions.resolve();
}
});
assertEquals(duplex.listenerCount("end"), 0);
duplex.resume();
duplex.push(null);
const finishTimeout = setTimeout(
() => finishExpectedExecutions.reject(),
1000,
);
await finishExpectedExecutions;
clearTimeout(finishTimeout);
assertEquals(finishExecuted, finishExecutedExpected);
});
Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => {
const unexpectedExecution = deferred();
const duplex = new Duplex({
read() {},
allowHalfOpen: false,
});
assertEquals(duplex.allowHalfOpen, false);
duplex._writableState.ended = true;
duplex.on("finish", () => unexpectedExecution.reject());
assertEquals(duplex.listenerCount("end"), 0);
duplex.resume();
duplex.push(null);
await Promise.race([
unexpectedExecution,
delay(100),
]);
});