Skip to content

Commit

Permalink
Prevent skip removal logic from being run twice
Browse files Browse the repository at this point in the history
  • Loading branch information
huntharo committed Sep 12, 2021
1 parent 8477a7a commit d5a0b26
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 20 deletions.
29 changes: 15 additions & 14 deletions index.js
Expand Up @@ -21,42 +21,43 @@ export default async function pMap(
const errors = [];
const skippedIndexes = [];
let isRejected = false;
let isRejectedOrResolved = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;
let asyncIterator = false;
let iterator;

if (iterable[Symbol.iterator] === undefined) {
// We've got an async iterable
iterator = iterable[Symbol.asyncIterator]();
asyncIterator = true;
} else {
iterator = iterable[Symbol.iterator]();
}
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();

const reject = reason => {
isRejected = true;
isRejectedOrResolved = true;
reject_(reason);
};

const next = async () => {
if (isRejected) {
if (isRejectedOrResolved) {
return;
}

const nextItem = asyncIterator ? await iterator.next() : iterator.next();
const nextItem = await iterator.next();

const index = currentIndex;
currentIndex++;

// Note: iterator.next() can be called many times in parallel.
// This can cause multiple calls to this next() function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (resolvingCount === 0 && !isRejectedOrResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isRejectedOrResolved = true;

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
Expand All @@ -75,7 +76,7 @@ export default async function pMap(
try {
const element = await nextItem.value;

if (isRejected) {
if (isRejectedOrResolved) {
return;
}

Expand Down
31 changes: 25 additions & 6 deletions test.js
Expand Up @@ -157,7 +157,7 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('do not run mapping after stop-on-error happened', async t => {
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
Expand Down Expand Up @@ -271,7 +271,7 @@ test('asyncIterator - pMapSkip', async t => {
]), async value => value), [1, 2]);
});

test('asyncIterator - do not run mapping after stop-on-error happened', async t => {
test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
await t.throwsAsync(
Expand All @@ -283,13 +283,13 @@ test('asyncIterator - do not run mapping after stop-on-error happened', async t
mappedValues.push(value);
if (value === 1) {
await delay(100);
throw new Error('Oops!');
throw new Error(`Oops! ${value}`);
}
},
{concurrency: 1})
}),
{message: 'Oops! 1'}
);
await delay(500);
t.deepEqual(mappedValues, [1]);
t.deepEqual(mappedValues, [1, 3, 2]);
});

test('catches exception from source iterator - 1st item', async t => {
Expand Down Expand Up @@ -349,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});

test('asyncIterator - get the correct exception after stop-on-error', async t => {
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
const mappedValues = [];

const task = pMap(new AsyncTestData(input), async value => {
if (typeof value === 'function') {
value = await value();
}

mappedValues.push(value);
// Throw for each item - all should fail and we should get only the first
await delay(100);
throw new Error(`Oops! ${value}`);
});
await delay(500);
await t.throwsAsync(task, {message: 'Oops! 1'});
t.deepEqual(mappedValues, [1, 2, 3]);
});

0 comments on commit d5a0b26

Please sign in to comment.