Skip to content

Commit

Permalink
Add support for AsyncIterable as input (#46)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <sindresorhus@gmail.com>
  • Loading branch information
huntharo and sindresorhus committed Sep 24, 2021
1 parent 5ee5d93 commit a24e909
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 37 deletions.
4 changes: 2 additions & 2 deletions index.d.ts
Expand Up @@ -28,7 +28,7 @@ export type Mapper<Element = any, NewElement = unknown> = (
) => NewElement | Promise<NewElement>;

/**
@param input - Iterated over concurrently in the `mapper` function.
@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.
@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value.
@returns A `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order.
Expand All @@ -55,7 +55,7 @@ console.log(result);
```
*/
export default function pMap<Element, NewElement>(
input: Iterable<Element>,
input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>;
Expand Down
71 changes: 46 additions & 25 deletions index.js
Expand Up @@ -9,6 +9,10 @@ export default async function pMap(
} = {}
) {
return new Promise((resolve, reject_) => { // eslint-disable-line promise/param-names
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
}

if (typeof mapper !== 'function') {
throw new TypeError('Mapper function is required');
}
Expand All @@ -20,33 +24,44 @@ export default async function pMap(
const result = [];
const errors = [];
const skippedIndexes = [];
const iterator = iterable[Symbol.iterator]();
let isRejected = false;
let isResolved = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();

const reject = reason => {
isRejected = true;
isResolved = true;
reject_(reason);
};

const next = () => {
if (isRejected) {
const next = async () => {
if (isResolved) {
return;
}

const nextItem = iterator.next();
const nextItem = await iterator.next();

const index = currentIndex;
currentIndex++;

// Note: `iterator.next()` can be called many times in parallel.
// This can cause multiple calls to this `next()` function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (resolvingCount === 0 && !isResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isResolved = true;

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
Expand All @@ -60,36 +75,38 @@ export default async function pMap(

resolvingCount++;

// Intentionally detached
(async () => {
try {
const element = await nextItem.value;

if (isRejected) {
if (isResolved) {
return;
}

const value = await mapper(element, index);

if (value === pMapSkip) {
skippedIndexes.push(index);
} else {
result[index] = value;
}

resolvingCount--;
next();
await next();
} catch (error) {
if (stopOnError) {
reject(error);
} else {
errors.push(error);
resolvingCount--;

// In that case we can't really continue regardless of stopOnError state
// In that case we can't really continue regardless of `stopOnError` state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// If we continue calling `next()` indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
await next();
} catch (error) {
reject(error);
}
Expand All @@ -98,23 +115,27 @@ export default async function pMap(
})();
};

for (let index = 0; index < concurrency; index++) {
// Catch errors from the iterable.next() call
// In that case we can't really continue regardless of stopOnError state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
} catch (error) {
reject(error);
break;
}
// Create the concurrent runners in a detached (non-awaited)
// promise. We need this so we can await the `next()` calls
// to stop creating runners before hitting the concurrency limit
// if the iterable has already been marked as done.
// NOTE: We *must* do this for async iterators otherwise we'll spin up
// infinite `next()` calls by default and never start the event loop.
(async () => {
for (let index = 0; index < concurrency; index++) {
try {
// eslint-disable-next-line no-await-in-loop
await next();
} catch (error) {
reject(error);
break;
}

if (isIterableDone || isRejected) {
break;
if (isIterableDone || isRejected) {
break;
}
}
}
})();
});
}

Expand Down
6 changes: 4 additions & 2 deletions readme.md
Expand Up @@ -43,9 +43,11 @@ Returns a `Promise` that is fulfilled when all promises in `input` and ones retu

#### input

Type: `Iterable<Promise | unknown>`
Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>`

Iterated over concurrently in the `mapper` function.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.

Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.

#### mapper(element, index)

Expand Down
169 changes: 161 additions & 8 deletions test.js
Expand Up @@ -7,23 +7,27 @@ import AggregateError from 'aggregate-error';
import pMap, {pMapSkip} from './index.js';

const sharedInput = [
Promise.resolve([10, 300]),
[async () => 10, 300],
[20, 200],
[30, 100]
];

const errorInput1 = [
[20, 200],
[30, 100],
[() => Promise.reject(new Error('foo')), 10],
[async () => {
throw new Error('foo');
}, 10],
[() => {
throw new Error('bar');
}, 10]
];

const errorInput2 = [
[20, 200],
[() => Promise.reject(new Error('bar')), 10],
[async () => {
throw new Error('bar');
}, 10],
[30, 100],
[() => {
throw new Error('foo');
Expand Down Expand Up @@ -151,21 +155,139 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('do not run mapping after stop-on-error happened', async t => {
const input = [1, delay(300, {value: 2}), 3];
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
pMap(input, async value => {
value = typeof value === 'function' ? await value() : value;
mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error('Oops!');
}
},
{concurrency: 1})
})
);
await delay(500);
t.deepEqual(mappedValues, [1]);
t.deepEqual(mappedValues, [1, 3, 2]);
});

class AsyncTestData {
constructor(data) {
this.data = data;
}

async * [Symbol.asyncIterator]() {
for (let index = 0; index < this.data.length; index++) {
// Add a delay between each iterated item
// eslint-disable-next-line no-await-in-loop
await delay(10);
yield this.data[index];
}
}
}

//
// Async Iterator tests
//

test('asyncIterator - main', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]);

// We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload.
t.true(inRange(end(), {start: 290, end: 430}));
});

test('asyncIterator - concurrency: 1', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]);
t.true(inRange(end(), {start: 590, end: 760}));
});

test('asyncIterator - concurrency: 4', async t => {
const concurrency = 4;
let running = 0;

await pMap(new AsyncTestData(Array.from({length: 100}).fill(0)), async () => {
running++;
t.true(running <= concurrency);
await delay(randomInt(30, 200));
running--;
}, {concurrency});
});

test('asyncIterator - handles empty iterable', async t => {
t.deepEqual(await pMap(new AsyncTestData([]), mapper), []);
});

test('asyncIterator - async with concurrency: 2 (random time sequence)', async t => {
const input = Array.from({length: 10}).map(() => randomInt(0, 100));
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - async with concurrency: 2 (problematic time sequence)', async t => {
const input = [100, 200, 10, 36, 13, 45];
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - async with concurrency: 2 (out of order time sequence)', async t => {
const input = [200, 100, 50];
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - enforce number in options.concurrency', async t => {
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 0}), {instanceOf: TypeError});
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1.5}), {instanceOf: TypeError});
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1}));
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 10}));
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: Number.POSITIVE_INFINITY}));
});

test('asyncIterator - immediately rejects when stopOnError is true', async t => {
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1}), {message: 'foo'});
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1}), {message: 'bar'});
});

test('asyncIterator - aggregate errors when stopOnError is false', async t => {
await t.notThrowsAsync(pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1, stopOnError: false}));
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/});
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/});
});

test('asyncIterator - pMapSkip', async t => {
t.deepEqual(await pMap(new AsyncTestData([
1,
pMapSkip,
2
]), async value => value), [1, 2]);
});

test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error(`Oops! ${value}`);
}
}),
{message: 'Oops! 1'}
);
await delay(500);
t.deepEqual(mappedValues, [1, 3, 2]);
});

test('catches exception from source iterator - 1st item', async t => {
Expand Down Expand Up @@ -225,3 +347,34 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});

test('asyncIterator - get the correct exception after stop-on-error', async t => {
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
const mappedValues = [];

const task = pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
// Throw for each item - all should fail and we should get only the first
await delay(100);
throw new Error(`Oops! ${value}`);
});
await delay(500);
await t.throwsAsync(task, {message: 'Oops! 1'});
t.deepEqual(mappedValues, [1, 2, 3]);
});

test('incorrect input type', async t => {
let mapperCalled = false;

const task = pMap(123456, async () => {
mapperCalled = true;
await delay(100);
});
await delay(500);
await t.throwsAsync(task, {message: 'Expected `input` to be either an `Iterable` or `AsyncIterable`, got (number)'});
t.false(mapperCalled);
});

0 comments on commit a24e909

Please sign in to comment.