diff --git a/lib/change_stream.js b/lib/change_stream.js index 1d2bbe69ea..21f2428542 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -368,6 +368,20 @@ function processNewChange(args) { const change = args.change; const callback = args.callback; const eventEmitter = args.eventEmitter || false; + + // If the changeStream is closed, then it should not process a change. + if (changeStream.isClosed()) { + // We do not error in the eventEmitter case. + if (eventEmitter) { + return; + } + + const error = new MongoError('ChangeStream is closed'); + return typeof callback === 'function' + ? callback(error, null) + : changeStream.promiseLibrary.reject(error); + } + const topology = changeStream.topology; const options = changeStream.cursor.options; diff --git a/test/functional/change_stream_tests.js b/test/functional/change_stream_tests.js index b3dc38b952..114bc0a8b8 100644 --- a/test/functional/change_stream_tests.js +++ b/test/functional/change_stream_tests.js @@ -1677,4 +1677,109 @@ describe('Change Streams', function() { .then(() => finish(), err => finish(err)); } }); + + describe('should properly handle a changeStream event being processed mid-close', function() { + let client, coll; + + function write() { + return Promise.resolve() + .then(() => coll.insertOne({ a: 1 })) + .then(() => coll.insertOne({ b: 2 })) + .then(() => coll.insertOne({ c: 3 })); + } + + beforeEach(function() { + client = this.configuration.newClient(); + return client.connect().then(_client => { + client = _client; + coll = client.db(this.configuration.db).collection('tester'); + }); + }); + + afterEach(function() { + coll = undefined; + if (client) { + return client.close().then(() => { + client = undefined; + }); + } + }); + + it('when invoked with promises', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function() { + function read() { + const changeStream = coll.watch(); + return Promise.resolve() + .then(() => changeStream.next()) + .then(() => changeStream.next()) + .then(() => { + const nextP = changeStream.next(); + + return changeStream.close().then(() => nextP); + }); + } + + return Promise.all([read(), write()]).then( + () => Promise.reject(new Error('Expected operation to fail with error')), + err => expect(err.message).to.equal('ChangeStream is closed') + ); + } + }); + + it('when invoked with callbacks', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function(done) { + const changeStream = coll.watch(); + + changeStream.next(() => { + changeStream.next(() => { + changeStream.next(err => { + let _err = null; + try { + expect(err.message).to.equal('ChangeStream is closed'); + } catch (e) { + _err = e; + } finally { + done(_err); + } + }); + changeStream.close(); + }); + }); + + write().catch(() => {}); + } + }); + + it('when invoked using eventEmitter API', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function(done) { + let closed = false; + const close = _err => { + if (closed) { + return; + } + closed = true; + return done(_err); + }; + + const changeStream = coll.watch(); + + let counter = 0; + changeStream.on('change', () => { + counter += 1; + if (counter === 2) { + changeStream.close(); + setTimeout(() => close()); + } else if (counter >= 3) { + close(new Error('Should not have received more than 2 events')); + } + }); + changeStream.on('error', err => close(err)); + + setTimeout(() => write().catch(() => {})); + } + }); + }); });