From a24e9094b5a0951c9d665bda432787457d4f7989 Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Thu, 23 Sep 2021 22:22:14 -0400 Subject: [PATCH] Add support for AsyncIterable as input (#46) Co-authored-by: Sindre Sorhus --- index.d.ts | 4 +- index.js | 71 ++++++++++++++-------- readme.md | 6 +- test.js | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 213 insertions(+), 37 deletions(-) diff --git a/index.d.ts b/index.d.ts index ebcb3e8..a5423ff 100644 --- a/index.d.ts +++ b/index.d.ts @@ -28,7 +28,7 @@ export type Mapper<Element = any, NewElement = unknown> = ( ) => NewElement | Promise<NewElement>; /** -@param input - Iterated over concurrently in the `mapper` function. +@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. @param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value. @returns A `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order.
@@ -55,7 +55,7 @@ console.log(result); ``` */ export default function pMap<Element, NewElement>( - input: Iterable<Element>, + input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>, mapper: Mapper<Element, NewElement>, options?: Options ): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>; diff --git a/index.js b/index.js index b32edb4..c7c9fba 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,10 @@ export default async function pMap( } = {} ) { return new Promise((resolve, reject_) => { // eslint-disable-line promise/param-names + if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) { + throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`); + } + if (typeof mapper !== 'function') { throw new TypeError('Mapper function is required'); } @@ -20,33 +24,44 @@ export default async function pMap( const result = []; const errors = []; const skippedIndexes = []; - const iterator = iterable[Symbol.iterator](); let isRejected = false; + let isResolved = false; let isIterableDone = false; let resolvingCount = 0; let currentIndex = 0; + const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator](); const reject = reason => { isRejected = true; + isResolved = true; reject_(reason); }; - const next = () => { - if (isRejected) { + const next = async () => { + if (isResolved) { return; } - const nextItem = iterator.next(); + const nextItem = await iterator.next(); + const index = currentIndex; currentIndex++; + // Note: `iterator.next()` can be called many times in parallel. + // This can cause multiple calls to this `next()` function to + // receive a `nextItem` with `done === true`. + // The shutdown logic that rejects/resolves must be protected + // so it runs only one time as the `skippedIndex` logic is + // non-idempotent.
if (nextItem.done) { isIterableDone = true; - if (resolvingCount === 0) { + if (resolvingCount === 0 && !isResolved) { if (!stopOnError && errors.length > 0) { reject(new AggregateError(errors)); } else { + isResolved = true; + for (const skippedIndex of skippedIndexes) { result.splice(skippedIndex, 1); } @@ -60,15 +75,17 @@ export default async function pMap( resolvingCount++; + // Intentionally detached (async () => { try { const element = await nextItem.value; - if (isRejected) { + if (isResolved) { return; } const value = await mapper(element, index); + if (value === pMapSkip) { skippedIndexes.push(index); } else { @@ -76,7 +93,7 @@ export default async function pMap( } resolvingCount--; - next(); + await next(); } catch (error) { if (stopOnError) { reject(error); @@ -84,12 +101,12 @@ export default async function pMap( errors.push(error); resolvingCount--; - // In that case we can't really continue regardless of stopOnError state + // In that case we can't really continue regardless of `stopOnError` state // since an iterable is likely to continue throwing after it throws once. - // If we continue calling next() indefinitely we will likely end up + // If we continue calling `next()` indefinitely we will likely end up // in an infinite loop of failed iteration. try { - next(); + await next(); } catch (error) { reject(error); } @@ -98,23 +115,27 @@ export default async function pMap( })(); }; - for (let index = 0; index < concurrency; index++) { - // Catch errors from the iterable.next() call - // In that case we can't really continue regardless of stopOnError state - // since an iterable is likely to continue throwing after it throws once. - // If we continue calling next() indefinitely we will likely end up - // in an infinite loop of failed iteration. - try { - next(); - } catch (error) { - reject(error); - break; - } + // Create the concurrent runners in a detached (non-awaited) + // promise. 
We need this so we can await the `next()` calls + // to stop creating runners before hitting the concurrency limit + // if the iterable has already been marked as done. + // NOTE: We *must* do this for async iterators otherwise we'll spin up + // infinite `next()` calls by default and never start the event loop. + (async () => { + for (let index = 0; index < concurrency; index++) { + try { + // eslint-disable-next-line no-await-in-loop + await next(); + } catch (error) { + reject(error); + break; + } - if (isIterableDone || isRejected) { - break; + if (isIterableDone || isRejected) { + break; + } } - } + })(); }); } diff --git a/readme.md b/readme.md index 1a08047..7c8ba3a 100644 --- a/readme.md +++ b/readme.md @@ -43,9 +43,11 @@ Returns a `Promise` that is fulfilled when all promises in `input` and ones retu #### input -Type: `Iterable` +Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>` -Iterated over concurrently in the `mapper` function. +Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. + +Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.
#### mapper(element, index) diff --git a/test.js b/test.js index fdcb7c5..2120258 100644 --- a/test.js +++ b/test.js @@ -7,7 +7,7 @@ import AggregateError from 'aggregate-error'; import pMap, {pMapSkip} from './index.js'; const sharedInput = [ - Promise.resolve([10, 300]), + [async () => 10, 300], [20, 200], [30, 100] ]; @@ -15,7 +15,9 @@ const sharedInput = [ const errorInput1 = [ [20, 200], [30, 100], - [() => Promise.reject(new Error('foo')), 10], + [async () => { + throw new Error('foo'); + }, 10], [() => { throw new Error('bar'); }, 10] @@ -23,7 +25,9 @@ const errorInput1 = [ const errorInput2 = [ [20, 200], - [() => Promise.reject(new Error('bar')), 10], + [async () => { + throw new Error('bar'); + }, 10], [30, 100], [() => { throw new Error('foo'); @@ -151,21 +155,139 @@ test('pMapSkip', async t => { ], async value => value), [1, 2]); }); -test('do not run mapping after stop-on-error happened', async t => { - const input = [1, delay(300, {value: 2}), 3]; +test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { + const input = [1, async () => delay(300, {value: 2}), 3]; const mappedValues = []; await t.throwsAsync( pMap(input, async value => { + value = typeof value === 'function' ? 
await value() : value; mappedValues.push(value); if (value === 1) { await delay(100); throw new Error('Oops!'); } - }, - {concurrency: 1}) + }) ); await delay(500); - t.deepEqual(mappedValues, [1]); + t.deepEqual(mappedValues, [1, 3, 2]); +}); + +class AsyncTestData { + constructor(data) { + this.data = data; + } + + async * [Symbol.asyncIterator]() { + for (let index = 0; index < this.data.length; index++) { + // Add a delay between each iterated item + // eslint-disable-next-line no-await-in-loop + await delay(10); + yield this.data[index]; + } + } +} + +// +// Async Iterator tests +// + +test('asyncIterator - main', async t => { + const end = timeSpan(); + t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]); + + // We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload. + t.true(inRange(end(), {start: 290, end: 430})); +}); + +test('asyncIterator - concurrency: 1', async t => { + const end = timeSpan(); + t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]); + t.true(inRange(end(), {start: 590, end: 760})); +}); + +test('asyncIterator - concurrency: 4', async t => { + const concurrency = 4; + let running = 0; + + await pMap(new AsyncTestData(Array.from({length: 100}).fill(0)), async () => { + running++; + t.true(running <= concurrency); + await delay(randomInt(30, 200)); + running--; + }, {concurrency}); +}); + +test('asyncIterator - handles empty iterable', async t => { + t.deepEqual(await pMap(new AsyncTestData([]), mapper), []); +}); + +test('asyncIterator - async with concurrency: 2 (random time sequence)', async t => { + const input = Array.from({length: 10}).map(() => randomInt(0, 100)); + const mapper = value => delay(value, {value}); + const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); + t.deepEqual(result, input); +}); + +test('asyncIterator - async with concurrency: 2 (problematic time sequence)', 
async t => { + const input = [100, 200, 10, 36, 13, 45]; + const mapper = value => delay(value, {value}); + const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); + t.deepEqual(result, input); +}); + +test('asyncIterator - async with concurrency: 2 (out of order time sequence)', async t => { + const input = [200, 100, 50]; + const mapper = value => delay(value, {value}); + const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2}); + t.deepEqual(result, input); +}); + +test('asyncIterator - enforce number in options.concurrency', async t => { + await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 0}), {instanceOf: TypeError}); + await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1.5}), {instanceOf: TypeError}); + await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1})); + await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 10})); + await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: Number.POSITIVE_INFINITY})); +}); + +test('asyncIterator - immediately rejects when stopOnError is true', async t => { + await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1}), {message: 'foo'}); + await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1}), {message: 'bar'}); +}); + +test('asyncIterator - aggregate errors when stopOnError is false', async t => { + await t.notThrowsAsync(pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1, stopOnError: false})); + await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/}); + await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/}); +}); + +test('asyncIterator - pMapSkip', async t => { + t.deepEqual(await pMap(new AsyncTestData([ 
+ 1, + pMapSkip, + 2 + ]), async value => value), [1, 2]); +}); + +test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => { + const input = [1, async () => delay(300, {value: 2}), 3]; + const mappedValues = []; + await t.throwsAsync( + pMap(new AsyncTestData(input), async value => { + if (typeof value === 'function') { + value = await value(); + } + + mappedValues.push(value); + if (value === 1) { + await delay(100); + throw new Error(`Oops! ${value}`); + } + }), + {message: 'Oops! 1'} + ); + await delay(500); + t.deepEqual(mappedValues, [1, 3, 2]); }); test('catches exception from source iterator - 1st item', async t => { @@ -225,3 +347,34 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th t.is(input.index, 2); t.deepEqual(mappedValues, [0]); }); + +test('asyncIterator - get the correct exception after stop-on-error', async t => { + const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})]; + const mappedValues = []; + + const task = pMap(new AsyncTestData(input), async value => { + if (typeof value === 'function') { + value = await value(); + } + + mappedValues.push(value); + // Throw for each item - all should fail and we should get only the first + await delay(100); + throw new Error(`Oops! ${value}`); + }); + await delay(500); + await t.throwsAsync(task, {message: 'Oops! 1'}); + t.deepEqual(mappedValues, [1, 2, 3]); +}); + +test('incorrect input type', async t => { + let mapperCalled = false; + + const task = pMap(123456, async () => { + mapperCalled = true; + await delay(100); + }); + await delay(500); + await t.throwsAsync(task, {message: 'Expected `input` to be either an `Iterable` or `AsyncIterable`, got (number)'}); + t.false(mapperCalled); +});