1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

fix(ext/node): Stream should be instance of EventEmitter (#25527)

Closes https://github.com/denoland/deno/issues/25526
This commit is contained in:
Satya Rohith 2024-09-09 18:36:56 +05:30 committed by GitHub
parent ea8bf0945a
commit 5126ccb842
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 8 additions and 438 deletions

View file

@ -3,6 +3,7 @@
// deno-fmt-ignore-file // deno-fmt-ignore-file
// deno-lint-ignore-file // deno-lint-ignore-file
import { nextTick } from "ext:deno_node/_next_tick.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts";
import { EventEmitter as EE } from "ext:deno_node/_events.mjs";
import { AbortController } from "ext:deno_web/03_abort_signal.js"; import { AbortController } from "ext:deno_web/03_abort_signal.js";
import { Blob } from "ext:deno_web/09_file.js"; import { Blob } from "ext:deno_web/09_file.js";
import { StringDecoder } from "node:string_decoder"; import { StringDecoder } from "node:string_decoder";
@ -1723,446 +1724,11 @@ var require_destroy = __commonJS({
}, },
}); });
// node_modules/events/events.js
var require_events = __commonJS({
"node_modules/events/events.js"(exports, module) {
"use strict";
var R = typeof Reflect === "object" ? Reflect : null;
var ReflectApply = R && typeof R.apply === "function"
? R.apply
: function ReflectApply2(target, receiver, args) {
return Function.prototype.apply.call(target, receiver, args);
};
var ReflectOwnKeys;
if (R && typeof R.ownKeys === "function") {
ReflectOwnKeys = R.ownKeys;
} else if (Object.getOwnPropertySymbols) {
ReflectOwnKeys = function ReflectOwnKeys2(target) {
return Object.getOwnPropertyNames(target).concat(
Object.getOwnPropertySymbols(target),
);
};
} else {
ReflectOwnKeys = function ReflectOwnKeys2(target) {
return Object.getOwnPropertyNames(target);
};
}
function ProcessEmitWarning(warning) {
if (console && console.warn) {
console.warn(warning);
}
}
var NumberIsNaN = Number.isNaN || function NumberIsNaN2(value) {
return value !== value;
};
function EventEmitter() {
EventEmitter.init.call(this);
}
module.exports = EventEmitter;
module.exports.once = once;
EventEmitter.EventEmitter = EventEmitter;
EventEmitter.prototype._events = void 0;
EventEmitter.prototype._eventsCount = 0;
EventEmitter.prototype._maxListeners = void 0;
var defaultMaxListeners = 10;
function checkListener(listener) {
if (typeof listener !== "function") {
throw new TypeError(
'The "listener" argument must be of type Function. Received type ' +
typeof listener,
);
}
}
Object.defineProperty(EventEmitter, "defaultMaxListeners", {
enumerable: true,
get: function () {
return defaultMaxListeners;
},
set: function (arg) {
if (typeof arg !== "number" || arg < 0 || NumberIsNaN(arg)) {
throw new RangeError(
'The value of "defaultMaxListeners" is out of range. It must be a non-negative number. Received ' +
arg + ".",
);
}
defaultMaxListeners = arg;
},
});
EventEmitter.init = function () {
if (
this._events === void 0 ||
this._events === Object.getPrototypeOf(this)._events
) {
this._events = /* @__PURE__ */ Object.create(null);
this._eventsCount = 0;
}
this._maxListeners = this._maxListeners || void 0;
};
EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) {
if (typeof n !== "number" || n < 0 || NumberIsNaN(n)) {
throw new RangeError(
'The value of "n" is out of range. It must be a non-negative number. Received ' +
n + ".",
);
}
this._maxListeners = n;
return this;
};
function _getMaxListeners(that) {
if (that._maxListeners === void 0) {
return EventEmitter.defaultMaxListeners;
}
return that._maxListeners;
}
EventEmitter.prototype.getMaxListeners = function getMaxListeners() {
return _getMaxListeners(this);
};
EventEmitter.prototype.emit = function emit(type) {
var args = [];
for (var i = 1; i < arguments.length; i++) {
args.push(arguments[i]);
}
var doError = type === "error";
var events = this._events;
if (events !== void 0) {
doError = doError && events.error === void 0;
} else if (!doError) {
return false;
}
if (doError) {
var er;
if (args.length > 0) {
er = args[0];
}
if (er instanceof Error) {
throw er;
}
var err = new Error(
"Unhandled error." + (er ? " (" + er.message + ")" : ""),
);
err.context = er;
throw err;
}
var handler = events[type];
if (handler === void 0) {
return false;
}
if (typeof handler === "function") {
ReflectApply(handler, this, args);
} else {
var len = handler.length;
var listeners = arrayClone(handler, len);
for (var i = 0; i < len; ++i) {
ReflectApply(listeners[i], this, args);
}
}
return true;
};
function _addListener(target, type, listener, prepend) {
var m;
var events;
var existing;
checkListener(listener);
events = target._events;
if (events === void 0) {
events = target._events = /* @__PURE__ */ Object.create(null);
target._eventsCount = 0;
} else {
if (events.newListener !== void 0) {
target.emit(
"newListener",
type,
listener.listener ? listener.listener : listener,
);
events = target._events;
}
existing = events[type];
}
if (existing === void 0) {
existing = events[type] = listener;
++target._eventsCount;
} else {
if (typeof existing === "function") {
existing = events[type] = prepend
? [listener, existing]
: [existing, listener];
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}
m = _getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
var w = new Error(
"Possible EventEmitter memory leak detected. " + existing.length +
" " + String(type) +
" listeners added. Use emitter.setMaxListeners() to increase limit",
);
w.name = "MaxListenersExceededWarning";
w.emitter = target;
w.type = type;
w.count = existing.length;
ProcessEmitWarning(w);
}
}
return target;
}
EventEmitter.prototype.addListener = function addListener(type, listener) {
return _addListener(this, type, listener, false);
};
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
EventEmitter.prototype.prependListener = function prependListener(
type,
listener,
) {
return _addListener(this, type, listener, true);
};
function onceWrapper() {
if (!this.fired) {
this.target.removeListener(this.type, this.wrapFn);
this.fired = true;
if (arguments.length === 0) {
return this.listener.call(this.target);
}
return this.listener.apply(this.target, arguments);
}
}
function _onceWrap(target, type, listener) {
var state = { fired: false, wrapFn: void 0, target, type, listener };
var wrapped = onceWrapper.bind(state);
wrapped.listener = listener;
state.wrapFn = wrapped;
return wrapped;
}
EventEmitter.prototype.once = function once2(type, listener) {
checkListener(listener);
this.on(type, _onceWrap(this, type, listener));
return this;
};
EventEmitter.prototype.prependOnceListener = function prependOnceListener(
type,
listener,
) {
checkListener(listener);
this.prependListener(type, _onceWrap(this, type, listener));
return this;
};
EventEmitter.prototype.removeListener = function removeListener(
type,
listener,
) {
var list, events, position, i, originalListener;
checkListener(listener);
events = this._events;
if (events === void 0) {
return this;
}
list = events[type];
if (list === void 0) {
return this;
}
if (list === listener || list.listener === listener) {
if (--this._eventsCount === 0) {
this._events = /* @__PURE__ */ Object.create(null);
} else {
delete events[type];
if (events.removeListener) {
this.emit("removeListener", type, list.listener || listener);
}
}
} else if (typeof list !== "function") {
position = -1;
for (i = list.length - 1; i >= 0; i--) {
if (list[i] === listener || list[i].listener === listener) {
originalListener = list[i].listener;
position = i;
break;
}
}
if (position < 0) {
return this;
}
if (position === 0) {
list.shift();
} else {
spliceOne(list, position);
}
if (list.length === 1) {
events[type] = list[0];
}
if (events.removeListener !== void 0) {
this.emit("removeListener", type, originalListener || listener);
}
}
return this;
};
EventEmitter.prototype.off = EventEmitter.prototype.removeListener;
EventEmitter.prototype.removeAllListeners = function removeAllListeners(
type,
) {
var listeners, events, i;
events = this._events;
if (events === void 0) {
return this;
}
if (events.removeListener === void 0) {
if (arguments.length === 0) {
this._events = /* @__PURE__ */ Object.create(null);
this._eventsCount = 0;
} else if (events[type] !== void 0) {
if (--this._eventsCount === 0) {
this._events = /* @__PURE__ */ Object.create(null);
} else {
delete events[type];
}
}
return this;
}
if (arguments.length === 0) {
var keys = Object.keys(events);
var key;
for (i = 0; i < keys.length; ++i) {
key = keys[i];
if (key === "removeListener") {
continue;
}
this.removeAllListeners(key);
}
this.removeAllListeners("removeListener");
this._events = /* @__PURE__ */ Object.create(null);
this._eventsCount = 0;
return this;
}
listeners = events[type];
if (typeof listeners === "function") {
this.removeListener(type, listeners);
} else if (listeners !== void 0) {
for (i = listeners.length - 1; i >= 0; i--) {
this.removeListener(type, listeners[i]);
}
}
return this;
};
function _listeners(target, type, unwrap) {
var events = target._events;
if (events === void 0) {
return [];
}
var evlistener = events[type];
if (evlistener === void 0) {
return [];
}
if (typeof evlistener === "function") {
return unwrap ? [evlistener.listener || evlistener] : [evlistener];
}
return unwrap
? unwrapListeners(evlistener)
: arrayClone(evlistener, evlistener.length);
}
EventEmitter.prototype.listeners = function listeners(type) {
return _listeners(this, type, true);
};
EventEmitter.prototype.rawListeners = function rawListeners(type) {
return _listeners(this, type, false);
};
EventEmitter.listenerCount = function (emitter, type) {
if (typeof emitter.listenerCount === "function") {
return emitter.listenerCount(type);
} else {
return listenerCount.call(emitter, type);
}
};
EventEmitter.prototype.listenerCount = listenerCount;
function listenerCount(type) {
var events = this._events;
if (events !== void 0) {
var evlistener = events[type];
if (typeof evlistener === "function") {
return 1;
} else if (evlistener !== void 0) {
return evlistener.length;
}
}
return 0;
}
EventEmitter.prototype.eventNames = function eventNames() {
return this._eventsCount > 0 ? ReflectOwnKeys(this._events) : [];
};
function arrayClone(arr, n) {
var copy = new Array(n);
for (var i = 0; i < n; ++i) {
copy[i] = arr[i];
}
return copy;
}
function spliceOne(list, index) {
for (; index + 1 < list.length; index++) {
list[index] = list[index + 1];
}
list.pop();
}
function unwrapListeners(arr) {
var ret = new Array(arr.length);
for (var i = 0; i < ret.length; ++i) {
ret[i] = arr[i].listener || arr[i];
}
return ret;
}
function once(emitter, name) {
return new Promise(function (resolve, reject) {
function errorListener(err) {
emitter.removeListener(name, resolver);
reject(err);
}
function resolver() {
if (typeof emitter.removeListener === "function") {
emitter.removeListener("error", errorListener);
}
resolve([].slice.call(arguments));
}
eventTargetAgnosticAddListener(emitter, name, resolver, { once: true });
if (name !== "error") {
addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true });
}
});
}
function addErrorHandlerIfEventEmitter(emitter, handler, flags) {
if (typeof emitter.on === "function") {
eventTargetAgnosticAddListener(emitter, "error", handler, flags);
}
}
function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
if (typeof emitter.on === "function") {
if (flags.once) {
emitter.once(name, listener);
} else {
emitter.on(name, listener);
}
} else if (typeof emitter.addEventListener === "function") {
emitter.addEventListener(name, function wrapListener(arg) {
if (flags.once) {
emitter.removeEventListener(name, wrapListener);
}
listener(arg);
});
} else {
throw new TypeError(
'The "emitter" argument must be of type EventEmitter. Received type ' +
typeof emitter,
);
}
}
},
});
// lib/internal/streams/legacy.js // lib/internal/streams/legacy.js
var require_legacy = __commonJS({ var require_legacy = __commonJS({
"lib/internal/streams/legacy.js"(exports, module) { "lib/internal/streams/legacy.js"(exports, module) {
"use strict"; "use strict";
var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials(); var { ArrayIsArray, ObjectSetPrototypeOf } = require_primordials();
var { EventEmitter: EE } = require_events();
function Stream(opts) { function Stream(opts) {
EE.call(this, opts); EE.call(this, opts);
} }
@ -2688,7 +2254,6 @@ var require_readable = __commonJS({
} = require_primordials(); } = require_primordials();
module.exports = Readable; module.exports = Readable;
Readable.ReadableState = ReadableState; Readable.ReadableState = ReadableState;
var { EventEmitter: EE } = require_events();
var { Stream, prependListener } = require_legacy(); var { Stream, prependListener } = require_legacy();
var { Buffer: Buffer2 } = require_buffer(); var { Buffer: Buffer2 } = require_buffer();
var { addAbortSignal } = require_add_abort_signal(); var { addAbortSignal } = require_add_abort_signal();
@ -3722,7 +3287,6 @@ var require_writable = __commonJS({
} = require_primordials(); } = require_primordials();
module.exports = Writable; module.exports = Writable;
Writable.WritableState = WritableState; Writable.WritableState = WritableState;
var { EventEmitter: EE } = require_events();
var Stream = require_legacy().Stream; var Stream = require_legacy().Stream;
var { Buffer: Buffer2 } = require_buffer(); var { Buffer: Buffer2 } = require_buffer();
var destroyImpl = require_destroy(); var destroyImpl = require_destroy();

View file

@ -4,8 +4,9 @@ import { assert, assertEquals } from "@std/assert";
import { fromFileUrl, relative } from "@std/path"; import { fromFileUrl, relative } from "@std/path";
import { pipeline } from "node:stream/promises"; import { pipeline } from "node:stream/promises";
// @ts-expect-error: @types/node is outdated // @ts-expect-error: @types/node is outdated
import { getDefaultHighWaterMark } from "node:stream"; import { getDefaultHighWaterMark, Stream } from "node:stream";
import { createReadStream, createWriteStream } from "node:fs"; import { createReadStream, createWriteStream } from "node:fs";
import { EventEmitter } from "node:events";
Deno.test("stream/promises pipeline", async () => { Deno.test("stream/promises pipeline", async () => {
const filePath = relative( const filePath = relative(
@ -30,3 +31,8 @@ Deno.test("stream getDefaultHighWaterMark", () => {
assertEquals(getDefaultHighWaterMark(false), 16 * 1024); assertEquals(getDefaultHighWaterMark(false), 16 * 1024);
assertEquals(getDefaultHighWaterMark(true), 16); assertEquals(getDefaultHighWaterMark(true), 16);
}); });
Deno.test("stream is an instance of EventEmitter", () => {
const stream = new Stream();
assert(stream instanceof EventEmitter);
});