fix(cursor): make eachAsync() avoid modifying batch when mixing parallel and batchSize #12716

Merged: 1 commit, merged on Nov 23, 2022
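For context, here is a minimal sketch of the behavior this PR fixes, modeled on the regression test added in test/helpers/cursor.eachAsync.test.js below. When `parallel` and `batchSize` are combined, all parallel fetchers previously shared a single `documentsBatch` array, so a batch handed to the user callback could keep changing while that callback was still awaiting. The `require()` path of the internal helper is assumed for illustration only; it is not a public API.

```js
'use strict';

const assert = require('assert');
// Internal helper; path assumed from this PR's diff, not a public API.
const eachAsync = require('mongoose/lib/helpers/cursor/eachAsync');

async function run() {
  let numCalled = 0;
  // Fake cursor: yields 100 documents, then signals exhaustion with `null`.
  function next(cb) {
    setImmediate(() => {
      if (++numCalled > 100) {
        return cb(null, null);
      }
      cb(null, { num: numCalled });
    });
  }

  // With `parallel` and `batchSize` combined, a batch handed to `fn` should
  // keep its length while `fn` is still awaiting.
  async function fn(batch) {
    const length = batch.length;
    await new Promise(resolve => setTimeout(resolve, 50));
    assert.equal(batch.length, length); // could fail before this fix
  }

  await eachAsync(next, fn, { parallel: 7, batchSize: 10 });
}

run().catch(err => { console.error(err); process.exit(1); });
```

Before this change the assertion could fail because other fetchers kept pushing into the array that had already been handed to `fn`; after it, each fetcher owns its own batch.
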
144 changes: 79 additions & 65 deletions lib/helpers/cursor/eachAsync.js
@@ -33,7 +33,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
   const aggregatedErrors = [];
   const enqueue = asyncQueue();

-  let drained = false;
+  let aborted = false;

   return promiseOrCallback(callback, cb => {
     if (signal != null) {
@@ -42,7 +42,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
       }

       signal.addEventListener('abort', () => {
-        drained = true;
+        aborted = true;
         return cb(null);
       }, { once: true });
     }
@@ -63,90 +63,104 @@ module.exports = function eachAsync(next, fn, options, callback) {
   function iterate(finalCallback) {
     let handleResultsInProgress = 0;
     let currentDocumentIndex = 0;
-    let documentsBatch = [];

     let error = null;
     for (let i = 0; i < parallel; ++i) {
-      enqueue(fetch);
+      enqueue(createFetch());
     }

-    function fetch(done) {
-      if (drained || error) {
-        return done();
-      }
-
-      next(function(err, doc) {
-        if (drained || error != null) {
-          return done();
-        }
-        if (err != null) {
-          if (continueOnError) {
-            aggregatedErrors.push(err);
-          } else {
-            error = err;
-            finalCallback(err);
-            return done();
-          }
-        }
-        if (doc == null) {
-          drained = true;
-          if (handleResultsInProgress <= 0) {
-            const finalErr = continueOnError ?
-              createEachAsyncMultiError(aggregatedErrors) :
-              error;
-
-            finalCallback(finalErr);
-          } else if (batchSize && documentsBatch.length) {
-            handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
-          }
-          return done();
-        }
-
-        ++handleResultsInProgress;
-
-        // Kick off the subsequent `next()` before handling the result, but
-        // make sure we know that we still have a result to handle re: #8422
-        immediate(() => done());
-
-        if (batchSize) {
-          documentsBatch.push(doc);
-        }
-
-        // If the current documents size is less than the provided patch size don't process the documents yet
-        if (batchSize && documentsBatch.length !== batchSize) {
-          immediate(() => enqueue(fetch));
-          return;
-        }
-
-        const docsToProcess = batchSize ? documentsBatch : doc;
-
-        function handleNextResultCallBack(err) {
-          if (batchSize) {
-            handleResultsInProgress -= documentsBatch.length;
-            documentsBatch = [];
-          } else {
-            --handleResultsInProgress;
-          }
-          if (err != null) {
-            if (continueOnError) {
-              aggregatedErrors.push(err);
-            } else {
-              error = err;
-              return finalCallback(err);
-            }
-          }
-          if (drained && handleResultsInProgress <= 0) {
-            const finalErr = continueOnError ?
-              createEachAsyncMultiError(aggregatedErrors) :
-              error;
-            return finalCallback(finalErr);
-          }
-
-          immediate(() => enqueue(fetch));
-        }
-
-        handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
-      });
+    function createFetch() {
+      let documentsBatch = [];
+      let drained = false;
+
+      return fetch;
+
+      function fetch(done) {
+        if (drained || aborted) {
+          return done();
+        } else if (error) {
+          return done();
+        }
+
+        next(function(err, doc) {
+          if (error != null) {
+            return done();
+          }
+          if (err != null) {
+            if (err.name === 'MongoCursorExhaustedError') {
+              // We may end up calling `next()` multiple times on an exhausted
+              // cursor, which leads to an error. In case cursor is exhausted,
+              // just treat it as if the cursor returned no document, which is
+              // how a cursor indicates it is exhausted.
+              doc = null;
+            } else if (continueOnError) {
+              aggregatedErrors.push(err);
+            } else {
+              error = err;
+              finalCallback(err);
+              return done();
+            }
+          }
+          if (doc == null) {
+            drained = true;
+            if (handleResultsInProgress <= 0) {
+              const finalErr = continueOnError ?
+                createEachAsyncMultiError(aggregatedErrors) :
+                error;
+
+              finalCallback(finalErr);
+            } else if (batchSize && documentsBatch.length) {
+              handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
+            }
+            return done();
+          }
+
+          ++handleResultsInProgress;
+
+          // Kick off the subsequent `next()` before handling the result, but
+          // make sure we know that we still have a result to handle re: #8422
+          immediate(() => done());
+
+          if (batchSize) {
+            documentsBatch.push(doc);
+          }
+
+          // If the current documents size is less than the provided batch size don't process the documents yet
+          if (batchSize && documentsBatch.length !== batchSize) {
+            immediate(() => enqueue(fetch));
+            return;
+          }
+
+          const docsToProcess = batchSize ? documentsBatch : doc;
+
+          function handleNextResultCallBack(err) {
+            if (batchSize) {
+              handleResultsInProgress -= documentsBatch.length;
+              documentsBatch = [];
+            } else {
+              --handleResultsInProgress;
+            }
+            if (err != null) {
+              if (continueOnError) {
+                aggregatedErrors.push(err);
+              } else {
+                error = err;
+                return finalCallback(err);
+              }
+            }
+            if ((drained || aborted) && handleResultsInProgress <= 0) {
+              const finalErr = continueOnError ?
+                createEachAsyncMultiError(aggregatedErrors) :
+                error;
+              return finalCallback(finalErr);
+            }
+
+            immediate(() => enqueue(fetch));
+          }
+
+          handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
+        });
+      }
     }
   }
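The essence of the change above: instead of one `documentsBatch` shared by every parallel fetcher, each fetcher now gets its own batch and its own `drained` flag by building `fetch` through a factory closure (`enqueue(createFetch())` instead of `enqueue(fetch)`). A stripped-down sketch of that pattern follows; it is a simplification for illustration only, omitting error handling, the abort path, and the in-progress accounting of the real implementation.

```js
// Stripped-down sketch: per-fetcher state via a factory closure. Each call to
// createFetch() produces a fetch function with its own batch, so a batch that
// has been handed off is never mutated by another fetcher.
function createFetch(next, handleBatch, batchSize) {
  let documentsBatch = []; // private to this fetcher
  let drained = false;

  return function fetch(done) {
    if (drained) {
      return done();
    }
    next((err, doc) => {
      if (err != null || doc == null) {
        drained = true;
        if (documentsBatch.length) {
          handleBatch(documentsBatch); // flush the partial final batch
        }
        return done();
      }
      documentsBatch.push(doc);
      if (documentsBatch.length === batchSize) {
        handleBatch(documentsBatch);
        documentsBatch = []; // fresh array; the handed-off batch stays intact
      }
      fetch(done); // keep pulling until the cursor is drained
    });
  };
}
```

Each of the `parallel` queue slots receives its own product of `createFetch()`, so a batch that has already been passed to the user callback is never touched by another fetcher.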

24 changes: 24 additions & 0 deletions test/helpers/cursor.eachAsync.test.js
@@ -189,6 +189,30 @@ describe('eachAsync()', function() {
     assert.equal(numCalled, 1);
   });

+  it('avoids mutating document batch with parallel (gh-12652)', async() => {
+    const max = 100;
+    let numCalled = 0;
+    function next(cb) {
+      setImmediate(() => {
+        if (++numCalled > max) {
+          return cb(null, null);
+        }
+        cb(null, { num: numCalled });
+      });
+    }
+
+    let numDocsProcessed = 0;
+    async function fn(batch) {
+      numDocsProcessed += batch.length;
+      const length = batch.length;
+      await new Promise(resolve => setTimeout(resolve, 50));
+      assert.equal(batch.length, length);
+    }
+
+    await eachAsync(next, fn, { parallel: 7, batchSize: 10 });
+    assert.equal(numDocsProcessed, max);
+  });
+
   it('using AbortSignal (gh-12173)', async function() {
     if (typeof AbortController === 'undefined') {
       return this.skip();