Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AsyncIterable as input #46

Merged
merged 6 commits into from Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions index.d.ts
Expand Up @@ -28,7 +28,7 @@ export type Mapper<Element = any, NewElement = unknown> = (
) => NewElement | Promise<NewElement>;

/**
@param input - Iterated over concurrently in the `mapper` function.
@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.
@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value.
@returns A `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order.

Expand All @@ -55,7 +55,7 @@ console.log(result);
```
*/
export default function pMap<Element, NewElement>(
input: Iterable<Element>,
input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>;
Expand Down
71 changes: 46 additions & 25 deletions index.js
Expand Up @@ -9,6 +9,10 @@ export default async function pMap(
} = {}
) {
return new Promise((resolve, reject_) => { // eslint-disable-line promise/param-names
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
}

if (typeof mapper !== 'function') {
throw new TypeError('Mapper function is required');
}
Expand All @@ -20,33 +24,44 @@ export default async function pMap(
const result = [];
const errors = [];
const skippedIndexes = [];
const iterator = iterable[Symbol.iterator]();
let isRejected = false;
let isResolved = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should throw a TypeError with a human-friendly error if the input is neither an iterable or async iterable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think?

Inner function within the function so iterator can remain const? File scope function?

Nested ? operator?

if/else if/else with throw and iterator being non-const?

Also... when you say "TypeError with a human-friendly error"... did you mean a human friendly error-message? Or some specific field you want set on the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done... i added a check for the iterator type at the top, similar to mapper / concurrency checks, which means that the existing code cannot have an error as it would have already thrown out


const reject = reason => {
isRejected = true;
isResolved = true;
reject_(reason);
};

const next = () => {
if (isRejected) {
const next = async () => {
if (isResolved) {
return;
}

const nextItem = iterator.next();
const nextItem = await iterator.next();

const index = currentIndex;
currentIndex++;

// Note: `iterator.next()` can be called many times in parallel.
// This can cause multiple calls to this `next()` function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (resolvingCount === 0 && !isResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isResolved = true;

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
Expand All @@ -60,36 +75,38 @@ export default async function pMap(

resolvingCount++;

// Intentionally detached
(async () => {
try {
const element = await nextItem.value;

if (isRejected) {
if (isResolved) {
return;
}

const value = await mapper(element, index);

if (value === pMapSkip) {
skippedIndexes.push(index);
} else {
result[index] = value;
}

resolvingCount--;
next();
await next();
} catch (error) {
if (stopOnError) {
reject(error);
} else {
errors.push(error);
resolvingCount--;

// In that case we can't really continue regardless of stopOnError state
// In that case we can't really continue regardless of `stopOnError` state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// If we continue calling `next()` indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
await next();
} catch (error) {
reject(error);
}
Expand All @@ -98,23 +115,27 @@ export default async function pMap(
})();
};

for (let index = 0; index < concurrency; index++) {
// Catch errors from the iterable.next() call
// In that case we can't really continue regardless of stopOnError state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
} catch (error) {
reject(error);
break;
}
// Create the concurrent runners in a detached (non-awaited)
// promise. We need this so we can await the `next()` calls
// to stop creating runners before hitting the concurrency limit
// if the iterable has already been marked as done.
// NOTE: We *must* do this for async iterators otherwise we'll spin up
// infinite `next()` calls by default and never start the event loop.
(async () => {
for (let index = 0; index < concurrency; index++) {
try {
// eslint-disable-next-line no-await-in-loop
await next();
} catch (error) {
reject(error);
break;
}

if (isIterableDone || isRejected) {
break;
if (isIterableDone || isRejected) {
break;
}
}
}
})();
});
}

Expand Down
6 changes: 4 additions & 2 deletions readme.md
Expand Up @@ -43,9 +43,11 @@ Returns a `Promise` that is fulfilled when all promises in `input` and ones retu

#### input

Type: `Iterable<Promise | unknown>`
Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>`

Iterated over concurrently in the `mapper` function.
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item.

Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to update index.d.ts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

#### mapper(element, index)

Expand Down
169 changes: 161 additions & 8 deletions test.js
Expand Up @@ -7,23 +7,27 @@ import AggregateError from 'aggregate-error';
import pMap, {pMapSkip} from './index.js';

const sharedInput = [
Promise.resolve([10, 300]),
[async () => 10, 300],
[20, 200],
[30, 100]
];

const errorInput1 = [
[20, 200],
[30, 100],
[() => Promise.reject(new Error('foo')), 10],
[async () => {
throw new Error('foo');
}, 10],
[() => {
throw new Error('bar');
}, 10]
];

const errorInput2 = [
[20, 200],
[() => Promise.reject(new Error('bar')), 10],
[async () => {
throw new Error('bar');
}, 10],
[30, 100],
[() => {
throw new Error('foo');
Expand Down Expand Up @@ -151,21 +155,139 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('do not run mapping after stop-on-error happened', async t => {
const input = [1, delay(300, {value: 2}), 3];
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
pMap(input, async value => {
value = typeof value === 'function' ? await value() : value;
mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error('Oops!');
}
},
{concurrency: 1})
})
);
await delay(500);
t.deepEqual(mappedValues, [1]);
t.deepEqual(mappedValues, [1, 3, 2]);
});

class AsyncTestData {
constructor(data) {
this.data = data;
}

async * [Symbol.asyncIterator]() {
for (let index = 0; index < this.data.length; index++) {
// Add a delay between each iterated item
// eslint-disable-next-line no-await-in-loop
await delay(10);
yield this.data[index];
}
}
}

//
// Async Iterator tests
//

test('asyncIterator - main', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]);

// We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload.
t.true(inRange(end(), {start: 290, end: 430}));
});

test('asyncIterator - concurrency: 1', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]);
t.true(inRange(end(), {start: 590, end: 760}));
});

test('asyncIterator - concurrency: 4', async t => {
const concurrency = 4;
let running = 0;

await pMap(new AsyncTestData(Array.from({length: 100}).fill(0)), async () => {
running++;
t.true(running <= concurrency);
await delay(randomInt(30, 200));
running--;
}, {concurrency});
});

test('asyncIterator - handles empty iterable', async t => {
t.deepEqual(await pMap(new AsyncTestData([]), mapper), []);
});

test('asyncIterator - async with concurrency: 2 (random time sequence)', async t => {
const input = Array.from({length: 10}).map(() => randomInt(0, 100));
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - async with concurrency: 2 (problematic time sequence)', async t => {
const input = [100, 200, 10, 36, 13, 45];
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - async with concurrency: 2 (out of order time sequence)', async t => {
const input = [200, 100, 50];
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - enforce number in options.concurrency', async t => {
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 0}), {instanceOf: TypeError});
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1.5}), {instanceOf: TypeError});
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1}));
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 10}));
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: Number.POSITIVE_INFINITY}));
});

test('asyncIterator - immediately rejects when stopOnError is true', async t => {
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1}), {message: 'foo'});
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1}), {message: 'bar'});
});

test('asyncIterator - aggregate errors when stopOnError is false', async t => {
await t.notThrowsAsync(pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1, stopOnError: false}));
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/});
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/});
});

test('asyncIterator - pMapSkip', async t => {
t.deepEqual(await pMap(new AsyncTestData([
1,
pMapSkip,
2
]), async value => value), [1, 2]);
});

test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error(`Oops! ${value}`);
}
}),
{message: 'Oops! 1'}
);
await delay(500);
t.deepEqual(mappedValues, [1, 3, 2]);
});

test('catches exception from source iterator - 1st item', async t => {
Expand Down Expand Up @@ -225,3 +347,34 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});

test('asyncIterator - get the correct exception after stop-on-error', async t => {
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
const mappedValues = [];

const task = pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
// Throw for each item - all should fail and we should get only the first
await delay(100);
throw new Error(`Oops! ${value}`);
});
await delay(500);
await t.throwsAsync(task, {message: 'Oops! 1'});
t.deepEqual(mappedValues, [1, 2, 3]);
});

test('incorrect input type', async t => {
let mapperCalled = false;

const task = pMap(123456, async () => {
mapperCalled = true;
await delay(100);
});
await delay(500);
await t.throwsAsync(task, {message: 'Expected `input` to be either an `Iterable` or `AsyncIterable`, got (number)'});
t.false(mapperCalled);
});