1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-10 16:11:13 -05:00

feat: Asynchronous event iteration node polyfill (#4016)

This commit is contained in:
Chris Knight 2020-02-17 18:22:41 +00:00 committed by GitHub
parent 95563476f6
commit 7b9f6e9c45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 299 additions and 1 deletions

View file

@ -393,3 +393,121 @@ export function once(
}
});
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function createIterResult(value: any, done: boolean): IteratorResult<any> {
return { value, done };
}
interface AsyncInterable {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
next(): Promise<IteratorResult<any, any>>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return(): Promise<IteratorResult<any, any>>;
throw(err: Error): void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[Symbol.asyncIterator](): any;
}
/**
* Returns an AsyncIterator that iterates eventName events. It will throw if
* the EventEmitter emits 'error'. It removes all listeners when exiting the
* loop. The value returned by each iteration is an array composed of the
* emitted event arguments.
*/
export function on(
emitter: EventEmitter,
event: string | symbol
): AsyncInterable {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const unconsumedEventValues: any[] = [];
const unconsumedPromises = [];
let error = null;
let finished = false;
const iterator = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
next(): Promise<IteratorResult<any>> {
// First, we consume all unread events
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const value: any = unconsumedEventValues.shift();
if (value) {
return Promise.resolve(createIterResult(value, false));
}
// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p: Promise<never> = Promise.reject(error);
// Only the first element errors
error = null;
return p;
}
// If the iterator is finished, resolve to done
if (finished) {
return Promise.resolve(createIterResult(undefined, true));
}
// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return(): Promise<IteratorResult<any>> {
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
finished = true;
for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}
return Promise.resolve(createIterResult(undefined, true));
},
throw(err: Error): void {
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener("error", errorHandler);
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[Symbol.asyncIterator](): AsyncIterable<any> {
return this;
}
};
emitter.on(event, eventHandler);
emitter.on("error", errorHandler);
return iterator;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function eventHandler(...args: any[]): void {
const promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEventValues.push(args);
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function errorHandler(err: any): void {
finished = true;
const toError = unconsumedPromises.shift();
if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
}
iterator.return();
}
}

View file

@ -5,7 +5,7 @@ import {
fail,
assertThrows
} from "../testing/asserts.ts";
import EventEmitter, { WrappedFunction, once } from "./events.ts";
import EventEmitter, { WrappedFunction, once, on } from "./events.ts";
const shouldNeverBeEmitted: Function = () => {
fail("Should never be called");
@ -439,3 +439,183 @@ test({
assertEquals(events, ["errorMonitor event", "error"]);
}
});
test({
name: "asyncronous iteration of events are handled as expected",
async fn() {
const ee = new EventEmitter();
setTimeout(() => {
ee.emit("foo", "bar");
ee.emit("bar", 24);
ee.emit("foo", 42);
}, 0);
const iterable = on(ee, "foo");
const expected = [["bar"], [42]];
for await (const event of iterable) {
const current = expected.shift();
assertEquals(current, event);
if (expected.length === 0) {
break;
}
}
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});
test({
name: "asyncronous error handling of emitted events works as expected",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("error", _err);
}, 0);
const iterable = on(ee, "foo");
let thrown = false;
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const event of iterable) {
fail("no events should be processed due to the error thrown");
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
}
});
test({
name: "error thrown during asyncronous processing of events is handled",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("foo", 42);
ee.emit("error", _err);
}, 0);
const iterable = on(ee, "foo");
const expected = [[42]];
let thrown = false;
try {
for await (const event of iterable) {
const current = expected.shift();
assertEquals(current, event);
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});
test({
name:
"error thrown in processing loop of asyncronous event prevents processing of additional events",
async fn() {
const ee = new EventEmitter();
const _err = new Error("kaboom");
setTimeout(() => {
ee.emit("foo", 42);
ee.emit("foo", 999);
}, 0);
try {
for await (const event of on(ee, "foo")) {
assertEquals(event, [42]);
throw _err;
}
} catch (err) {
assertEquals(err, _err);
}
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});
test({
name: "asyncronous iterator next() works as expected",
async fn() {
const ee = new EventEmitter();
const iterable = on(ee, "foo");
setTimeout(function() {
ee.emit("foo", "bar");
ee.emit("foo", 42);
iterable.return();
}, 0);
const results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next()
]);
assertEquals(results, [
{
value: ["bar"],
done: false
},
{
value: [42],
done: false
},
{
value: undefined,
done: true
}
]);
assertEquals(await iterable.next(), {
value: undefined,
done: true
});
}
});
test({
name: "async iterable throw handles various scenarios",
async fn() {
const ee = new EventEmitter();
const iterable = on(ee, "foo");
setTimeout(() => {
ee.emit("foo", "bar");
ee.emit("foo", 42); // lost in the queue
iterable.throw(_err);
}, 0);
const _err = new Error("kaboom");
let thrown = false;
const expected = [["bar"], [42]];
try {
for await (const event of iterable) {
assertEquals(event, expected.shift());
}
} catch (err) {
thrown = true;
assertEquals(err, _err);
}
assertEquals(thrown, true);
assertEquals(expected.length, 0);
assertEquals(ee.listenerCount("foo"), 0);
assertEquals(ee.listenerCount("error"), 0);
}
});