Skip to content

Commit

Permalink
refactor(cursor): remove dependency on async.times()
Browse files Browse the repository at this point in the history
Re: #8073
Re: #5502
  • Loading branch information
Valeri Karpov authored and Valeri Karpov committed Sep 27, 2019
1 parent c5b2355 commit e60db1b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
28 changes: 22 additions & 6 deletions lib/helpers/cursor/eachAsync.js
Expand Up @@ -38,18 +38,25 @@ module.exports = function eachAsync(next, fn, options, callback) {
const iterate = function(callback) {
let drained = false;
const nextQueue = async.queue(function(task, cb) {
if (drained) return cb();
if (drained) {
return cb();
}
next(function(err, doc) {
if (err) return cb(err);
if (!doc) drained = true;
cb(null, doc);
});
}, 1);

const getAndRun = function(cb) {
nextQueue.push({}, function(err, doc) {
if (err) return cb(err);
if (!doc) return cb();
if (drained) {
return;
}
if (doc == null) {
drained = true;
return callback(null);
}
handleNextResult(doc, function(err) {
if (err) return cb(err);
// Make sure to clear the stack re: gh-4697
Expand All @@ -60,9 +67,18 @@ module.exports = function eachAsync(next, fn, options, callback) {
});
};

async.times(parallel, function(n, cb) {
getAndRun(cb);
}, callback);
let error = null;
for (let i = 0; i < parallel; ++i) {
getAndRun(err => {
if (error != null) {
return;
}
if (err != null) {
error = err;
return callback(err);
}
});
}
};

return utils.promiseOrCallback(callback, cb => {
Expand Down
4 changes: 2 additions & 2 deletions test/aggregate.test.js
Expand Up @@ -1288,10 +1288,10 @@ describe('aggregate: ', function() {
return MyModel.aggregate([{ $sort: { name: 1 } }]).
cursor().
exec().
eachAsync(checkDoc, { parallel: 2}).then(function() {
eachAsync(checkDoc, { parallel: 2 }).then(function() {
assert.ok(Date.now() - startedAt[1] >= 100);
assert.equal(startedAt.length, 2);
assert.ok(startedAt[1] - startedAt[0] < 50);
assert.ok(startedAt[1] - startedAt[0] < 50, `${startedAt[1] - startedAt[0]}`);
assert.deepEqual(names.sort(), expectedNames);
done();
});
Expand Down

0 comments on commit e60db1b

Please sign in to comment.