Skip to content

Commit

Permalink
feat(cursor): WIP continueOnError support for eachAsync() re: #6355
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarpov15 committed Mar 28, 2022
1 parent 1dbe6f3 commit 6f40b94
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
2 changes: 2 additions & 0 deletions lib/cursor/QueryCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ QueryCursor.prototype.next = function(callback) {
* @param {Function} fn
* @param {Object} [options]
* @param {Number} [options.parallel] the number of promises to execute in parallel. Defaults to 1.
* @param {Number} [options.batchSize] if set, will call `fn()` with arrays of documents with length at most `batchSize`
* @param {Boolean} [options.continueOnError=false] if true, `eachAsync()` iterates through all docs even if `fn` throws an error. If false, `eachAsync()` throws an error immediately if the given function `fn()` throws an error.
* @param {Function} [callback] executed when all docs have been processed
* @return {Promise}
* @api public
Expand Down
38 changes: 38 additions & 0 deletions lib/error/eachAsyncMultiError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*!
* Module dependencies.
*/

'use strict';

const MongooseError = require('./');


/**
* The connection failed to reconnect and will never successfully reconnect to
* MongoDB without manual intervention.
* @api private
*/
class EachAsyncMultiError extends MongooseError {
/**
* @param {String} connectionString
*/
constructor(errors) {
let preview = errors.map(e => e.message).join(', ');
if (preview.length > 50) {
preview = preview.slice(0, 50) + '...';
}
super(`eachAsync() finished with ${errors.length} errors: ${preview}`);

this.errors = errors;
}
}

Object.defineProperty(EachAsyncMultiError.prototype, 'name', {
value: 'EachAsyncMultiError'
});

/*!
* exports
*/

module.exports = EachAsyncMultiError;
5 changes: 4 additions & 1 deletion lib/helpers/cursor/eachAsync.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Module dependencies.
*/

const EachAsyncMultiError = require('../../error/eachAsyncMultiError');
const immediate = require('../immediate');
const promiseOrCallback = require('../promiseOrCallback');

Expand All @@ -24,10 +25,11 @@ const promiseOrCallback = require('../promiseOrCallback');
module.exports = function eachAsync(next, fn, options, callback) {
const parallel = options.parallel || 1;
const batchSize = options.batchSize;
const continueOnError = options.continueOnError;
const errors = [];
const enqueue = asyncQueue();

return promiseOrCallback(callback, cb => {

if (batchSize != null) {
if (typeof batchSize !== 'number') {
throw new TypeError('batchSize must be a number');
Expand Down Expand Up @@ -62,6 +64,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
return done();
}
if (err != null) {
errors.push(err);
error = err;
finalCallback(err);
return done();
Expand Down
14 changes: 10 additions & 4 deletions types/cursor.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import stream = require('stream');
declare module 'mongoose' {
type CursorFlag = 'tailable' | 'oplogReplay' | 'noCursorTimeout' | 'awaitData' | 'partial';

interface EachAsyncOptions {
parallel?: number;
batchSize?: number;
continueOnError?: boolean;
}

class Cursor<DocType = any, Options = never> extends stream.Readable {
[Symbol.asyncIterator](): AsyncIterableIterator<DocType>;

Expand All @@ -25,10 +31,10 @@ declare module 'mongoose' {
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: DocType[]) => any, options: { parallel?: number, batchSize: number }, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType) => any, options: { parallel?: number }, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType[]) => any, options: { parallel?: number, batchSize: number }): Promise<void>;
eachAsync(fn: (doc: DocType) => any, options?: { parallel?: number }): Promise<void>;
eachAsync(fn: (doc: DocType[]) => any, options: EachAsyncOptions, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType) => any, options: EachAsyncOptions, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType[]) => any, options: EachAsyncOptions): Promise<void>;
eachAsync(fn: (doc: DocType) => any, options?: EachAsyncOptions): Promise<void>;

/**
* Registers a transform function which subsequently maps documents retrieved
Expand Down

0 comments on commit 6f40b94

Please sign in to comment.