Skip to content

Commit

Permalink
fix(cursor): fix issue with long-running eachAsync() cursor
Browse files Browse the repository at this point in the history
Fix #8235
  • Loading branch information
vkarpov15 committed Oct 19, 2019
1 parent ba18b9d commit a47ac98
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
104 changes: 54 additions & 50 deletions lib/helpers/cursor/eachAsync.js
Expand Up @@ -22,6 +22,7 @@ const utils = require('../../utils');

module.exports = function eachAsync(next, fn, options, callback) {
const parallel = options.parallel || 1;
const enqueue = asyncQueue();

const handleNextResult = function(doc, callback) {
const promise = fn(doc);
Expand All @@ -37,71 +38,74 @@ module.exports = function eachAsync(next, fn, options, callback) {
const iterate = function(callback) {
let drained = false;

const getAndRun = function(cb) {
_next(function(err, doc) {
if (err) return cb(err);
if (drained) {
return;
}
if (doc == null) {
drained = true;
return callback(null);
}
handleNextResult(doc, function(err) {
if (err) return cb(err);
// Make sure to clear the stack re: gh-4697
setTimeout(function() {
getAndRun(cb);
}, 0);
});
});
};

let error = null;
for (let i = 0; i < parallel; ++i) {
getAndRun(err => {
if (error != null) {
return;
enqueue(fetch);
}

function fetch(done) {
if (drained || error) {
return done();
}

next(function(err, doc) {
if (drained || error) {
return done();
}
if (err != null) {
error = err;
return callback(err);
callback(err);
return done();
}
if (doc == null) {
drained = true;
callback(null);
return done();
}

done();

handleNextResult(doc, function(err) {
if (err != null) {
error = err;
return callback(err);
}

setTimeout(() => enqueue(fetch), 0);
});
});
}
};

const _nextQueue = [];
return utils.promiseOrCallback(callback, cb => {
iterate(cb);
});
};

// `next()` can only execute one at a time, so make sure we always execute
// `next()` in series, while still allowing multiple `fn()` instances to run
// in parallel.
function _next(cb) {
if (_nextQueue.length === 0) {
return next(_step(cb));
}
_nextQueue.push(cb);
}
// `next()` can only execute one at a time, so make sure we always execute
// `next()` in series, while still allowing multiple `fn()` instances to run
// in parallel.
function asyncQueue() {
const _queue = [];
let inProgress = null;
let id = 0;

function _step(cb) {
return function(err, doc) {
if (err != null) {
return cb(err);
}
cb(null, doc);
return function enqueue(fn) {
if (_queue.length === 0 && inProgress == null) {
inProgress = id++;
return fn(_step);
}
_queue.push(fn);
};

if (doc == null) {
return;
function _step() {
setTimeout(() => {
inProgress = null;
if (_queue.length > 0) {
inProgress = id++;
const fn = _queue.shift();
fn(_step);
}

setTimeout(() => {
if (_nextQueue.length > 0) {
next(_step(_nextQueue.unshift()));
}
}, 0);
};
}, 0);
}
};
}
2 changes: 1 addition & 1 deletion test/helpers/cursor.eachAsync.test.js
Expand Up @@ -6,7 +6,7 @@ const eachAsync = require('../../lib/helpers/cursor/eachAsync');
describe('eachAsync()', function() {
it('exhausts large cursor without parallel calls (gh-8235)', function() {
this.timeout(10000);

let numInProgress = 0;
let num = 0;
const max = 1000;
Expand Down

0 comments on commit a47ac98

Please sign in to comment.