diff --git a/lib/helpers/cursor/eachAsync.js b/lib/helpers/cursor/eachAsync.js index 5e6207c7687..a343c01b0aa 100644 --- a/lib/helpers/cursor/eachAsync.js +++ b/lib/helpers/cursor/eachAsync.js @@ -4,7 +4,6 @@ * Module dependencies. */ -const async = require('async'); const utils = require('../../utils'); /** @@ -37,18 +36,9 @@ module.exports = function eachAsync(next, fn, options, callback) { const iterate = function(callback) { let drained = false; - const nextQueue = async.queue(function(task, cb) { - if (drained) { - return cb(); - } - next(function(err, doc) { - if (err) return cb(err); - cb(null, doc); - }); - }, 1); const getAndRun = function(cb) { - nextQueue.push({}, function(err, doc) { + _next(function(err, doc) { if (err) return cb(err); if (drained) { return; @@ -81,7 +71,37 @@ module.exports = function eachAsync(next, fn, options, callback) { } }; + const _nextQueue = []; return utils.promiseOrCallback(callback, cb => { iterate(cb); }); + + // `next()` can only execute one at a time, so make sure we always execute + // `next()` in series, while still allowing multiple `fn()` instances to run + // in parallel. + function _next(cb) { + if (_nextQueue.length === 0) { + return next(_step(cb)); + } + _nextQueue.push(cb); + } + + function _step(cb) { + return function(err, doc) { + if (err != null) { + return cb(err); + } + cb(null, doc); + + if (doc == null) { + return; + } + + setTimeout(() => { + if (_nextQueue.length > 0) { + next(_step(_nextQueue.unshift())); + } + }, 0); + }; + } };