Merge pull request #12716 from Automattic/vkarpov15/gh-12652
fix(cursor): make `eachAsync()` avoid modifying batch when mixing `parallel` and `batchSize`
vkarpov15 committed Nov 23, 2022
2 parents e50de5c + fde5e0f commit 966eebe
Showing 2 changed files with 103 additions and 65 deletions.
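
For context, the public API this change affects is the cursor's `eachAsync()` method: with `batchSize` set, the callback receives an array of documents instead of a single document, and with `parallel` set, several of those calls run concurrently. A hedged usage sketch of the combination (the `Order` model and `archiveBatch()` helper are illustrative, not part of this PR):

  const cursor = Order.find().cursor();
  await cursor.eachAsync(async function archive(docs) {
    // `docs` holds up to 10 documents; with this fix it keeps the same
    // contents for the duration of the call, even with 7 batches in flight.
    await archiveBatch(docs);
  }, { parallel: 7, batchSize: 10 });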
144 changes: 79 additions & 65 deletions lib/helpers/cursor/eachAsync.js
@@ -33,7 +33,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
   const aggregatedErrors = [];
   const enqueue = asyncQueue();
 
-  let drained = false;
+  let aborted = false;
 
   return promiseOrCallback(callback, cb => {
     if (signal != null) {
@@ -42,7 +42,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
       }
 
       signal.addEventListener('abort', () => {
-        drained = true;
+        aborted = true;
         return cb(null);
       }, { once: true });
     }
@@ -63,90 +63,104 @@ module.exports = function eachAsync(next, fn, options, callback) {
   function iterate(finalCallback) {
     let handleResultsInProgress = 0;
     let currentDocumentIndex = 0;
-    let documentsBatch = [];
 
     let error = null;
     for (let i = 0; i < parallel; ++i) {
-      enqueue(fetch);
+      enqueue(createFetch());
     }
 
-    function fetch(done) {
-      if (drained || error) {
-        return done();
-      }
+    function createFetch() {
+      let documentsBatch = [];
+      let drained = false;
+
+      return fetch;
+
+      function fetch(done) {
+        if (drained || aborted) {
+          return done();
+        } else if (error) {
+          return done();
+        }
 
-      next(function(err, doc) {
-        if (drained || error != null) {
-          return done();
-        }
-        if (err != null) {
-          if (continueOnError) {
-            aggregatedErrors.push(err);
-          } else {
-            error = err;
-            finalCallback(err);
-            return done();
-          }
-        }
-        if (doc == null) {
-          drained = true;
-          if (handleResultsInProgress <= 0) {
-            const finalErr = continueOnError ?
-              createEachAsyncMultiError(aggregatedErrors) :
-              error;
+        next(function(err, doc) {
+          if (error != null) {
+            return done();
+          }
+          if (err != null) {
+            if (err.name === 'MongoCursorExhaustedError') {
+              // We may end up calling `next()` multiple times on an exhausted
+              // cursor, which leads to an error. In case cursor is exhausted,
+              // just treat it as if the cursor returned no document, which is
+              // how a cursor indicates it is exhausted.
+              doc = null;
+            } else if (continueOnError) {
+              aggregatedErrors.push(err);
+            } else {
+              error = err;
+              finalCallback(err);
+              return done();
+            }
+          }
+          if (doc == null) {
+            drained = true;
+            if (handleResultsInProgress <= 0) {
+              const finalErr = continueOnError ?
+                createEachAsyncMultiError(aggregatedErrors) :
+                error;
 
-            finalCallback(finalErr);
-          } else if (batchSize && documentsBatch.length) {
-            handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
-          }
-          return done();
-        }
+              finalCallback(finalErr);
+            } else if (batchSize && documentsBatch.length) {
+              handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
+            }
+            return done();
+          }
 
-        ++handleResultsInProgress;
+          ++handleResultsInProgress;
 
-        // Kick off the subsequent `next()` before handling the result, but
-        // make sure we know that we still have a result to handle re: #8422
-        immediate(() => done());
+          // Kick off the subsequent `next()` before handling the result, but
+          // make sure we know that we still have a result to handle re: #8422
+          immediate(() => done());
 
-        if (batchSize) {
-          documentsBatch.push(doc);
-        }
+          if (batchSize) {
+            documentsBatch.push(doc);
+          }
 
-        // If the current documents size is less than the provided patch size don't process the documents yet
-        if (batchSize && documentsBatch.length !== batchSize) {
-          immediate(() => enqueue(fetch));
-          return;
-        }
+          // If the current documents size is less than the provided batch size don't process the documents yet
+          if (batchSize && documentsBatch.length !== batchSize) {
+            immediate(() => enqueue(fetch));
+            return;
+          }
 
-        const docsToProcess = batchSize ? documentsBatch : doc;
+          const docsToProcess = batchSize ? documentsBatch : doc;
 
-        function handleNextResultCallBack(err) {
-          if (batchSize) {
-            handleResultsInProgress -= documentsBatch.length;
-            documentsBatch = [];
-          } else {
-            --handleResultsInProgress;
-          }
-          if (err != null) {
-            if (continueOnError) {
-              aggregatedErrors.push(err);
-            } else {
-              error = err;
-              return finalCallback(err);
-            }
-          }
-          if (drained && handleResultsInProgress <= 0) {
-            const finalErr = continueOnError ?
-              createEachAsyncMultiError(aggregatedErrors) :
-              error;
-            return finalCallback(finalErr);
-          }
+          function handleNextResultCallBack(err) {
+            if (batchSize) {
+              handleResultsInProgress -= documentsBatch.length;
+              documentsBatch = [];
+            } else {
+              --handleResultsInProgress;
+            }
+            if (err != null) {
+              if (continueOnError) {
+                aggregatedErrors.push(err);
+              } else {
+                error = err;
+                return finalCallback(err);
+              }
+            }
+            if ((drained || aborted) && handleResultsInProgress <= 0) {
+              const finalErr = continueOnError ?
+                createEachAsyncMultiError(aggregatedErrors) :
+                error;
+              return finalCallback(finalErr);
+            }
 
-          immediate(() => enqueue(fetch));
-        }
+            immediate(() => enqueue(fetch));
+          }
 
-        handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
-      });
-    }
+          handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
+        });
+      }
+    }
   }
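
The core of the patch is that `documentsBatch`, along with a per-fetcher `drained` flag, now lives inside a closure created by `createFetch()`, so each of the `parallel` queued fetchers owns its own batch array and can no longer push documents into an array that another fetcher has already handed to the callback. A standalone sketch of that closure-per-worker pattern, recast with async/await for brevity (the `eachInBatches` name is illustrative, not Mongoose API):

  async function eachInBatches(source, handler, { parallel, batchSize }) {
    async function runWorker() {
      let batch = [];                    // owned by this worker alone
      for (;;) {
        const item = await source();     // shared source, like a cursor's next()
        if (item == null) {
          if (batch.length) await handler(batch); // flush the partial batch
          return;
        }
        batch.push(item);
        if (batch.length === batchSize) {
          await handler(batch);          // worker waits here, so nothing refills this array mid-call
          batch = [];                    // hand the old array off and start a fresh one
        }
      }
    }
    await Promise.all(Array.from({ length: parallel }, () => runWorker()));
  }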

24 changes: 24 additions & 0 deletions test/helpers/cursor.eachAsync.test.js
@@ -189,6 +189,30 @@ describe('eachAsync()', function() {
     assert.equal(numCalled, 1);
   });
 
+  it('avoids mutating document batch with parallel (gh-12652)', async() => {
+    const max = 100;
+    let numCalled = 0;
+    function next(cb) {
+      setImmediate(() => {
+        if (++numCalled > max) {
+          return cb(null, null);
+        }
+        cb(null, { num: numCalled });
+      });
+    }
+
+    let numDocsProcessed = 0;
+    async function fn(batch) {
+      numDocsProcessed += batch.length;
+      const length = batch.length;
+      await new Promise(resolve => setTimeout(resolve, 50));
+      assert.equal(batch.length, length);
+    }
+
+    await eachAsync(next, fn, { parallel: 7, batchSize: 10 });
+    assert.equal(numDocsProcessed, max);
+  });
+
   it('using AbortSignal (gh-12173)', async function() {
     if (typeof AbortController === 'undefined') {
       return this.skip();
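
The new test pins the invariant down: the length of `batch` observed before the `await` inside `fn` must still hold afterwards. A standalone sketch, not from the PR, of how a single shared array breaks that invariant once work is interleaved:

  const shared = [];

  async function worker(id) {
    shared.push(id);
    const lengthBefore = shared.length;
    await new Promise(resolve => setTimeout(resolve, 10));
    // With a shared array, lengthBefore may no longer match shared.length here.
    console.log(`worker ${id}: stable=${lengthBefore === shared.length}`);
  }

  Promise.all([worker(1), worker(2), worker(3)]).catch(console.error);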
