Skip to content

Commit

Permalink
Prevent skip removal logic from being run twice
Browse files Browse the repository at this point in the history
  • Loading branch information
huntharo committed Sep 12, 2021
1 parent 8b81f26 commit ef6558f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
30 changes: 14 additions & 16 deletions index.js
Expand Up @@ -20,35 +20,33 @@ export default async function pMap(
const result = [];
const errors = [];
const skippedIndexes = [];
let isRejected = false;
let isRejectedOrResolved = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
let asyncIterator = false;
let iterator;

if (iterable[Symbol.iterator] === undefined) {
// We've got an async iterable
iterator = iterable[Symbol.asyncIterator]();
asyncIterator = true;
} else {
iterator = iterable[Symbol.iterator]();
}
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();

const next = async () => {
if (isRejected) {
if (isRejectedOrResolved) {
return;
}

const nextItem = asyncIterator ? await iterator.next() : iterator.next();
const nextItem = await iterator.next();

const index = currentIndex;
currentIndex++;

// Note: iterator.next() can be called many times in parallel.
// This can cause multiple calls to this next() function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (resolvingCount === 0 && !isRejectedOrResolved) {
isRejectedOrResolved = true;
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
Expand All @@ -70,7 +68,7 @@ export default async function pMap(
try {
const element = await nextItem.value;

if (isRejected) {
if (isRejectedOrResolved) {
return;
}

Expand All @@ -86,7 +84,7 @@ export default async function pMap(
await next();
} catch (error) {
if (stopOnError) {
isRejected = true;
isRejectedOrResolved = true;
reject(error);
} else {
errors.push(error);
Expand Down
28 changes: 24 additions & 4 deletions test.js
Expand Up @@ -124,7 +124,7 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('do not run mapping after stop-on-error happened', async t => {
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
Expand Down Expand Up @@ -238,7 +238,7 @@ test('asyncIterator - pMapSkip', async t => {
]), async value => value), [1, 2]);
});

test('asyncIterator - do not run mapping after stop-on-error happened', async t => {
test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
Expand All @@ -250,10 +250,30 @@ test('asyncIterator - do not run mapping after stop-on-error happened', async t
mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error('Oops!');
throw new Error(`Oops! ${value}`);
}
})
}),
{message: 'Oops! 1'}
);
await delay(500);
t.deepEqual(mappedValues, [1, 3, 2]);
});

test('asyncIterator - get the correct exception after stop-on-error', async t => {
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
const mappedValues = [];

const task = pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
// Throw for each item - all should fail and we should get only the first
await delay(100);
throw new Error(`Oops! ${value}`);
});
await delay(500);
await t.throwsAsync(task, {message: 'Oops! 1'});
t.deepEqual(mappedValues, [1, 2, 3]);
});

0 comments on commit ef6558f

Please sign in to comment.