Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(changeStream): properly handle changeStream event mid-close #1902

Merged
merged 2 commits into from Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/change_stream.js
Expand Up @@ -368,6 +368,20 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;

// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
}

const error = new MongoError('ChangeStream is closed');
return typeof callback === 'function'
? callback(error, null)
: changeStream.promiseLibrary.reject(error);
}

const topology = changeStream.topology;
const options = changeStream.cursor.options;

Expand Down
105 changes: 105 additions & 0 deletions test/functional/change_stream_tests.js
Expand Up @@ -1677,4 +1677,109 @@ describe('Change Streams', function() {
.then(() => finish(), err => finish(err));
}
});

describe('should properly handle a changeStream event being processed mid-close', function() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to refactor these three tests to use the same setup/teardown code for a client, rather than clutter the tests with it? It also feels like you can use the same method to write data, as you're really only testing different APIs for reads here

let client, coll;

function write() {
return Promise.resolve()
.then(() => coll.insertOne({ a: 1 }))
.then(() => coll.insertOne({ b: 2 }))
.then(() => coll.insertOne({ c: 3 }));
}

beforeEach(function() {
client = this.configuration.newClient();
return client.connect().then(_client => {
client = _client;
coll = client.db(this.configuration.db).collection('tester');
});
});

afterEach(function() {
coll = undefined;
if (client) {
return client.close().then(() => {
client = undefined;
});
}
});

it('when invoked with promises', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function() {
function read() {
const changeStream = coll.watch();
return Promise.resolve()
.then(() => changeStream.next())
.then(() => changeStream.next())
.then(() => {
const nextP = changeStream.next();

return changeStream.close().then(() => nextP);
});
}

return Promise.all([read(), write()]).then(
() => Promise.reject(new Error('Expected operation to fail with error')),
err => expect(err.message).to.equal('ChangeStream is closed')
);
}
});

it('when invoked with callbacks', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
const changeStream = coll.watch();

changeStream.next(() => {
changeStream.next(() => {
changeStream.next(err => {
let _err = null;
try {
expect(err.message).to.equal('ChangeStream is closed');
} catch (e) {
_err = e;
} finally {
done(_err);
}
});
changeStream.close();
});
});

write().catch(() => {});
}
});

it('when invoked using eventEmitter API', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
let closed = false;
const close = _err => {
if (closed) {
return;
}
closed = true;
return done(_err);
};

const changeStream = coll.watch();

let counter = 0;
changeStream.on('change', () => {
counter += 1;
if (counter === 2) {
changeStream.close();
setTimeout(() => close());
} else if (counter >= 3) {
close(new Error('Should not have received more than 2 events'));
}
});
changeStream.on('error', err => close(err));

setTimeout(() => write().catch(() => {}));
}
});
});
});