Skip to content

Commit

Permalink
feat(cursor): support continueOnError option for QueryCursor and Ag…
Browse files Browse the repository at this point in the history
…gregationCursor `eachAsync()`

Fix #6355
  • Loading branch information
vkarpov15 committed Mar 30, 2022
1 parent 6f40b94 commit 2f9c2d0
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 13 deletions.
42 changes: 29 additions & 13 deletions lib/helpers/cursor/eachAsync.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
const parallel = options.parallel || 1;
const batchSize = options.batchSize;
const continueOnError = options.continueOnError;
const errors = [];
let aggregatedErrors = null;
const enqueue = asyncQueue();

return promiseOrCallback(callback, cb => {
Expand Down Expand Up @@ -64,15 +64,19 @@ module.exports = function eachAsync(next, fn, options, callback) {
return done();
}
if (err != null) {
errors.push(err);
error = err;
finalCallback(err);
return done();
if (continueOnError) {
aggregatedErrors = aggregatedErrors || new EachAsyncMultiError([]);
aggregatedErrors.errors.push(err);
} else {
error = err;
finalCallback(err);
return done();
}
}
if (doc == null) {
drained = true;
if (handleResultsInProgress <= 0) {
finalCallback(null);
finalCallback(aggregatedErrors);
} else if (batchSize && documentsBatch.length) {
handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
}
Expand Down Expand Up @@ -105,11 +109,16 @@ module.exports = function eachAsync(next, fn, options, callback) {
--handleResultsInProgress;
}
if (err != null) {
error = err;
return finalCallback(err);
if (continueOnError) {
aggregatedErrors = aggregatedErrors || new EachAsyncMultiError([]);
aggregatedErrors.errors.push(err);
} else {
error = err;
return finalCallback(err);
}
}
if (drained && handleResultsInProgress <= 0) {
return finalCallback(null);
return finalCallback(aggregatedErrors);
}

immediate(() => enqueue(fetch));
Expand All @@ -121,11 +130,18 @@ module.exports = function eachAsync(next, fn, options, callback) {
}

function handleNextResult(doc, i, callback) {
const promise = fn(doc, i);
if (promise && typeof promise.then === 'function') {
promise.then(
let maybePromise;
try {
maybePromise = fn(doc, i);
} catch (err) {
return callback(err);
}
if (maybePromise && typeof maybePromise.then === 'function') {
maybePromise.then(
function() { callback(null); },
function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
function(error) {
callback(error || new Error('`eachAsync()` promise rejected without error'));
});
} else {
callback(null);
}
Expand Down
57 changes: 57 additions & 0 deletions test/helpers/cursor.eachAsync.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,61 @@ describe('eachAsync()', function() {

return eachAsync(next, fn, { batchSize, parallel });
});

it('executes all documents and aggregates errors if continueOnError set (gh-6355)', () => {
const max = 3;
let numCalled = 0;
function next(cb) {
setImmediate(() => {
if (++numCalled > max) {
return cb(null, null);
}
cb(null, { num: numCalled });
});
}

function fn(doc) {
return doc.num % 2 === 1 ? Promise.reject(new Error(`Doc number ${doc.num}`)) : null;
}

return eachAsync(next, fn, { continueOnError: true }).
then(() => { throw new Error('Expected error'); },
err => {
assert.ok(err);
assert.equal(err.name, 'EachAsyncMultiError');
assert.equal(err.errors.length, 2);
assert.equal(err.errors[0].message, 'Doc number 1');
assert.equal(err.errors[1].message, 'Doc number 3');
});
});

it('returns aggregated error fetching documents with continueOnError (gh-6355)', () => {
const max = 3;
let numCalled = 0;
function next(cb) {
setImmediate(() => {
if (++numCalled > max) {
return cb(null, null);
}
if (numCalled % 2 === 1) {
return cb(new Error(`Fetching doc ${numCalled}`));
}
cb(null, { num: numCalled });
});
}

function fn() {
return null;
}

return eachAsync(next, fn, { continueOnError: true }).
then(() => { throw new Error('Expected error'); },
err => {
assert.ok(err);
assert.equal(err.name, 'EachAsyncMultiError', err);
assert.equal(err.errors.length, 1);
assert.equal(err.errors[0].message, 'Fetching doc 1');
assert.equal(numCalled, 1);
});
});
});

0 comments on commit 2f9c2d0

Please sign in to comment.