From 7d38aa606520334dc614c0509c130a6be93ec33e Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 16:18:06 -0400 Subject: [PATCH 01/29] refactor: use maybePromise for ChangeStream.close --- lib/change_stream.js | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 28e64d860b..20bf75b651 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -94,6 +94,8 @@ class ChangeStream extends EventEmitter { // Create contained Change Stream cursor this.cursor = createChangeStreamCursor(this, options); + this.closed = false; + // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { @@ -158,7 +160,7 @@ class ChangeStream extends EventEmitter { if (this.cursor) { return this.cursor.isClosed(); } - return true; + return this.closed; } /** @@ -168,31 +170,21 @@ class ChangeStream extends EventEmitter { * @return {Promise} returns Promise if no callback passed */ close(callback) { - if (!this.cursor) { - if (callback) return callback(); - return this.promiseLibrary.resolve(); - } - - // Tidy up the existing cursor - const cursor = this.cursor; + return maybePromise(this.parent, callback, cb => { + this.closed = true; - if (callback) { - return cursor.close(err => { - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - delete this.cursor; + if (!this.cursor) { + return cb(); + } - return callback(err); - }); - } + // Tidy up the existing cursor + const cursor = this.cursor; - const PromiseCtor = this.promiseLibrary || Promise; - return new PromiseCtor((resolve, reject) => { - cursor.close(err => { + return cursor.close(err => { ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); delete this.cursor; - if (err) return reject(err); - resolve(); + return cb(err); }); }); } @@ -569,8 +561,7 @@ function processNewChange(args) { // Return the change if (eventEmitter) return changeStream.emit('change', change); - if (typeof callback === 'function') return callback(error, change); - return changeStream.promiseLibrary.resolve(change); + return callback(error, change); } /** From 89a162fe086616bbaecaf9fb889c53d4af4d2d30 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 16:18:23 -0400 Subject: [PATCH 02/29] test: fix skipped mid-close tests --- test/functional/change_stream.test.js | 31 ++++++++++++++------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 54df7c5ec9..bc9db21933 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1982,7 +1982,7 @@ describe('Change Streams', function() { }); // TODO: re-enable/fix these tests in NODE-2548 - describe.skip('should properly handle a changeStream event being processed mid-close', function() { + describe('should properly handle a changeStream event being processed mid-close', function() { let client, coll; function write() { @@ -2018,15 +2018,14 @@ describe('Change Streams', function() { .then(() => changeStream.next()) .then(() => changeStream.next()) .then(() => { - const nextP = changeStream.next(); - - return changeStream.close().then(() => nextP); + const nextP = () => changeStream.next(); + return changeStream.close().then(() => nextP()); }); } return Promise.all([read(), write()]).then( () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed') + err => expect(err.message).to.equal('ChangeStream is closed.') ); } }); @@ -2038,17 +2037,19 @@ describe('Change Streams', function() { changeStream.next(() => { changeStream.next(() => { - changeStream.next(err => { - let _err = null; - try { - expect(err.message).to.equal('ChangeStream is closed'); - } catch (e) { - _err = e; - } finally { - done(_err); - } + changeStream.close(err => { + expect(err).to.not.exist; + changeStream.next(err => { + let _err = null; + try { + expect(err.message).to.equal('ChangeStream is closed.'); + } catch (e) { + _err = e; + } finally { + done(_err); + } + }); }); - changeStream.close(); }); }); From 4dd387560795b8250fe07f1da75190c94d3b7fb9 Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 2 May 2020 20:59:17 -0400 Subject: [PATCH 03/29] clean up processNewChange --- lib/change_stream.js | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 20bf75b651..764a86f4d7 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -143,7 +143,7 @@ class ChangeStream extends EventEmitter { next(callback) { return maybePromise(this.parent, callback, cb => { if (this.isClosed()) { - return cb(new Error('Change Stream is not open.')); + return cb(new MongoError('ChangeStream is closed.')); } this.cursor.next((error, change) => { processNewChange({ changeStream: this, error, change, callback: cb }); @@ -475,11 +475,8 @@ function processNewChange(args) { if (eventEmitter) { return; } - - const error = new MongoError('ChangeStream is closed'); - return typeof callback === 'function' - ? callback(error, null) - : changeStream.promiseLibrary.reject(error); + callback(new MongoError('ChangeStream is closed.')); + return; } const topology = changeStream.topology; @@ -536,8 +533,7 @@ function processNewChange(args) { } if (eventEmitter) return changeStream.emit('error', error); - if (typeof callback === 'function') return callback(error, null); - return changeStream.promiseLibrary.reject(error); + return callback(error, null); } changeStream.attemptingResume = false; @@ -548,8 +544,7 @@ function processNewChange(args) { ); if (eventEmitter) return changeStream.emit('error', noResumeTokenError); - if (typeof callback === 'function') return callback(noResumeTokenError, null); - return changeStream.promiseLibrary.reject(noResumeTokenError); + return callback(noResumeTokenError, null); } // cache the resume token From 6a75b37f70e14c184f849c15223dafe1e2194ade Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 2 May 2020 21:23:16 -0400 Subject: [PATCH 04/29] clean up processNewChange some more --- lib/change_stream.js | 41 +++++++++++------------------------------ 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 764a86f4d7..8053b58f68 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -495,41 +495,22 @@ function processNewChange(args) { // close internal cursor, ignore errors changeStream.cursor.close(); - // attempt recreating the cursor - if (eventEmitter) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) { + waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { + if (err) { + changeStream.closed = true; + if (eventEmitter) { changeStream.emit('error', err); changeStream.emit('close'); return; } - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - }); + return callback(err, null); + } - return; - } - - if (callback) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return callback(err, null); - - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - changeStream.next(callback); - }); - - return; - } - - return new Promise((resolve, reject) => { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return reject(err); - resolve(); - }); - }) - .then( - () => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions)) - ) - .then(() => changeStream.next()); + changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); + if (eventEmitter) return; + changeStream.next(callback); + }); + return; } if (eventEmitter) return changeStream.emit('error', error); From 67e9294d90bddb1df9fc48e1f00d62ab1f140f06 Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 2 May 2020 21:25:08 -0400 Subject: [PATCH 05/29] remove comment --- test/functional/change_stream.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index bc9db21933..49034ae45f 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1981,7 +1981,6 @@ describe('Change Streams', function() { } }); - // TODO: re-enable/fix these tests in NODE-2548 describe('should properly handle a changeStream event being processed mid-close', function() { let client, coll; From 35a552fea4515179939b1b563dbb3be354f3380a Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 2 May 2020 20:38:00 -0400 Subject: [PATCH 06/29] test: add waitForStarted to flaky changeStream tests --- test/functional/change_stream.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 49034ae45f..e955833791 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -448,7 +448,7 @@ describe('Change Streams', function() { var thisChangeStream = theDatabase.collection('cacheResumeTokenCallback').watch(pipeline); // Trigger the first database event - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theDatabase .collection('cacheResumeTokenCallback') .insert({ b: 2 }, function(err, result) { @@ -484,7 +484,7 @@ describe('Change Streams', function() { var theDatabase = client.db('integration_tests'); var thisChangeStream = theDatabase.collection('cacheResumeTokenPromise').watch(pipeline); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { // Trigger the first database event theDatabase.collection('cacheResumeTokenPromise').insert({ b: 2 }, function(err, result) { assert.ifError(err); @@ -530,7 +530,7 @@ describe('Change Streams', function() { thisChangeStream.close().then(() => client.close(done)); }); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { // Trigger the first database event theDatabase .collection('cacheResumeTokenListener') From 279f3c4e13026dc2b40a74fe958246a0befd7cc0 Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 3 May 2020 18:28:58 -0400 Subject: [PATCH 07/29] cleanup from review --- lib/change_stream.js | 29 +++++++++++---------------- test/functional/change_stream.test.js | 5 +++-- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 8053b58f68..1f21398fbf 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -143,7 +143,7 @@ class ChangeStream extends EventEmitter { next(callback) { return maybePromise(this.parent, callback, cb => { if (this.isClosed()) { - return cb(new MongoError('ChangeStream is closed.')); + return cb(new MongoError('ChangeStream is closed')); } this.cursor.next((error, change) => { processNewChange({ changeStream: this, error, change, callback: cb }); @@ -157,10 +157,7 @@ class ChangeStream extends EventEmitter { * @return {boolean} */ isClosed() { - if (this.cursor) { - return this.cursor.isClosed(); - } - return this.closed; + return (this.cursor && this.cursor.isClosed()) || this.closed; } /** @@ -171,18 +168,15 @@ class ChangeStream extends EventEmitter { */ close(callback) { return maybePromise(this.parent, callback, cb => { + if (this.closed) return cb(); this.closed = true; - if (!this.cursor) { - return cb(); - } - // Tidy up the existing cursor const cursor = this.cursor; return cursor.close(err => { ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - delete this.cursor; + this.cursor = undefined; return cb(err); }); @@ -313,7 +307,7 @@ class ChangeStreamCursor extends Cursor { _initializeCursor(callback) { super._initializeCursor((err, result) => { if (err) { - callback(err, null); + callback(err); return; } @@ -339,7 +333,7 @@ class ChangeStreamCursor extends Cursor { _getMore(callback) { super._getMore((err, response) => { if (err) { - callback(err, null); + callback(err); return; } @@ -452,7 +446,7 @@ function waitForTopologyConnected(topology, options, callback) { const timeout = options.timeout || SELECTION_TIMEOUT; const readPreference = options.readPreference; - if (topology.isConnected({ readPreference })) return callback(null, null); + if (topology.isConnected({ readPreference })) return callback(); const hrElapsed = process.hrtime(start); const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6; if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection')); @@ -472,10 +466,11 @@ function processNewChange(args) { // If the cursor is null, then it should not process a change. if (cursor == null) { // We do not error in the eventEmitter case. + changeStream.closed = true; if (eventEmitter) { return; } - callback(new MongoError('ChangeStream is closed.')); + callback(new MongoError('ChangeStream is closed')); return; } @@ -503,7 +498,7 @@ function processNewChange(args) { changeStream.emit('close'); return; } - return callback(err, null); + return callback(err); } changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); @@ -514,7 +509,7 @@ function processNewChange(args) { } if (eventEmitter) return changeStream.emit('error', error); - return callback(error, null); + return callback(error); } changeStream.attemptingResume = false; @@ -525,7 +520,7 @@ function processNewChange(args) { ); if (eventEmitter) return changeStream.emit('error', noResumeTokenError); - return callback(noResumeTokenError, null); + return callback(noResumeTokenError); } // cache the resume token diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index e955833791..a2c5aad6d5 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -707,6 +707,7 @@ describe('Change Streams', function() { function completeStream() { changeStream.hasNext(function(err, hasNext) { + expect(err).to.not.exist; assert.equal(hasNext, false); assert.equal(changeStream.isClosed(), true); client.close(done); @@ -2024,7 +2025,7 @@ describe('Change Streams', function() { return Promise.all([read(), write()]).then( () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed.') + err => expect(err.message).to.equal('ChangeStream is closed') ); } }); @@ -2041,7 +2042,7 @@ describe('Change Streams', function() { changeStream.next(err => { let _err = null; try { - expect(err.message).to.equal('ChangeStream is closed.'); + expect(err.message).to.equal('ChangeStream is closed'); } catch (e) { _err = e; } finally { From 182a03d7364fe3020bf857d72025c1406821dc41 Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 3 May 2020 19:36:04 -0400 Subject: [PATCH 08/29] properly fix mid-close tests --- test/functional/change_stream.test.js | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index a2c5aad6d5..a83979f12b 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2017,10 +2017,7 @@ describe('Change Streams', function() { return Promise.resolve() .then(() => changeStream.next()) .then(() => changeStream.next()) - .then(() => { - const nextP = () => changeStream.next(); - return changeStream.close().then(() => nextP()); - }); + .then(() => Promise.all([changeStream.close(), changeStream.next()])); } return Promise.all([read(), write()]).then( @@ -2037,18 +2034,16 @@ describe('Change Streams', function() { changeStream.next(() => { changeStream.next(() => { - changeStream.close(err => { - expect(err).to.not.exist; - changeStream.next(err => { - let _err = null; - try { - expect(err.message).to.equal('ChangeStream is closed'); - } catch (e) { - _err = e; - } finally { - done(_err); - } - }); + changeStream.close(); + changeStream.next(err => { + let _err = null; + try { + expect(err.message).to.equal('ChangeStream is closed'); + } catch (e) { + _err = e; + } finally { + done(_err); + } }); }); }); From fd01b1cf0bb55ebaf4c3be8008fde3116191b60e Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 11:30:22 -0400 Subject: [PATCH 09/29] use waitForStarted in flaky tests --- test/functional/change_stream.test.js | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index a83979f12b..ed242a9025 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -158,7 +158,7 @@ describe('Change Streams', function() { close(err); }); }); - setTimeout(() => coll.insertOne({ x: 1 })); + waitForStarted(changeStream, () => coll.insertOne({ x: 1 })); changeStream.on('error', err => close(err)); }); } @@ -177,17 +177,6 @@ describe('Change Streams', function() { const collection = client.db('integration_tests').collection('docsDataEvent'); const changeStream = collection.watch(pipeline); - changeStream.cursor.once('response', () => { - // Trigger the first database event - collection.insertOne({ d: 4 }, function(err) { - assert.ifError(err); - // Trigger the second database event - collection.updateOne({ d: 4 }, { $inc: { d: 2 } }, function(err) { - assert.ifError(err); - }); - }); - }); - let count = 0; const cleanup = _err => { @@ -224,6 +213,17 @@ describe('Change Streams', function() { cleanup(e); } }); + + waitForStarted(changeStream, () => { + // Trigger the first database event + collection.insertOne({ d: 4 }, function(err) { + assert.ifError(err); + // Trigger the second database event + collection.updateOne({ d: 4 }, { $inc: { d: 2 } }, function(err) { + assert.ifError(err); + }); + }); + }); }); } }); From 972a6ed6ff7fe5fc52452e700f32f906da686f5b Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 11:47:50 -0400 Subject: [PATCH 10/29] use waitForStarted where possible in existing tests --- test/functional/change_stream.test.js | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index ed242a9025..bf5a6704a7 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -562,14 +562,14 @@ describe('Change Streams', function() { .watch([{ $project: { _id: false } }]); // Trigger the first database event - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theDatabase .collection('resumetokenProjectedOutCallback') .insert({ b: 2 }, function(err, result) { expect(err).to.not.exist; expect(result.insertedCount).to.equal(1); }); - }, 250); + }); // Fetch the change notification thisChangeStream.next(function(err) { @@ -610,7 +610,7 @@ describe('Change Streams', function() { }); // Trigger the first database event - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theDatabase .collection('resumetokenProjectedOutListener') .insert({ b: 2 }, function(err, result) { @@ -669,7 +669,7 @@ describe('Change Streams', function() { }); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { database.collection('invalidateListeners').insert({ a: 1 }, function(err) { assert.ifError(err); }); @@ -693,7 +693,7 @@ describe('Change Streams', function() { var changeStream = database.collection('invalidateCallback').watch(pipeline); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { database.collection('invalidateCallback').insert({ a: 1 }, function(err) { assert.ifError(err); }); @@ -758,14 +758,14 @@ describe('Change Streams', function() { var changeStream = database.collection('invalidateCollectionDropPromises').watch(pipeline); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { return database .collection('invalidateCollectionDropPromises') .insert({ a: 1 }) .then(function() { return delay(200); }); - }, 200); + }); return changeStream .next() @@ -1139,7 +1139,7 @@ describe('Change Streams', function() { // Trigger the first database event firstChangeStream = collection.watch(pipeline); - setTimeout(() => { + waitForStarted(firstChangeStream, () => { return collection .insert(docs[0]) .then(function(result) { @@ -1224,7 +1224,7 @@ describe('Change Streams', function() { fullDocument: 'updateLookup' }); - setTimeout(() => { + waitForStarted(changeStream, () => { return collection.insert({ f: 128 }).then(function(result) { assert.equal(result.insertedCount, 1); }); @@ -1280,7 +1280,7 @@ describe('Change Streams', function() { }); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { return collection .insert({ i: 128 }) .then(function(result) { @@ -1417,7 +1417,7 @@ describe('Change Streams', function() { }) .on('error', close); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theCollection.insert({ a: 1 }, function(err) { assert.ifError(err); }); @@ -1632,7 +1632,7 @@ describe('Change Streams', function() { done(err); }); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theCollection.insert({ a: 1407 }, function(err) { if (err) done(err); }); @@ -1972,7 +1972,7 @@ describe('Change Streams', function() { changeStream.on('close', closeSpy); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { coll.insertOne({ a: 1 }, (err, result) => { expect(err).to.not.exist; expect(result.insertedCount).to.equal(1); @@ -2078,7 +2078,7 @@ describe('Change Streams', function() { }); changeStream.on('error', err => close(err)); - setTimeout(() => write().catch(() => {})); + waitForStarted(changeStream, () => write().catch(() => {})); } }); }); From b0a0fb84abb1a0ae6e3f4af096cd89776daedb08 Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 12:09:12 -0400 Subject: [PATCH 11/29] improve cleanup logic for mid-close tests --- test/functional/change_stream.test.js | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index bf5a6704a7..e086a33b8c 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1983,7 +1983,7 @@ describe('Change Streams', function() { }); describe('should properly handle a changeStream event being processed mid-close', function() { - let client, coll; + let client, coll, changeStream; function write() { return Promise.resolve() @@ -1997,23 +1997,33 @@ describe('Change Streams', function() { return client.connect().then(_client => { client = _client; coll = client.db(this.configuration.db).collection('tester'); + changeStream = coll.watch(); }); }); afterEach(function() { - coll = undefined; - if (client) { - return client.close().then(() => { + return Promise.resolve() + .then(() => { + if (changeStream && !changeStream.isClosed()) { + return changeStream.close(); + } + }) + .then(() => { + if (client) { + client.close(); + } + }) + .then(() => { + coll = undefined; + changeStream = undefined; client = undefined; }); - } }); it('when invoked with promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { function read() { - const changeStream = coll.watch(); return Promise.resolve() .then(() => changeStream.next()) .then(() => changeStream.next()) @@ -2030,8 +2040,6 @@ describe('Change Streams', function() { it('when invoked with callbacks', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - const changeStream = coll.watch(); - changeStream.next(() => { changeStream.next(() => { changeStream.close(); @@ -2064,8 +2072,6 @@ describe('Change Streams', function() { return done(_err); }; - const changeStream = coll.watch(); - let counter = 0; changeStream.on('change', () => { counter += 1; From 395a9bc8cc5b61e8de7fa1f07882dde402e518dc Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 12:43:12 -0400 Subject: [PATCH 12/29] use callback so client will close even if changeStream.close errors --- test/functional/change_stream.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index e086a33b8c..649bcb42f7 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -576,7 +576,7 @@ describe('Change Streams', function() { expect(err).to.exist; // Close the change stream - thisChangeStream.close().then(() => client.close(done)); + thisChangeStream.close(client.close(done)); }); }); } From 3de4d8367fbdc923733e653f6c1e01dfa37f9a1e Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 14:03:22 -0400 Subject: [PATCH 13/29] fix typo --- test/functional/change_stream.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 649bcb42f7..ac3772eeff 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -576,7 +576,7 @@ describe('Change Streams', function() { expect(err).to.exist; // Close the change stream - thisChangeStream.close(client.close(done)); + thisChangeStream.close(() => client.close(done)); }); }); } From 5e1dc831d54fddeac97d4bb7969915f784fd9d8d Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 17:25:13 -0400 Subject: [PATCH 14/29] revert changes and skip mid-close tests --- test/functional/change_stream.test.js | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index ac3772eeff..e75121b442 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2020,14 +2020,18 @@ describe('Change Streams', function() { }); }); - it('when invoked with promises', { + it.skip('when invoked with promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { function read() { return Promise.resolve() .then(() => changeStream.next()) .then(() => changeStream.next()) - .then(() => Promise.all([changeStream.close(), changeStream.next()])); + .then(() => { + const nextP = changeStream.next(); + + return changeStream.close().then(() => nextP); + }); } return Promise.all([read(), write()]).then( @@ -2037,12 +2041,11 @@ describe('Change Streams', function() { } }); - it('when invoked with callbacks', { + it.skip('when invoked with callbacks', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { changeStream.next(() => { changeStream.next(() => { - changeStream.close(); changeStream.next(err => { let _err = null; try { @@ -2053,6 +2056,7 @@ describe('Change Streams', function() { done(_err); } }); + changeStream.close(); }); }); From ad22a9f7e82ee7a570eec705c0a3ee39a7a76552 Mon Sep 17 00:00:00 2001 From: emadum Date: Mon, 4 May 2020 22:25:06 -0400 Subject: [PATCH 15/29] test fix --- test/functional/change_stream.test.js | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index e75121b442..5d95f2e075 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1945,7 +1945,7 @@ describe('Change Streams', function() { test: function(done) { const configuration = this.configuration; const client = configuration.newClient(); - const closeSpy = sinon.spy(); + const errorSpy = sinon.spy(); client.connect(function(err, client) { expect(err).to.not.exist; @@ -1961,15 +1961,12 @@ describe('Change Streams', function() { expect(changeDoc).to.be.null; }); - changeStream.on('error', err => { - expect(err).to.exist; - changeStream.close(() => { - expect(closeSpy.calledOnce).to.be.true; - client.close(done); - }); - }); + changeStream.on('error', errorSpy); - changeStream.on('close', closeSpy); + changeStream.on('close', () => { + expect(errorSpy.calledOnce).to.be.true; + client.close(done); + }); // Trigger the first database event waitForStarted(changeStream, () => { From 25609ecfc397935b4c3e6ca61c58dc5470e9b26b Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 07:39:37 -0400 Subject: [PATCH 16/29] refactor: clean up tryNext tests, add withTempDb helper --- test/functional/change_stream.test.js | 47 +++++++++++++-------------- test/functional/shared.js | 12 +++++++ 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 5d95f2e075..6e4ce4aec9 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -4,6 +4,7 @@ var Transform = require('stream').Transform; const MongoError = require('../../lib/core').MongoError; var MongoNetworkError = require('../../lib/core').MongoNetworkError; var setupDatabase = require('./shared').setupDatabase; +var withTempDb = require('./shared').withTempDb; var delay = require('./shared').delay; var co = require('co'); var mock = require('mongodb-mock-server'); @@ -54,9 +55,12 @@ function tryNext(changeStream, callback) { let complete = false; function done(err, result) { if (complete) return; - // if the arity is 1 then this a callback for `more` if (arguments.length === 1) { + if (err instanceof Error) { + callback(err); + return; + } result = err; const batch = result.cursor.firstBatch || result.cursor.nextBatch; if (batch.length === 0) { @@ -1940,12 +1944,12 @@ describe('Change Streams', function() { .then(() => teardown(), teardown); }); - it('should emit close event after error event', { + it.only('should emit close event after error event', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { const configuration = this.configuration; const client = configuration.newClient(); - const errorSpy = sinon.spy(); + const closeSpy = sinon.spy(); client.connect(function(err, client) { expect(err).to.not.exist; @@ -1961,11 +1965,14 @@ describe('Change Streams', function() { expect(changeDoc).to.be.null; }); - changeStream.on('error', errorSpy); + changeStream.on('close', closeSpy); - changeStream.on('close', () => { - expect(errorSpy.calledOnce).to.be.true; - client.close(done); + changeStream.on('error', err => { + expect(err).to.exist; + changeStream.close(() => { + expect(closeSpy.calledOnce).to.be.true; + client.close(done); + }); }); // Trigger the first database event @@ -2612,21 +2619,14 @@ describe('Change Streams', function() { describe('tryNext', function() { it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - const client = this.configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - - const changeStream = client - .db() - .collection('test') - .watch(); - + test: function() { + return withTempDb('testTryNext', this.configuration.newClient(), db => done => { + const changeStream = db.collection('test').watch(); tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; expect(doc).to.not.exist; - changeStream.close(() => client.close(done)); + changeStream.close(done); }); }); } @@ -2634,12 +2634,9 @@ describe('Change Streams', function() { it('should iterate a change stream until first empty batch', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - const client = this.configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - - const collection = client.db().collection('test'); + test: function() { + return withTempDb('testTryNext', this.configuration.newClient(), db => done => { + const collection = db.collection('test'); const changeStream = collection.watch(); waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { @@ -2663,7 +2660,7 @@ describe('Change Streams', function() { expect(err).to.not.exist; expect(doc).to.not.exist; - changeStream.close(() => client.close(done)); + changeStream.close(done); }); }); }); diff --git a/test/functional/shared.js b/test/functional/shared.js index 2a68d1b89c..16e1a48863 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -75,6 +75,17 @@ function makeCleanupFn(client) { }; } +function withTempDb(dbName, client, operation, errorHandler) { + return withClient( + client, + client => done => { + const db = client.db(dbName); + operation(db)(() => db.dropDatabase(done)); + }, + errorHandler + ); +} + function withClient(client, operation, errorHandler) { const cleanup = makeCleanupFn(client); @@ -198,6 +209,7 @@ module.exports = { assert, delay, withClient, + withTempDb, filterForCommands, filterOutCommands, ignoreNsNotFound, From 891544d4c4af14b30b3c46446284b14f878ef21c Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 07:40:55 -0400 Subject: [PATCH 17/29] remove .only --- test/functional/change_stream.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 6e4ce4aec9..209b0042d7 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1944,7 +1944,7 @@ describe('Change Streams', function() { .then(() => teardown(), teardown); }); - it.only('should emit close event after error event', { + it('should emit close event after error event', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { const configuration = this.configuration; From 44ff554721b12553fc4ecc6d8c5124ba16619de7 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 09:18:30 -0400 Subject: [PATCH 18/29] fix: dont processNewChange for closed changeStream --- lib/change_stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 1f21398fbf..56bbfcf81e 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -464,7 +464,7 @@ function processNewChange(args) { const cursor = changeStream.cursor; // If the cursor is null, then it should not process a change. - if (cursor == null) { + if (cursor == null || changeStream.closed) { // We do not error in the eventEmitter case. changeStream.closed = true; if (eventEmitter) { From e488d63d0e389658228e556758b58bca10140dd6 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 09:18:49 -0400 Subject: [PATCH 19/29] fix broken mid-close tests, again --- test/functional/change_stream.test.js | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 209b0042d7..ab3eeb018d 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1992,8 +1992,11 @@ describe('Change Streams', function() { function write() { return Promise.resolve() .then(() => coll.insertOne({ a: 1 })) - .then(() => coll.insertOne({ b: 2 })) - .then(() => coll.insertOne({ c: 3 })); + .then(() => coll.insertOne({ b: 2 })); + } + + function lastWrite() { + return coll.insertOne({ c: 3 }); } beforeEach(function() { @@ -2014,7 +2017,7 @@ describe('Change Streams', function() { }) .then(() => { if (client) { - client.close(); + return client.close(); } }) .then(() => { @@ -2024,7 +2027,7 @@ describe('Change Streams', function() { }); }); - it.skip('when invoked with promises', { + it('when invoked with promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { function read() { @@ -2032,6 +2035,7 @@ describe('Change Streams', function() { .then(() => changeStream.next()) .then(() => changeStream.next()) .then(() => { + lastWrite(); const nextP = changeStream.next(); return changeStream.close().then(() => nextP); @@ -2045,15 +2049,18 @@ describe('Change Streams', function() { } }); - it.skip('when invoked with callbacks', { + it('when invoked with callbacks', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { changeStream.next(() => { changeStream.next(() => { + lastWrite(); changeStream.next(err => { let _err = null; try { - expect(err.message).to.equal('ChangeStream is closed'); + expect(err) + .property('message') + .to.equal('ChangeStream is closed'); } catch (e) { _err = e; } finally { From e5b780286ec044521ad7472a74c84c0f7f7664c9 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 11:13:55 -0400 Subject: [PATCH 20/29] fix todo in forbidden stage test --- test/functional/change_stream.test.js | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index ab3eeb018d..75cbdc914b 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -421,15 +421,19 @@ describe('Change Streams', function() { client.connect(function(err, client) { assert.ifError(err); + const forbiddenStage = {}; + const forbiddenStageName = '$alksdjfhlaskdfjh'; + forbiddenStage[forbiddenStageName] = 2; + var theDatabase = client.db('integration_tests'); - var changeStream = theDatabase - .collection('forbiddenStageTest') - .watch([{ $alksdjfhlaskdfjh: 2 }]); + var changeStream = theDatabase.collection('forbiddenStageTest').watch([forbiddenStage]); changeStream.next(function(err) { assert.ok(err); assert.ok(err.message); - // assert.ok(err.message.indexOf('SOME ERROR MESSAGE HERE ONCE SERVER-29137 IS DONE') > -1); + assert.ok( + err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 + ); changeStream.close(err => client.close(cerr => done(err || cerr))); }); }); From c7c5bc5593bb8203f1486e55455f4d6f6e21646d Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 16:08:57 -0400 Subject: [PATCH 21/29] cleanup --- lib/change_stream.js | 7 +++++-- test/functional/shared.js | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 56bbfcf81e..9b48769ddb 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -157,7 +157,7 @@ class ChangeStream extends EventEmitter { * @return {boolean} */ isClosed() { - return (this.cursor && this.cursor.isClosed()) || this.closed; + return this.closed || (this.cursor && this.cursor.isClosed()); } /** @@ -169,6 +169,8 @@ class ChangeStream extends EventEmitter { close(callback) { return maybePromise(this.parent, callback, cb => { if (this.closed) return cb(); + + // flag the change stream as explicitly closed this.closed = true; // Tidy up the existing cursor @@ -463,7 +465,7 @@ function processNewChange(args) { const eventEmitter = args.eventEmitter || false; const cursor = changeStream.cursor; - // If the cursor is null, then it should not process a change. + // If the cursor is null or the change stream has been closed explictly, do not process a change. if (cursor == null || changeStream.closed) { // We do not error in the eventEmitter case. changeStream.closed = true; @@ -492,6 +494,7 @@ function processNewChange(args) { waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { if (err) { + // if there's an error reconnecting, close the change stream changeStream.closed = true; if (eventEmitter) { changeStream.emit('error', err); diff --git a/test/functional/shared.js b/test/functional/shared.js index 16e1a48863..04c506d6ac 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -80,7 +80,7 @@ function withTempDb(dbName, client, operation, errorHandler) { client, client => done => { const db = client.db(dbName); - operation(db)(() => db.dropDatabase(done)); + operation.call(this, db)(() => db.dropDatabase(done)); }, errorHandler ); From b97befe4c10c49fbc2e0cb16542b8e77cd9feb5d Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 16:11:35 -0400 Subject: [PATCH 22/29] add timeout to waitForStarted --- test/functional/change_stream.test.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 75cbdc914b..f41999ef1f 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -38,7 +38,11 @@ function triggerResumableError(changeStream, onCursorClosed) { * @param {function} callback */ function waitForStarted(changeStream, callback) { + const timeout = setTimeout(() => { + throw new Error('Change stream never started'); + }, 2000); changeStream.cursor.once('init', () => { + clearTimeout(timeout); callback(); }); } From 70bdeba2bdc91bd2099e1291b231fc498d8066a8 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 5 May 2020 16:41:07 -0400 Subject: [PATCH 23/29] ensure parity on mid-close events test --- test/functional/change_stream.test.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index f41999ef1f..9641db74cf 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2107,7 +2107,11 @@ describe('Change Streams', function() { }); changeStream.on('error', err => close(err)); - waitForStarted(changeStream, () => write().catch(() => {})); + waitForStarted(changeStream, () => + write() + .then(() => lastWrite()) + .catch(() => {}) + ); } }); }); From 8619634a30743144a769736da473019f9b00e376 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 6 May 2020 11:24:07 -0400 Subject: [PATCH 24/29] spend less time waiting for topology --- lib/change_stream.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 9b48769ddb..9f18f075c7 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -6,6 +6,7 @@ const MongoError = require('./core').MongoError; const Cursor = require('./cursor'); const relayEvents = require('./core/utils').relayEvents; const maxWireVersion = require('./core/utils').maxWireVersion; +const isUnifiedTopology = require('./core/utils').isUnifiedTopology; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); @@ -442,6 +443,7 @@ function applyKnownOptions(target, source, optionNames) { // ChangeStream resumability until the new SDAM layer can be used. const SELECTION_TIMEOUT = 30000; function waitForTopologyConnected(topology, options, callback) { + if (isUnifiedTopology(topology)) return callback(); setTimeout(() => { if (options && options.start == null) options.start = process.hrtime(); const start = options.start || process.hrtime(); @@ -453,7 +455,7 @@ function waitForTopologyConnected(topology, options, callback) { const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6; if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection')); waitForTopologyConnected(topology, options, callback); - }, 3000); // this is an arbitrary wait time to allow SDAM to transition + }, 500); // this is an arbitrary wait time to allow SDAM to transition } // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. From 29dedd216e07dd1e9e2a59aabf30c848c5669521 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 6 May 2020 11:54:15 -0400 Subject: [PATCH 25/29] tryNext tests should use w: majority --- test/functional/change_stream.test.js | 60 ++++++++++++++++----------- test/functional/shared.js | 4 +- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 9641db74cf..83d4cf0049 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2639,37 +2639,42 @@ describe('Change Streams', function() { it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { - return withTempDb('testTryNext', this.configuration.newClient(), db => done => { - const changeStream = db.collection('test').watch(); - tryNext(changeStream, (err, doc) => { - expect(err).to.not.exist; - expect(doc).to.not.exist; + return withTempDb( + 'testTryNext', + { w: 'majority' }, + this.configuration.newClient(), + db => done => { + const changeStream = db.collection('test').watch(); + tryNext(changeStream, (err, doc) => { + expect(err).to.not.exist; + expect(doc).to.not.exist; - changeStream.close(done); - }); - }); + changeStream.close(done); + }); + } + ); } }); it('should iterate a change stream until first empty batch', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { - return withTempDb('testTryNext', this.configuration.newClient(), db => done => { - const collection = db.collection('test'); - const changeStream = collection.watch(); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - - collection.insertOne({ b: 24 }, err => { + return withTempDb( + 'testTryNext', + { w: 'majority' }, + this.configuration.newClient(), + db => done => { + const collection = db.collection('test'); + const changeStream = collection.watch(); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; + + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); }); }); - }); - - tryNext(changeStream, (err, doc) => { - expect(err).to.not.exist; - expect(doc).to.exist; tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; @@ -2677,13 +2682,18 @@ describe('Change Streams', function() { tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; - expect(doc).to.not.exist; + expect(doc).to.exist; + + tryNext(changeStream, (err, doc) => { + expect(err).to.not.exist; + expect(doc).to.not.exist; - changeStream.close(done); + changeStream.close(done); + }); }); }); - }); - }); + } + ); } }); }); diff --git a/test/functional/shared.js b/test/functional/shared.js index 04c506d6ac..106f234742 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -75,11 +75,11 @@ function makeCleanupFn(client) { }; } -function withTempDb(dbName, client, operation, errorHandler) { +function withTempDb(name, options, client, operation, errorHandler) { return withClient( client, client => done => { - const db = client.db(dbName); + const db = client.db(name, options); operation.call(this, db)(() => db.dropDatabase(done)); }, errorHandler From 0305e3268a13ccbe159968da6d31bbc3726a4791 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 6 May 2020 11:54:33 -0400 Subject: [PATCH 26/29] remove old code --- test/functional/change_stream.test.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 83d4cf0049..020de3d6c4 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -61,10 +61,6 @@ function tryNext(changeStream, callback) { if (complete) return; // if the arity is 1 then this a callback for `more` if (arguments.length === 1) { - if (err instanceof Error) { - callback(err); - return; - } result = err; const batch = result.cursor.firstBatch || result.cursor.nextBatch; if (batch.length === 0) { From 4cbd81cc127c756bcb46283609eb7efb9a336d9f Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 6 May 2020 11:54:57 -0400 Subject: [PATCH 27/29] dont skip waitForTopology on unified topology --- lib/change_stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 9f18f075c7..6d7ff0530a 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -443,7 +443,7 @@ function applyKnownOptions(target, source, optionNames) { // ChangeStream resumability until the new SDAM layer can be used. const SELECTION_TIMEOUT = 30000; function waitForTopologyConnected(topology, options, callback) { - if (isUnifiedTopology(topology)) return callback(); + // if (isUnifiedTopology(topology)) return callback(); setTimeout(() => { if (options && options.start == null) options.start = process.hrtime(); const start = options.start || process.hrtime(); From ffa0b389f8f8396914f8deed79e437168bf0f5d0 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 6 May 2020 12:02:04 -0400 Subject: [PATCH 28/29] lint error --- lib/change_stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 6d7ff0530a..d37271adfd 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -6,7 +6,7 @@ const MongoError = require('./core').MongoError; const Cursor = require('./cursor'); const relayEvents = require('./core/utils').relayEvents; const maxWireVersion = require('./core/utils').maxWireVersion; -const isUnifiedTopology = require('./core/utils').isUnifiedTopology; +// const isUnifiedTopology = require('./core/utils').isUnifiedTopology; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); From 6e8d0d6c579f08dfa6a0b153af84cba56f81c094 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 6 May 2020 17:07:36 -0400 Subject: [PATCH 29/29] clean up commented code --- lib/change_stream.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index d37271adfd..635d009ec2 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -6,7 +6,6 @@ const MongoError = require('./core').MongoError; const Cursor = require('./cursor'); const relayEvents = require('./core/utils').relayEvents; const maxWireVersion = require('./core/utils').maxWireVersion; -// const isUnifiedTopology = require('./core/utils').isUnifiedTopology; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); @@ -443,7 +442,6 @@ function applyKnownOptions(target, source, optionNames) { // ChangeStream resumability until the new SDAM layer can be used. const SELECTION_TIMEOUT = 30000; function waitForTopologyConnected(topology, options, callback) { - // if (isUnifiedTopology(topology)) return callback(); setTimeout(() => { if (options && options.start == null) options.start = process.hrtime(); const start = options.start || process.hrtime();