Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AsyncIterable as input #46

Merged
merged 6 commits into from Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 15 additions & 14 deletions index.js
Expand Up @@ -21,42 +21,43 @@ export default async function pMap(
const errors = [];
const skippedIndexes = [];
let isRejected = false;
let isRejectedOrResolved = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
let asyncIterator = false;
let iterator;

if (iterable[Symbol.iterator] === undefined) {
// We've got an async iterable
iterator = iterable[Symbol.asyncIterator]();
asyncIterator = true;
} else {
iterator = iterable[Symbol.iterator]();
}
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should throw a TypeError with a human-friendly error if the input is neither an iterable or async iterable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think?

Inner function within the function so iterator can remain const? File scope function?

Nested ? operator?

if/else if/else with throw and iterator being non-const?

Also... when you say "TypeError with a human-friendly error"... did you mean a human friendly error-message? Or some specific field you want set on the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done... i added a check for the iterator type at the top, similar to mapper / concurrency checks, which means that the existing code cannot have an error as it would have already thrown out


const reject = reason => {
isRejected = true;
isRejectedOrResolved = true;
huntharo marked this conversation as resolved.
Show resolved Hide resolved
reject_(reason);
};

const next = async () => {
if (isRejected) {
if (isRejectedOrResolved) {
return;
}

const nextItem = asyncIterator ? await iterator.next() : iterator.next();
const nextItem = await iterator.next();

const index = currentIndex;
currentIndex++;

// Note: iterator.next() can be called many times in parallel.
// This can cause multiple calls to this next() function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (resolvingCount === 0 && !isRejectedOrResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isRejectedOrResolved = true;

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
Expand All @@ -75,7 +76,7 @@ export default async function pMap(
try {
const element = await nextItem.value;

if (isRejected) {
if (isRejectedOrResolved) {
return;
}

Expand Down
31 changes: 25 additions & 6 deletions test.js
Expand Up @@ -157,7 +157,7 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('do not run mapping after stop-on-error happened', async t => {
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
Expand Down Expand Up @@ -271,7 +271,7 @@ test('asyncIterator - pMapSkip', async t => {
]), async value => value), [1, 2]);
});

test('asyncIterator - do not run mapping after stop-on-error happened', async t => {
test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
Expand All @@ -283,13 +283,13 @@ test('asyncIterator - do not run mapping after stop-on-error happened', async t
mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error('Oops!');
throw new Error(`Oops! ${value}`);
}
},
{concurrency: 1})
}),
{message: 'Oops! 1'}
);
await delay(500);
t.deepEqual(mappedValues, [1]);
t.deepEqual(mappedValues, [1, 3, 2]);
});

test('catches exception from source iterator - 1st item', async t => {
Expand Down Expand Up @@ -349,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});

test('asyncIterator - get the correct exception after stop-on-error', async t => {
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
const mappedValues = [];

const task = pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
// Throw for each item - all should fail and we should get only the first
await delay(100);
throw new Error(`Oops! ${value}`);
});
await delay(500);
await t.throwsAsync(task, {message: 'Oops! 1'});
t.deepEqual(mappedValues, [1, 2, 3]);
});