From fde5e0f4b973713e52253eebaa273822f8d13ba0 Mon Sep 17 00:00:00 2001
From: Valeri Karpov
Date: Tue, 22 Nov 2022 15:34:10 -0500
Subject: [PATCH] fix(cursor): make `eachAsync()` avoid modifying batch when mixing `parallel` and `batchSize`

Fix #12652
---
 lib/helpers/cursor/eachAsync.js       | 144 ++++++++++++++------
 test/helpers/cursor.eachAsync.test.js |  24 +++++
 2 files changed, 103 insertions(+), 65 deletions(-)

diff --git a/lib/helpers/cursor/eachAsync.js b/lib/helpers/cursor/eachAsync.js
index 735fd2d8156..e3f6f8a24f4 100644
--- a/lib/helpers/cursor/eachAsync.js
+++ b/lib/helpers/cursor/eachAsync.js
@@ -33,7 +33,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
   const aggregatedErrors = [];
   const enqueue = asyncQueue();
 
-  let drained = false;
+  let aborted = false;
 
   return promiseOrCallback(callback, cb => {
     if (signal != null) {
@@ -42,7 +42,7 @@ module.exports = function eachAsync(next, fn, options, callback) {
       }
 
       signal.addEventListener('abort', () => {
-        drained = true;
+        aborted = true;
         return cb(null);
       }, { once: true });
     }
@@ -63,90 +63,104 @@ module.exports = function eachAsync(next, fn, options, callback) {
   function iterate(finalCallback) {
     let handleResultsInProgress = 0;
     let currentDocumentIndex = 0;
-    let documentsBatch = [];
 
     let error = null;
     for (let i = 0; i < parallel; ++i) {
-      enqueue(fetch);
+      enqueue(createFetch());
     }
 
-    function fetch(done) {
-      if (drained || error) {
-        return done();
-      }
+    function createFetch() {
+      let documentsBatch = [];
+      let drained = false;
+
+      return fetch;
 
-      next(function(err, doc) {
-        if (drained || error != null) {
+      function fetch(done) {
+        if (drained || aborted) {
+          return done();
+        } else if (error) {
           return done();
         }
-        if (err != null) {
-          if (continueOnError) {
-            aggregatedErrors.push(err);
-          } else {
-            error = err;
-            finalCallback(err);
+
+        next(function(err, doc) {
+          if (error != null) {
             return done();
           }
-        }
-        if (doc == null) {
-          drained = true;
-          if (handleResultsInProgress <= 0) {
-            const finalErr = continueOnError ?
-              createEachAsyncMultiError(aggregatedErrors) :
-              error;
-
-            finalCallback(finalErr);
-          } else if (batchSize && documentsBatch.length) {
-            handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
+          if (err != null) {
+            if (err.name === 'MongoCursorExhaustedError') {
+              // We may end up calling `next()` multiple times on an exhausted
+              // cursor, which leads to an error. In case cursor is exhausted,
+              // just treat it as if the cursor returned no document, which is
+              // how a cursor indicates it is exhausted.
+              doc = null;
+            } else if (continueOnError) {
+              aggregatedErrors.push(err);
+            } else {
+              error = err;
+              finalCallback(err);
+              return done();
+            }
+          }
+          if (doc == null) {
+            drained = true;
+            if (handleResultsInProgress <= 0) {
+              const finalErr = continueOnError ?
+                createEachAsyncMultiError(aggregatedErrors) :
+                error;
+
+              finalCallback(finalErr);
+            } else if (batchSize && documentsBatch.length) {
+              handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
+            }
+            return done();
           }
-          return done();
-        }
 
-        ++handleResultsInProgress;
+          ++handleResultsInProgress;
 
-        // Kick off the subsequent `next()` before handling the result, but
-        // make sure we know that we still have a result to handle re: #8422
-        immediate(() => done());
+          // Kick off the subsequent `next()` before handling the result, but
+          // make sure we know that we still have a result to handle re: #8422
+          immediate(() => done());
 
-        if (batchSize) {
-          documentsBatch.push(doc);
-        }
+          if (batchSize) {
+            documentsBatch.push(doc);
+          }
 
-        // If the current documents size is less than the provided patch size don't process the documents yet
-        if (batchSize && documentsBatch.length !== batchSize) {
-          immediate(() => enqueue(fetch));
-          return;
-        }
+          // If the current documents size is less than the provided batch size don't process the documents yet
+          if (batchSize && documentsBatch.length !== batchSize) {
+            immediate(() => enqueue(fetch));
+            return;
+          }
 
-        const docsToProcess = batchSize ? documentsBatch : doc;
+          const docsToProcess = batchSize ? documentsBatch : doc;
 
-        function handleNextResultCallBack(err) {
-          if (batchSize) {
-            handleResultsInProgress -= documentsBatch.length;
-            documentsBatch = [];
-          } else {
-            --handleResultsInProgress;
-          }
-          if (err != null) {
-            if (continueOnError) {
-              aggregatedErrors.push(err);
+          function handleNextResultCallBack(err) {
+            if (batchSize) {
+              handleResultsInProgress -= documentsBatch.length;
+              documentsBatch = [];
             } else {
-              error = err;
-              return finalCallback(err);
+              --handleResultsInProgress;
+            }
+            if (err != null) {
+              if (continueOnError) {
+                aggregatedErrors.push(err);
+              } else {
+                error = err;
+                return finalCallback(err);
+              }
+            }
+            if ((drained || aborted) && handleResultsInProgress <= 0) {
+              const finalErr = continueOnError ?
+                createEachAsyncMultiError(aggregatedErrors) :
+                error;
+              return finalCallback(finalErr);
             }
-          }
-          if (drained && handleResultsInProgress <= 0) {
-            const finalErr = continueOnError ?
-              createEachAsyncMultiError(aggregatedErrors) :
-              error;
-            return finalCallback(finalErr);
-          }
 
-          immediate(() => enqueue(fetch));
-        }
+            immediate(() => enqueue(fetch));
+          }
 
-        handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
-      });
+          handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
+        });
+      }
     }
   }
 
diff --git a/test/helpers/cursor.eachAsync.test.js b/test/helpers/cursor.eachAsync.test.js
index b42cab85da0..57411ccfd5d 100644
--- a/test/helpers/cursor.eachAsync.test.js
+++ b/test/helpers/cursor.eachAsync.test.js
@@ -189,6 +189,30 @@ describe('eachAsync()', function() {
     assert.equal(numCalled, 1);
   });
 
+  it('avoids mutating document batch with parallel (gh-12652)', async() => {
+    const max = 100;
+    let numCalled = 0;
+    function next(cb) {
+      setImmediate(() => {
+        if (++numCalled > max) {
+          return cb(null, null);
+        }
+        cb(null, { num: numCalled });
+      });
+    }
+
+    let numDocsProcessed = 0;
+    async function fn(batch) {
+      numDocsProcessed += batch.length;
+      const length = batch.length;
+      await new Promise(resolve => setTimeout(resolve, 50));
+      assert.equal(batch.length, length);
+    }
+
+    await eachAsync(next, fn, { parallel: 7, batchSize: 10 });
+    assert.equal(numDocsProcessed, max);
+  });
+
   it('using AbortSignal (gh-12173)', async function() {
     if (typeof AbortController === 'undefined') {
       return this.skip();