diff --git a/lib/change_stream.js b/lib/change_stream.js index 28e64d860b..635d009ec2 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -94,6 +94,8 @@ class ChangeStream extends EventEmitter { // Create contained Change Stream cursor this.cursor = createChangeStreamCursor(this, options); + this.closed = false; + // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { @@ -141,7 +143,7 @@ class ChangeStream extends EventEmitter { next(callback) { return maybePromise(this.parent, callback, cb => { if (this.isClosed()) { - return cb(new Error('Change Stream is not open.')); + return cb(new MongoError('ChangeStream is closed')); } this.cursor.next((error, change) => { processNewChange({ changeStream: this, error, change, callback: cb }); @@ -155,10 +157,7 @@ class ChangeStream extends EventEmitter { * @return {boolean} */ isClosed() { - if (this.cursor) { - return this.cursor.isClosed(); - } - return true; + return this.closed || (this.cursor && this.cursor.isClosed()); } /** @@ -168,31 +167,20 @@ class ChangeStream extends EventEmitter { * @return {Promise} returns Promise if no callback passed */ close(callback) { - if (!this.cursor) { - if (callback) return callback(); - return this.promiseLibrary.resolve(); - } + return maybePromise(this.parent, callback, cb => { + if (this.closed) return cb(); - // Tidy up the existing cursor - const cursor = this.cursor; + // flag the change stream as explicitly closed + this.closed = true; - if (callback) { - return cursor.close(err => { - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - delete this.cursor; + // Tidy up the existing cursor + const cursor = this.cursor; - return callback(err); - }); - } - - const PromiseCtor = this.promiseLibrary || Promise; - return new PromiseCtor((resolve, reject) => { - cursor.close(err => { + return cursor.close(err => { ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - delete this.cursor; + this.cursor = undefined; - if (err) return reject(err); - resolve(); + return cb(err); }); }); } @@ -321,7 +309,7 @@ class ChangeStreamCursor extends Cursor { _initializeCursor(callback) { super._initializeCursor((err, result) => { if (err) { - callback(err, null); + callback(err); return; } @@ -347,7 +335,7 @@ class ChangeStreamCursor extends Cursor { _getMore(callback) { super._getMore((err, response) => { if (err) { - callback(err, null); + callback(err); return; } @@ -460,12 +448,12 @@ function waitForTopologyConnected(topology, options, callback) { const timeout = options.timeout || SELECTION_TIMEOUT; const readPreference = options.readPreference; - if (topology.isConnected({ readPreference })) return callback(null, null); + if (topology.isConnected({ readPreference })) return callback(); const hrElapsed = process.hrtime(start); const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6; if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection')); waitForTopologyConnected(topology, options, callback); - }, 3000); // this is an arbitrary wait time to allow SDAM to transition + }, 500); // this is an arbitrary wait time to allow SDAM to transition } // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. @@ -477,17 +465,15 @@ function processNewChange(args) { const eventEmitter = args.eventEmitter || false; const cursor = changeStream.cursor; - // If the cursor is null, then it should not process a change. - if (cursor == null) { + // If the cursor is null or the change stream has been closed explictly, do not process a change. + if (cursor == null || changeStream.closed) { // We do not error in the eventEmitter case. + changeStream.closed = true; if (eventEmitter) { return; } - - const error = new MongoError('ChangeStream is closed'); - return typeof callback === 'function' - ? callback(error, null) - : changeStream.promiseLibrary.reject(error); + callback(new MongoError('ChangeStream is closed')); + return; } const topology = changeStream.topology; @@ -506,46 +492,27 @@ function processNewChange(args) { // close internal cursor, ignore errors changeStream.cursor.close(); - // attempt recreating the cursor - if (eventEmitter) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) { + waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { + if (err) { + // if there's an error reconnecting, close the change stream + changeStream.closed = true; + if (eventEmitter) { changeStream.emit('error', err); changeStream.emit('close'); return; } - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - }); + return callback(err); + } - return; - } - - if (callback) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return callback(err, null); - - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - changeStream.next(callback); - }); - - return; - } - - return new Promise((resolve, reject) => { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return reject(err); - resolve(); - }); - }) - .then( - () => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions)) - ) - .then(() => changeStream.next()); + changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); + if (eventEmitter) return; + changeStream.next(callback); + }); + return; } if (eventEmitter) return changeStream.emit('error', error); - if (typeof callback === 'function') return callback(error, null); - return changeStream.promiseLibrary.reject(error); + return callback(error); } changeStream.attemptingResume = false; @@ -556,8 +523,7 @@ function processNewChange(args) { ); if (eventEmitter) return changeStream.emit('error', noResumeTokenError); - if (typeof callback === 'function') return callback(noResumeTokenError, null); - return changeStream.promiseLibrary.reject(noResumeTokenError); + return callback(noResumeTokenError); } // cache the resume token @@ -569,8 +535,7 @@ function processNewChange(args) { // Return the change if (eventEmitter) return changeStream.emit('change', change); - if (typeof callback === 'function') return callback(error, change); - return changeStream.promiseLibrary.resolve(change); + return callback(error, change); } /** diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 54df7c5ec9..020de3d6c4 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -4,6 +4,7 @@ var Transform = require('stream').Transform; const MongoError = require('../../lib/core').MongoError; var MongoNetworkError = require('../../lib/core').MongoNetworkError; var setupDatabase = require('./shared').setupDatabase; +var withTempDb = require('./shared').withTempDb; var delay = require('./shared').delay; var co = require('co'); var mock = require('mongodb-mock-server'); @@ -37,7 +38,11 @@ function triggerResumableError(changeStream, onCursorClosed) { * @param {function} callback */ function waitForStarted(changeStream, callback) { + const timeout = setTimeout(() => { + throw new Error('Change stream never started'); + }, 2000); changeStream.cursor.once('init', () => { + clearTimeout(timeout); callback(); }); } @@ -54,7 +59,6 @@ function tryNext(changeStream, callback) { let complete = false; function done(err, result) { if (complete) return; - // if the arity is 1 then this a callback for `more` if (arguments.length === 1) { result = err; @@ -158,7 +162,7 @@ describe('Change Streams', function() { close(err); }); }); - setTimeout(() => coll.insertOne({ x: 1 })); + waitForStarted(changeStream, () => coll.insertOne({ x: 1 })); changeStream.on('error', err => close(err)); }); } @@ -177,17 +181,6 @@ describe('Change Streams', function() { const collection = client.db('integration_tests').collection('docsDataEvent'); const changeStream = collection.watch(pipeline); - changeStream.cursor.once('response', () => { - // Trigger the first database event - collection.insertOne({ d: 4 }, function(err) { - assert.ifError(err); - // Trigger the second database event - collection.updateOne({ d: 4 }, { $inc: { d: 2 } }, function(err) { - assert.ifError(err); - }); - }); - }); - let count = 0; const cleanup = _err => { @@ -224,6 +217,17 @@ describe('Change Streams', function() { cleanup(e); } }); + + waitForStarted(changeStream, () => { + // Trigger the first database event + collection.insertOne({ d: 4 }, function(err) { + assert.ifError(err); + // Trigger the second database event + collection.updateOne({ d: 4 }, { $inc: { d: 2 } }, function(err) { + assert.ifError(err); + }); + }); + }); }); } }); @@ -417,15 +421,19 @@ describe('Change Streams', function() { client.connect(function(err, client) { assert.ifError(err); + const forbiddenStage = {}; + const forbiddenStageName = '$alksdjfhlaskdfjh'; + forbiddenStage[forbiddenStageName] = 2; + var theDatabase = client.db('integration_tests'); - var changeStream = theDatabase - .collection('forbiddenStageTest') - .watch([{ $alksdjfhlaskdfjh: 2 }]); + var changeStream = theDatabase.collection('forbiddenStageTest').watch([forbiddenStage]); changeStream.next(function(err) { assert.ok(err); assert.ok(err.message); - // assert.ok(err.message.indexOf('SOME ERROR MESSAGE HERE ONCE SERVER-29137 IS DONE') > -1); + assert.ok( + err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1 + ); changeStream.close(err => client.close(cerr => done(err || cerr))); }); }); @@ -448,7 +456,7 @@ describe('Change Streams', function() { var thisChangeStream = theDatabase.collection('cacheResumeTokenCallback').watch(pipeline); // Trigger the first database event - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theDatabase .collection('cacheResumeTokenCallback') .insert({ b: 2 }, function(err, result) { @@ -484,7 +492,7 @@ describe('Change Streams', function() { var theDatabase = client.db('integration_tests'); var thisChangeStream = theDatabase.collection('cacheResumeTokenPromise').watch(pipeline); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { // Trigger the first database event theDatabase.collection('cacheResumeTokenPromise').insert({ b: 2 }, function(err, result) { assert.ifError(err); @@ -530,7 +538,7 @@ describe('Change Streams', function() { thisChangeStream.close().then(() => client.close(done)); }); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { // Trigger the first database event theDatabase .collection('cacheResumeTokenListener') @@ -562,21 +570,21 @@ describe('Change Streams', function() { .watch([{ $project: { _id: false } }]); // Trigger the first database event - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theDatabase .collection('resumetokenProjectedOutCallback') .insert({ b: 2 }, function(err, result) { expect(err).to.not.exist; expect(result.insertedCount).to.equal(1); }); - }, 250); + }); // Fetch the change notification thisChangeStream.next(function(err) { expect(err).to.exist; // Close the change stream - thisChangeStream.close().then(() => client.close(done)); + thisChangeStream.close(() => client.close(done)); }); }); } @@ -610,7 +618,7 @@ describe('Change Streams', function() { }); // Trigger the first database event - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theDatabase .collection('resumetokenProjectedOutListener') .insert({ b: 2 }, function(err, result) { @@ -669,7 +677,7 @@ describe('Change Streams', function() { }); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { database.collection('invalidateListeners').insert({ a: 1 }, function(err) { assert.ifError(err); }); @@ -693,7 +701,7 @@ describe('Change Streams', function() { var changeStream = database.collection('invalidateCallback').watch(pipeline); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { database.collection('invalidateCallback').insert({ a: 1 }, function(err) { assert.ifError(err); }); @@ -707,6 +715,7 @@ describe('Change Streams', function() { function completeStream() { changeStream.hasNext(function(err, hasNext) { + expect(err).to.not.exist; assert.equal(hasNext, false); assert.equal(changeStream.isClosed(), true); client.close(done); @@ -757,14 +766,14 @@ describe('Change Streams', function() { var changeStream = database.collection('invalidateCollectionDropPromises').watch(pipeline); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { return database .collection('invalidateCollectionDropPromises') .insert({ a: 1 }) .then(function() { return delay(200); }); - }, 200); + }); return changeStream .next() @@ -1138,7 +1147,7 @@ describe('Change Streams', function() { // Trigger the first database event firstChangeStream = collection.watch(pipeline); - setTimeout(() => { + waitForStarted(firstChangeStream, () => { return collection .insert(docs[0]) .then(function(result) { @@ -1223,7 +1232,7 @@ describe('Change Streams', function() { fullDocument: 'updateLookup' }); - setTimeout(() => { + waitForStarted(changeStream, () => { return collection.insert({ f: 128 }).then(function(result) { assert.equal(result.insertedCount, 1); }); @@ -1279,7 +1288,7 @@ describe('Change Streams', function() { }); // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { return collection .insert({ i: 128 }) .then(function(result) { @@ -1416,7 +1425,7 @@ describe('Change Streams', function() { }) .on('error', close); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theCollection.insert({ a: 1 }, function(err) { assert.ifError(err); }); @@ -1631,7 +1640,7 @@ describe('Change Streams', function() { done(err); }); - setTimeout(() => { + waitForStarted(thisChangeStream, () => { theCollection.insert({ a: 1407 }, function(err) { if (err) done(err); }); @@ -1960,6 +1969,8 @@ describe('Change Streams', function() { expect(changeDoc).to.be.null; }); + changeStream.on('close', closeSpy); + changeStream.on('error', err => { expect(err).to.exist; changeStream.close(() => { @@ -1968,10 +1979,8 @@ describe('Change Streams', function() { }); }); - changeStream.on('close', closeSpy); - // Trigger the first database event - setTimeout(() => { + waitForStarted(changeStream, () => { coll.insertOne({ a: 1 }, (err, result) => { expect(err).to.not.exist; expect(result.insertedCount).to.equal(1); @@ -1981,15 +1990,17 @@ describe('Change Streams', function() { } }); - // TODO: re-enable/fix these tests in NODE-2548 - describe.skip('should properly handle a changeStream event being processed mid-close', function() { - let client, coll; + describe('should properly handle a changeStream event being processed mid-close', function() { + let client, coll, changeStream; function write() { return Promise.resolve() .then(() => coll.insertOne({ a: 1 })) - .then(() => coll.insertOne({ b: 2 })) - .then(() => coll.insertOne({ c: 3 })); + .then(() => coll.insertOne({ b: 2 })); + } + + function lastWrite() { + return coll.insertOne({ c: 3 }); } beforeEach(function() { @@ -1997,27 +2008,38 @@ describe('Change Streams', function() { return client.connect().then(_client => { client = _client; coll = client.db(this.configuration.db).collection('tester'); + changeStream = coll.watch(); }); }); afterEach(function() { - coll = undefined; - if (client) { - return client.close().then(() => { + return Promise.resolve() + .then(() => { + if (changeStream && !changeStream.isClosed()) { + return changeStream.close(); + } + }) + .then(() => { + if (client) { + return client.close(); + } + }) + .then(() => { + coll = undefined; + changeStream = undefined; client = undefined; }); - } }); it('when invoked with promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function() { function read() { - const changeStream = coll.watch(); return Promise.resolve() .then(() => changeStream.next()) .then(() => changeStream.next()) .then(() => { + lastWrite(); const nextP = changeStream.next(); return changeStream.close().then(() => nextP); @@ -2034,14 +2056,15 @@ describe('Change Streams', function() { it('when invoked with callbacks', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { - const changeStream = coll.watch(); - changeStream.next(() => { changeStream.next(() => { + lastWrite(); changeStream.next(err => { let _err = null; try { - expect(err.message).to.equal('ChangeStream is closed'); + expect(err) + .property('message') + .to.equal('ChangeStream is closed'); } catch (e) { _err = e; } finally { @@ -2068,8 +2091,6 @@ describe('Change Streams', function() { return done(_err); }; - const changeStream = coll.watch(); - let counter = 0; changeStream.on('change', () => { counter += 1; @@ -2082,7 +2103,11 @@ describe('Change Streams', function() { }); changeStream.on('error', err => close(err)); - setTimeout(() => write().catch(() => {})); + waitForStarted(changeStream, () => + write() + .then(() => lastWrite()) + .catch(() => {}) + ); } }); }); @@ -2609,48 +2634,43 @@ describe('Change Streams', function() { describe('tryNext', function() { it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - const client = this.configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - - const changeStream = client - .db() - .collection('test') - .watch(); - - tryNext(changeStream, (err, doc) => { - expect(err).to.not.exist; - expect(doc).to.not.exist; + test: function() { + return withTempDb( + 'testTryNext', + { w: 'majority' }, + this.configuration.newClient(), + db => done => { + const changeStream = db.collection('test').watch(); + tryNext(changeStream, (err, doc) => { + expect(err).to.not.exist; + expect(doc).to.not.exist; - changeStream.close(() => client.close(done)); - }); - }); + changeStream.close(done); + }); + } + ); } }); it('should iterate a change stream until first empty batch', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - const client = this.configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - - collection.insertOne({ b: 24 }, err => { + test: function() { + return withTempDb( + 'testTryNext', + { w: 'majority' }, + this.configuration.newClient(), + db => done => { + const collection = db.collection('test'); + const changeStream = collection.watch(); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; + + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); }); }); - }); - - tryNext(changeStream, (err, doc) => { - expect(err).to.not.exist; - expect(doc).to.exist; tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; @@ -2658,13 +2678,18 @@ describe('Change Streams', function() { tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; - expect(doc).to.not.exist; + expect(doc).to.exist; - changeStream.close(() => client.close(done)); + tryNext(changeStream, (err, doc) => { + expect(err).to.not.exist; + expect(doc).to.not.exist; + + changeStream.close(done); + }); }); }); - }); - }); + } + ); } }); }); diff --git a/test/functional/shared.js b/test/functional/shared.js index 2a68d1b89c..106f234742 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -75,6 +75,17 @@ function makeCleanupFn(client) { }; } +function withTempDb(name, options, client, operation, errorHandler) { + return withClient( + client, + client => done => { + const db = client.db(name, options); + operation.call(this, db)(() => db.dropDatabase(done)); + }, + errorHandler + ); +} + function withClient(client, operation, errorHandler) { const cleanup = makeCleanupFn(client); @@ -198,6 +209,7 @@ module.exports = { assert, delay, withClient, + withTempDb, filterForCommands, filterOutCommands, ignoreNsNotFound,