Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(changeStream): properly handle changeStream event mid-close #1902

Merged
merged 2 commits into from Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/change_stream.js
Expand Up @@ -368,6 +368,15 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;

if (changeStream.isClosed()) {
if (eventEmitter) return;

const error = new Error('ChangeStream is closed');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a MongoError right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also nit, but should ChangeStream just be change stream? It makes it look like a product name here 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was referring to the class ChangeStream here. Figured it was better to specify the constructor object.

Also, I specifically resisted throwing a MongoError b/c right now we use MongoError mostly for server-side errors. We don't really have a concept of a MongoDriverError.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 on point one
For the second point, I still think it makes sense to mark it as a MongoError until such an error class exists. It makes it harder to catch errors generically limited to the driver if we also sometimes throw plain Error s, at very least even if/when a MongoDriverError is introduced it will be a subclass of MongoError

if (typeof callback === 'function') return callback(error, null);
return changeStream.promiseLibrary.reject(error);
}

const topology = changeStream.topology;
const options = changeStream.cursor.options;

Expand Down
129 changes: 129 additions & 0 deletions test/functional/change_stream_tests.js
Expand Up @@ -1677,4 +1677,133 @@ describe('Change Streams', function() {
.then(() => finish(), err => finish(err));
}
});

describe('should properly handle a changeStream event being processed mid-close', function() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to refactor these three tests to use the same setup/teardown code for a client, rather than clutter the tests with it? It also feels like you can use the same method to write data, as you're really only testing different APIs for reads here

it('when invoked with promises', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function() {
function read(coll) {
const changeStream = coll.watch();
return Promise.resolve()
.then(() => changeStream.next())
.then(() => changeStream.next())
.then(() => {
const closeP = Promise.resolve().then(() => changeStream.close());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, does changeStream.close() not return a Promise? Looks like it should to me. This should at least be: Promise.resolve(changeStream.close())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a relic of when I was trying to enqueue changeStream.close as a microTask. I can remove it.

const nextP = changeStream.next();

return closeP.then(() => nextP);
});
}

function write(coll) {
return Promise.resolve()
.then(() => coll.insertOne({ a: 1 }))
.then(() => coll.insertOne({ b: 2 }))
.then(() => coll.insertOne({ c: 3 }));
}

const client = this.configuration.newClient();

return client.connect().then(() => {
const coll = client.db(this.configuration.db).collection('tester');

return Promise.all([read(coll), write(coll)])
.then(
() => Promise.reject(new Error('Expected operation to fail with error')),
err => expect(err.message).to.equal('ChangeStream is closed')
)
.then(() => client.close(), err => client.close().then(() => Promise.reject(err)));
});
}
});

it('when invoked with callbacks', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
const client = this.configuration.newClient();
let closed = false;
const close = err => {
if (closed) {
return;
}
closed = true;
return client.close(() => done(err));
};
client.connect(err => {
if (err) {
return close(err);
}

const coll = client.db(this.configuration.db).collection('tester');
const changeStream = coll.watch();

changeStream.next(() => {
changeStream.next(() => {
changeStream.next(err => {
let _err = null;
try {
expect(err.message).to.equal('ChangeStream is closed');
} catch (e) {
_err = e;
} finally {
close(_err);
}
});
changeStream.close();
});
});

coll.insertOne({ a: 1 }, () => {
coll.insertOne({ b: 2 }, () => {
coll.insertOne({ c: 3 }, () => {});
});
});
});
}
});

it('when invoked using eventEmitter API', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
const client = this.configuration.newClient();
let closed = false;
const close = _err => {
if (closed) {
return;
}
closed = true;
return client.close(() => done(_err));
};

client.connect(err => {
if (err) {
return close(err);
}

const coll = client.db(this.configuration.db).collection('tester');
const changeStream = coll.watch();

let counter = 0;
changeStream.on('change', () => {
counter += 1;
if (counter === 2) {
changeStream.close();
setTimeout(() => close());
} else if (counter >= 3) {
close(new Error('Should not have received more than 2 events'));
}
});
changeStream.on('error', err => close(err));

setTimeout(() => {
coll.insertOne({ a: 1 }, () => {
coll.insertOne({ b: 2 }, () => {
coll.insertOne({ c: 3 }, () => {});
});
});
});
});
}
});
});
});