Skip to content

Commit

Permalink
fix: address code review comments on #6355
Browse files Browse the repository at this point in the history
Re: #11601
  • Loading branch information
vkarpov15 committed Apr 2, 2022
1 parent 2f9c2d0 commit 58e99ac
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 31 deletions.
7 changes: 5 additions & 2 deletions lib/error/eachAsyncMultiError.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ const MongooseError = require('./');


/**
* The connection failed to reconnect and will never successfully reconnect to
* MongoDB without manual intervention.
* If `eachAsync()` is called with `continueOnError: true`, there can be
* multiple errors. This error class contains an `errors` property, which
* contains an array of all errors that occurred in `eachAsync()`.
*
* @api private
*/

class EachAsyncMultiError extends MongooseError {
/**
* @param {String} connectionString
Expand Down
27 changes: 20 additions & 7 deletions lib/helpers/cursor/eachAsync.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
const parallel = options.parallel || 1;
const batchSize = options.batchSize;
const continueOnError = options.continueOnError;
let aggregatedErrors = null;
const aggregatedErrors = [];
const enqueue = asyncQueue();

return promiseOrCallback(callback, cb => {
Expand Down Expand Up @@ -65,8 +65,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
}
if (err != null) {
if (continueOnError) {
aggregatedErrors = aggregatedErrors || new EachAsyncMultiError([]);
aggregatedErrors.errors.push(err);
aggregatedErrors.push(err);
} else {
error = err;
finalCallback(err);
Expand All @@ -76,7 +75,11 @@ module.exports = function eachAsync(next, fn, options, callback) {
if (doc == null) {
drained = true;
if (handleResultsInProgress <= 0) {
finalCallback(aggregatedErrors);
const finalErr = continueOnError ?
createEachAsyncMultiError(aggregatedErrors) :
error;

finalCallback(finalErr);
} else if (batchSize && documentsBatch.length) {
handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
}
Expand Down Expand Up @@ -110,15 +113,17 @@ module.exports = function eachAsync(next, fn, options, callback) {
}
if (err != null) {
if (continueOnError) {
aggregatedErrors = aggregatedErrors || new EachAsyncMultiError([]);
aggregatedErrors.errors.push(err);
aggregatedErrors.push(err);
} else {
error = err;
return finalCallback(err);
}
}
if (drained && handleResultsInProgress <= 0) {
return finalCallback(aggregatedErrors);
const finalErr = continueOnError ?
createEachAsyncMultiError(aggregatedErrors) :
error;
return finalCallback(finalErr);
}

immediate(() => enqueue(fetch));
Expand Down Expand Up @@ -177,3 +182,11 @@ function asyncQueue() {
}
}
}

function createEachAsyncMultiError(aggregatedErrors) {
if (aggregatedErrors.length === 0) {
return null;
}

return new EachAsyncMultiError(aggregatedErrors);
}
37 changes: 17 additions & 20 deletions test/helpers/cursor.eachAsync.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ describe('eachAsync()', function() {
return eachAsync(next, fn, { batchSize, parallel });
});

it('executes all documents and aggregates errors if continueOnError set (gh-6355)', () => {
it('executes all documents and aggregates errors if continueOnError set (gh-6355)', async() => {
const max = 3;
let numCalled = 0;
function next(cb) {
Expand All @@ -151,18 +151,17 @@ describe('eachAsync()', function() {
return doc.num % 2 === 1 ? Promise.reject(new Error(`Doc number ${doc.num}`)) : null;
}

return eachAsync(next, fn, { continueOnError: true }).
then(() => { throw new Error('Expected error'); },
err => {
assert.ok(err);
assert.equal(err.name, 'EachAsyncMultiError');
assert.equal(err.errors.length, 2);
assert.equal(err.errors[0].message, 'Doc number 1');
assert.equal(err.errors[1].message, 'Doc number 3');
});
const err = await eachAsync(next, fn, { continueOnError: true }).then(() => null, err => err);
assert.ok(err);
assert.equal(err.name, 'EachAsyncMultiError');
assert.ok(err.message.includes('Doc number 1'));
assert.ok(err.message.includes('Doc number 3'));
assert.equal(err.errors.length, 2);
assert.equal(err.errors[0].message, 'Doc number 1');
assert.equal(err.errors[1].message, 'Doc number 3');
});

it('returns aggregated error fetching documents with continueOnError (gh-6355)', () => {
it('returns aggregated error fetching documents with continueOnError (gh-6355)', async() => {
const max = 3;
let numCalled = 0;
function next(cb) {
Expand All @@ -181,14 +180,12 @@ describe('eachAsync()', function() {
return null;
}

return eachAsync(next, fn, { continueOnError: true }).
then(() => { throw new Error('Expected error'); },
err => {
assert.ok(err);
assert.equal(err.name, 'EachAsyncMultiError', err);
assert.equal(err.errors.length, 1);
assert.equal(err.errors[0].message, 'Fetching doc 1');
assert.equal(numCalled, 1);
});
const err = await eachAsync(next, fn, { continueOnError: true }).then(() => null, err => err);
assert.ok(err);
assert.equal(err.name, 'EachAsyncMultiError', err);
assert.ok(err.message.includes('Fetching doc 1'));
assert.equal(err.errors.length, 1);
assert.equal(err.errors[0].message, 'Fetching doc 1');
assert.equal(numCalled, 1);
});
});
4 changes: 2 additions & 2 deletions types/cursor.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ declare module 'mongoose' {
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: DocType[]) => any, options: EachAsyncOptions, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType[]) => any, options: EachAsyncOptions & { batchSize: number }, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType) => any, options: EachAsyncOptions, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType[]) => any, options: EachAsyncOptions): Promise<void>;
eachAsync(fn: (doc: DocType[]) => any, options: EachAsyncOptions & { batchSize: number }): Promise<void>;
eachAsync(fn: (doc: DocType) => any, options?: EachAsyncOptions): Promise<void>;

/**
Expand Down

0 comments on commit 58e99ac

Please sign in to comment.