Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ChangeStream): use maybePromise for close, improve tests #2343

Merged
merged 29 commits into from May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7d38aa6
refactor: use maybePromise for ChangeStream.close
emadum Apr 30, 2020
89a162f
test: fix skipped mid-close tests
emadum Apr 30, 2020
4dd3875
clean up processNewChange
emadum May 3, 2020
6a75b37
clean up processNewChange some more
emadum May 3, 2020
67e9294
remove comment
emadum May 3, 2020
35a552f
test: add waitForStarted to flaky changeStream tests
emadum May 3, 2020
279f3c4
cleanup from review
emadum May 3, 2020
182a03d
properly fix mid-close tests
emadum May 3, 2020
fd01b1c
use waitForStarted in flaky tests
emadum May 4, 2020
972a6ed
use waitForStarted where possible in existing tests
emadum May 4, 2020
b0a0fb8
improve cleanup logic for mid-close tests
emadum May 4, 2020
395a9bc
use callback so client will close even if changeStream.close errors
emadum May 4, 2020
3de4d83
fix typo
emadum May 4, 2020
5e1dc83
revert changes and skip mid-close tests
emadum May 4, 2020
ad22a9f
test fix
emadum May 5, 2020
25609ec
refactor: clean up tryNext tests, add withTempDb helper
emadum May 5, 2020
891544d
remove .only
emadum May 5, 2020
44ff554
fix: dont processNewChange for closed changeStream
emadum May 5, 2020
e488d63
fix broken mid-close tests, again
emadum May 5, 2020
e5b7802
fix todo in forbidden stage test
emadum May 5, 2020
c7c5bc5
cleanup
emadum May 5, 2020
b97befe
add timeout to waitForStarted
emadum May 5, 2020
70bdeba
ensure parity on mid-close events test
emadum May 5, 2020
8619634
spend less time waiting for topology
emadum May 6, 2020
29dedd2
tryNext tests should use w: majority
emadum May 6, 2020
0305e32
remove old code
emadum May 6, 2020
4cbd81c
dont skip waitForTopology on unified topology
emadum May 6, 2020
ffa0b38
lint error
emadum May 6, 2020
6e8d0d6
clean up commented code
emadum May 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 37 additions & 72 deletions lib/change_stream.js
Expand Up @@ -94,6 +94,8 @@ class ChangeStream extends EventEmitter {
// Create contained Change Stream cursor
this.cursor = createChangeStreamCursor(this, options);

this.closed = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
Expand Down Expand Up @@ -141,7 +143,7 @@ class ChangeStream extends EventEmitter {
next(callback) {
return maybePromise(this.parent, callback, cb => {
if (this.isClosed()) {
return cb(new Error('Change Stream is not open.'));
return cb(new MongoError('ChangeStream is closed'));
}
this.cursor.next((error, change) => {
processNewChange({ changeStream: this, error, change, callback: cb });
Expand All @@ -155,10 +157,7 @@ class ChangeStream extends EventEmitter {
* @return {boolean}
*/
isClosed() {
if (this.cursor) {
return this.cursor.isClosed();
}
return true;
return this.closed || (this.cursor && this.cursor.isClosed());
}

/**
Expand All @@ -168,31 +167,20 @@ class ChangeStream extends EventEmitter {
* @return {Promise} returns Promise if no callback passed
*/
close(callback) {
if (!this.cursor) {
if (callback) return callback();
return this.promiseLibrary.resolve();
}
return maybePromise(this.parent, callback, cb => {
if (this.closed) return cb();

// Tidy up the existing cursor
const cursor = this.cursor;
// flag the change stream as explicitly closed
this.closed = true;

if (callback) {
return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
delete this.cursor;
// Tidy up the existing cursor
const cursor = this.cursor;

return callback(err);
});
}

const PromiseCtor = this.promiseLibrary || Promise;
return new PromiseCtor((resolve, reject) => {
cursor.close(err => {
return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
delete this.cursor;
this.cursor = undefined;

if (err) return reject(err);
resolve();
return cb(err);
});
});
}
Expand Down Expand Up @@ -321,7 +309,7 @@ class ChangeStreamCursor extends Cursor {
_initializeCursor(callback) {
super._initializeCursor((err, result) => {
if (err) {
callback(err, null);
callback(err);
return;
}

Expand All @@ -347,7 +335,7 @@ class ChangeStreamCursor extends Cursor {
_getMore(callback) {
super._getMore((err, response) => {
if (err) {
callback(err, null);
callback(err);
return;
}

Expand Down Expand Up @@ -460,12 +448,12 @@ function waitForTopologyConnected(topology, options, callback) {
const timeout = options.timeout || SELECTION_TIMEOUT;
const readPreference = options.readPreference;

if (topology.isConnected({ readPreference })) return callback(null, null);
if (topology.isConnected({ readPreference })) return callback();
const hrElapsed = process.hrtime(start);
const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6;
if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
waitForTopologyConnected(topology, options, callback);
}, 3000); // this is an arbitrary wait time to allow SDAM to transition
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
Expand All @@ -477,17 +465,15 @@ function processNewChange(args) {
const eventEmitter = args.eventEmitter || false;
const cursor = changeStream.cursor;

// If the cursor is null, then it should not process a change.
if (cursor == null) {
// If the cursor is null or the change stream has been closed explictly, do not process a change.
if (cursor == null || changeStream.closed) {
// We do not error in the eventEmitter case.
changeStream.closed = true;
if (eventEmitter) {
return;
}

const error = new MongoError('ChangeStream is closed');
return typeof callback === 'function'
? callback(error, null)
: changeStream.promiseLibrary.reject(error);
callback(new MongoError('ChangeStream is closed'));
return;
}

const topology = changeStream.topology;
Expand All @@ -506,46 +492,27 @@ function processNewChange(args) {
// close internal cursor, ignore errors
changeStream.cursor.close();

// attempt recreating the cursor
if (eventEmitter) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) {
// if there's an error reconnecting, close the change stream
changeStream.closed = true;
if (eventEmitter) {
changeStream.emit('error', err);
changeStream.emit('close');
return;
}
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
});
return callback(err);
}

return;
}

if (callback) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return callback(err, null);

changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
changeStream.next(callback);
});

return;
}

return new Promise((resolve, reject) => {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return reject(err);
resolve();
});
})
.then(
() => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions))
)
.then(() => changeStream.next());
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
if (eventEmitter) return;
changeStream.next(callback);
emadum marked this conversation as resolved.
Show resolved Hide resolved
});
return;
}

if (eventEmitter) return changeStream.emit('error', error);
if (typeof callback === 'function') return callback(error, null);
return changeStream.promiseLibrary.reject(error);
return callback(error);
}

changeStream.attemptingResume = false;
Expand All @@ -556,8 +523,7 @@ function processNewChange(args) {
);

if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
if (typeof callback === 'function') return callback(noResumeTokenError, null);
return changeStream.promiseLibrary.reject(noResumeTokenError);
return callback(noResumeTokenError);
}

// cache the resume token
Expand All @@ -569,8 +535,7 @@ function processNewChange(args) {

// Return the change
if (eventEmitter) return changeStream.emit('change', change);
if (typeof callback === 'function') return callback(error, change);
return changeStream.promiseLibrary.resolve(change);
return callback(error, change);
}

/**
Expand Down