From 1518fabfbba3cb951357f7c7977d6346943a9a8f Mon Sep 17 00:00:00 2001 From: Nayeem Rahman Date: Fri, 22 Jan 2021 11:45:29 +0000 Subject: [PATCH] fix(std/async): make pooledMap() errors catchable (#9217) --- std/async/pool.ts | 50 ++++++++++++++++++++++++++++++------------ std/async/pool_test.ts | 28 +++++++++++++++++++++-- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/std/async/pool.ts b/std/async/pool.ts index 8aeb2671d4..0b87f24ac8 100644 --- a/std/async/pool.ts +++ b/std/async/pool.ts @@ -4,8 +4,13 @@ * pooledMap transforms values from an (async) iterable into another async * iterable. The transforms are done concurrently, with a max concurrency * defined by the poolLimit. - * - * @param poolLimit The maximum count of items being processed concurrently. + * + * If an error is thrown from `iterableFn`, no new transformations will begin. + * All currently executing transformations are allowed to finish and still + * yielded on success. After that, the rejections among them are gathered and + * thrown by the iterator in an `AggregateError`. + * + * @param poolLimit The maximum count of items being processed concurrently. * @param array The input array for mapping. * @param iteratorFn The function to call for every item of the array. */ @@ -27,20 +32,37 @@ export function pooledMap( (async (): Promise => { const writer = res.writable.getWriter(); const executing: Array> = []; - for await (const item of array) { - const p = Promise.resolve().then(() => iteratorFn(item)); - writer.write(p); - const e: Promise = p.then(() => - executing.splice(executing.indexOf(e), 1) - ); - executing.push(e); - if (executing.length >= poolLimit) { - await Promise.race(executing); + try { + for await (const item of array) { + const p = Promise.resolve().then(() => iteratorFn(item)); + // Only write on success. If we `writer.write()` a rejected promise, + // that will end the iteration. We don't want that yet. Instead let it + // fail the race, taking us to the catch block where all currently + // executing jobs are allowed to finish and all rejections among them + // can be reported together. + p.then((v) => writer.write(Promise.resolve(v))).catch(() => {}); + const e: Promise = p.then(() => + executing.splice(executing.indexOf(e), 1) + ); + executing.push(e); + if (executing.length >= poolLimit) { + await Promise.race(executing); + } } + // Wait until all ongoing events have processed, then close the writer. + await Promise.all(executing); + writer.close(); + } catch { + const errors = []; + for (const result of await Promise.allSettled(executing)) { + if (result.status == "rejected") { + errors.push(result.reason); + } + } + writer.write(Promise.reject( + new AggregateError(errors, "Threw while mapping."), + )).catch(() => {}); } - // Wait until all ongoing events have processed, then close the writer. - await Promise.all(executing); - writer.close(); })(); return res.readable[Symbol.asyncIterator](); } diff --git a/std/async/pool_test.ts b/std/async/pool_test.ts index 856f4cc0a4..81be903ed6 100644 --- a/std/async/pool_test.ts +++ b/std/async/pool_test.ts @@ -1,6 +1,12 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +import { delay } from "./delay.ts"; import { pooledMap } from "./pool.ts"; -import { assert } from "../testing/asserts.ts"; +import { + assert, + assertEquals, + assertStringIncludes, + assertThrowsAsync, +} from "../testing/asserts.ts"; Deno.test("[async] pooledMap", async function (): Promise { const start = new Date(); @@ -17,4 +23,22 @@ Deno.test("[async] pooledMap", async function (): Promise { assert(diff < 3000); }); -export {}; +Deno.test("[async] pooledMap errors", async function (): Promise { + async function mapNumber(n: number): Promise { + if (n <= 2) { + throw new Error(`Bad number: ${n}`); + } + await delay(100); + return n; + } + const mappedNumbers: number[] = []; + const error = await assertThrowsAsync(async () => { + for await (const m of pooledMap(3, [1, 2, 3, 4], mapNumber)) { + mappedNumbers.push(m); + } + }, AggregateError) as AggregateError; + assertEquals(mappedNumbers, [3]); + assertEquals(error.errors.length, 2); + assertStringIncludes(error.errors[0].stack, "Error: Bad number: 1"); + assertStringIncludes(error.errors[1].stack, "Error: Bad number: 2"); +});