Add support for AsyncIterable as input #46

Merged
merged 6 commits into from Sep 24, 2021
Changes from 3 commits
2 changes: 1 addition & 1 deletion index.d.ts
@@ -55,7 +55,7 @@ console.log(result);
```
*/
export default function pMap<Element, NewElement>(
input: Iterable<Element>,
input: AsyncIterable<Element> | Iterable<Element>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>;
63 changes: 40 additions & 23 deletions index.js
@@ -20,33 +20,44 @@ export default async function pMap(
const result = [];
const errors = [];
const skippedIndexes = [];
const iterator = iterable[Symbol.iterator]();
let isRejected = false;
let isRejectedOrResolved = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();
Owner

I think we should throw a TypeError with a human-friendly error if the input is neither an iterable nor an async iterable.

Contributor Author

What do you think? Some options:

An inner function within the function, so `iterator` can remain `const`? A file-scope function?

A nested `?` operator?

An `if`/`else if`/`else` with a `throw`, and `iterator` being non-const?

Also, when you say "TypeError with a human-friendly error", did you mean a human-friendly error message, or is there a specific field you want set on the error?

Contributor Author

Done. I added a check for the iterator type at the top, similar to the mapper and concurrency checks, so the existing code can no longer fail here: invalid input will already have thrown.
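
One way to express the check discussed in this thread is a small helper that resolves the iterator up front and throws otherwise. This is only a sketch: `getIterator` and the wording of the error message are assumptions for illustration, not the code that was merged. Like the line under review, it prefers `Symbol.iterator` when both protocols are present.

```javascript
// Hypothetical helper (not taken from the PR): resolve an iterator from
// either a sync or an async iterable, or fail with a readable TypeError.
function getIterator(input) {
	if (input !== null && input !== undefined) {
		// Prefer the sync protocol, matching the order used in the diff.
		if (typeof input[Symbol.iterator] === 'function') {
			return input[Symbol.iterator]();
		}

		if (typeof input[Symbol.asyncIterator] === 'function') {
			return input[Symbol.asyncIterator]();
		}
	}

	// The message text is an assumption; any human-readable wording works.
	throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof input})`);
}
```

Doing this before any other work keeps `iterator` a `const` and means the later code never sees an invalid input.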


const reject = reason => {
isRejected = true;
isRejectedOrResolved = true;
huntharo marked this conversation as resolved.
reject_(reason);
};

const next = () => {
if (isRejected) {
const next = async () => {
if (isRejectedOrResolved) {
return;
}

const nextItem = iterator.next();
const nextItem = await iterator.next();

const index = currentIndex;
currentIndex++;

// Note: iterator.next() can be called many times in parallel.
// This can cause multiple calls to this next() function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (resolvingCount === 0 && !isRejectedOrResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isRejectedOrResolved = true;

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
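
The note in this hunk about several callers receiving `done === true` can be reproduced in isolation. The sketch below is illustrative only and is not the PR's code: `numbers`, `settle`, and `demonstrateRunOnceGuard` are hypothetical names, and the guard flag stands in for `isRejectedOrResolved`.

```javascript
// Illustrative sketch: a two-item async generator used as the source.
async function * numbers() {
	yield 1;
	yield 2;
}

async function demonstrateRunOnceGuard(parallelCalls) {
	const iterator = numbers();
	let doneResults = 0;
	let isSettled = false;
	let settleCount = 0;

	// Completion logic that must take effect only once.
	const settle = () => {
		if (isSettled) {
			return;
		}

		isSettled = true;
		settleCount++;
	};

	// Issue every next() call up front, without awaiting in between.
	// An async generator queues the requests and answers them in order.
	await Promise.all(Array.from({length: parallelCalls}, async () => {
		const item = await iterator.next();
		if (item.done) {
			doneResults++;
			settle();
		}
	}));

	return {doneResults, settleCount};
}
```

With five parallel calls on this two-item generator, three calls observe `done: true`, but `settle` takes effect only once.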
@@ -60,23 +71,25 @@ export default async function pMap(

resolvingCount++;

// Intentionally detached
(async () => {
try {
const element = await nextItem.value;

if (isRejected) {
if (isRejectedOrResolved) {
return;
}

const value = await mapper(element, index);

if (value === pMapSkip) {
skippedIndexes.push(index);
} else {
result[index] = value;
}

resolvingCount--;
next();
await next();
} catch (error) {
if (stopOnError) {
reject(error);
@@ -89,7 +102,7 @@ export default async function pMap(
// If we continue calling next() indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
await next();
} catch (error) {
reject(error);
}
@@ -98,23 +111,27 @@ export default async function pMap(
})();
};

for (let index = 0; index < concurrency; index++) {
// Catch errors from the iterable.next() call
// In that case we can't really continue regardless of stopOnError state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
} catch (error) {
reject(error);
break;
}
// Create the concurrent runners in a detached (non-awaited)
// promise. We need this so we can await the next() calls
Owner

Suggested change
// promise. We need this so we can await the next() calls
// promise. We need this so we can await the `next()` calls

Contributor Author

done

// to stop creating runners before hitting the concurrency limit
// if the iterable has already been marked as done.
// NOTE: We *must* do this for async iterators otherwise we'll spin up
// infinite next() calls by default and never start the event loop.
Owner

Suggested change
// infinite next() calls by default and never start the event loop.
// infinite `next()` calls by default and never start the event loop.

Contributor Author

done (plus more cases)

(async () => {
for (let index = 0; index < concurrency; index++) {
try {
// eslint-disable-next-line no-await-in-loop
await next();
} catch (error) {
reject(error);
break;
}

if (isIterableDone || isRejected) {
break;
if (isIterableDone || isRejected) {
break;
}
}
}
})();
});
}
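
The comments in this hunk explain why the runners are started from an awaited loop rather than all at once. A minimal sketch of that idea follows, leaving out the mapper, error handling, and the re-entrant `next()` calls of the real implementation. `startRunners` and `onItem` are hypothetical names introduced here for illustration.

```javascript
// Hypothetical sketch: pull items one awaited step at a time and stop as
// soon as the source reports completion, even if `concurrency` is Infinity.
async function startRunners(iterator, concurrency, onItem) {
	let isIterableDone = false;
	let nextCalls = 0;

	const next = async () => {
		nextCalls++;
		// Awaiting works for both sync and async iterators.
		const item = await iterator.next();

		if (item.done) {
			isIterableDone = true;
			return;
		}

		onItem(item.value);
	};

	for (let index = 0; index < concurrency; index++) {
		// Awaiting here is what lets the loop observe `isIterableDone`.
		// eslint-disable-next-line no-await-in-loop
		await next();

		if (isIterableDone) {
			break;
		}
	}

	return nextCalls;
}
```

With a three-item async generator and `concurrency` set to `Number.POSITIVE_INFINITY`, the loop makes four `next()` calls and then stops. Without the `await`, the loop could never see the done flag and would keep issuing calls.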

159 changes: 151 additions & 8 deletions test.js
@@ -7,23 +7,29 @@ import AggregateError from 'aggregate-error';
import pMap, {pMapSkip} from './index.js';

const sharedInput = [
Promise.resolve([10, 300]),
[async () => {
return 10;
}, 300],
Owner

Suggested change
[async () => {
return 10;
}, 300],
[async () => 10, 300],

Contributor Author

done

[20, 200],
[30, 100]
];

const errorInput1 = [
[20, 200],
[30, 100],
[() => Promise.reject(new Error('foo')), 10],
[async () => {
throw new Error('foo');
}, 10],
[() => {
throw new Error('bar');
}, 10]
];

const errorInput2 = [
[20, 200],
[() => Promise.reject(new Error('bar')), 10],
[async () => {
throw new Error('bar');
}, 10],
[30, 100],
[() => {
throw new Error('foo');
@@ -151,21 +157,139 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('do not run mapping after stop-on-error happened', async t => {
const input = [1, delay(300, {value: 2}), 3];
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
pMap(input, async value => {
value = typeof value === 'function' ? await value() : value;
mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error('Oops!');
}
},
{concurrency: 1})
})
);
await delay(500);
t.deepEqual(mappedValues, [1]);
t.deepEqual(mappedValues, [1, 3, 2]);
});

class AsyncTestData {
constructor(data) {
this.data = data;
}

async * [Symbol.asyncIterator]() {
for (let i = 0; i < this.data.length; i++) {
Owner

Suggested change
for (let i = 0; i < this.data.length; i++) {
for (let index = 0; index < this.data.length; index++) {

Contributor Author

done

// Add a delay between each item iterated
Owner

Suggested change
// Add a delay between each item iterated
// Add a delay between each iterated item

Contributor Author

done

// eslint-disable-next-line no-await-in-loop
await delay(10);
yield this.data[i];
}
}
}

//
// Async Iterator tests
//

test('asyncIterator - main', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]);

// We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload.
t.true(inRange(end(), {start: 290, end: 430}));
});

test('asyncIterator - concurrency: 1', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]);
t.true(inRange(end(), {start: 590, end: 760}));
});

test('asyncIterator - concurrency: 4', async t => {
const concurrency = 4;
let running = 0;

await pMap(new AsyncTestData(Array.from({length: 100}).fill(0)), async () => {
running++;
t.true(running <= concurrency);
await delay(randomInt(30, 200));
running--;
}, {concurrency});
});

test('asyncIterator - handles empty iterable', async t => {
t.deepEqual(await pMap(new AsyncTestData([]), mapper), []);
});

test('asyncIterator - async with concurrency: 2 (random time sequence)', async t => {
const input = Array.from({length: 10}).map(() => randomInt(0, 100));
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - async with concurrency: 2 (problematic time sequence)', async t => {
const input = [100, 200, 10, 36, 13, 45];
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - async with concurrency: 2 (out of order time sequence)', async t => {
const input = [200, 100, 50];
const mapper = value => delay(value, {value});
const result = await pMap(new AsyncTestData(input), mapper, {concurrency: 2});
t.deepEqual(result, input);
});

test('asyncIterator - enforce number in options.concurrency', async t => {
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 0}), {instanceOf: TypeError});
await t.throwsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1.5}), {instanceOf: TypeError});
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 1}));
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: 10}));
await t.notThrowsAsync(pMap(new AsyncTestData([]), () => {}, {concurrency: Number.POSITIVE_INFINITY}));
});

test('asyncIterator - immediately rejects when stopOnError is true', async t => {
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1}), {message: 'foo'});
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1}), {message: 'bar'});
});

test('asyncIterator - aggregate errors when stopOnError is false', async t => {
await t.notThrowsAsync(pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1, stopOnError: false}));
await t.throwsAsync(pMap(new AsyncTestData(errorInput1), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/});
await t.throwsAsync(pMap(new AsyncTestData(errorInput2), mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/});
});

test('asyncIterator - pMapSkip', async t => {
t.deepEqual(await pMap(new AsyncTestData([
1,
pMapSkip,
2
]), async value => value), [1, 2]);
});

test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error(`Oops! ${value}`);
}
}),
{message: 'Oops! 1'}
);
await delay(500);
t.deepEqual(mappedValues, [1, 3, 2]);
});

test('catches exception from source iterator - 1st item', async t => {
@@ -225,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});

test('asyncIterator - get the correct exception after stop-on-error', async t => {
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
const mappedValues = [];

const task = pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
// Throw for each item - all should fail and we should get only the first
await delay(100);
throw new Error(`Oops! ${value}`);
});
await delay(500);
await t.throwsAsync(task, {message: 'Oops! 1'});
t.deepEqual(mappedValues, [1, 2, 3]);
});