Skip to content

Commit

Permalink
fix: allow event loop to process during wait queue processing (#2537)
Browse files Browse the repository at this point in the history
* fix: allow event loop to process during wait queue processing

Running `processWaitQueue` on the next tick allows the event loop
to process while the connection pool is processing large numbers of
wait queue members. This also uncovered a few issues with timing
in our tests, and in some cases our top-level API:
  - `commitTransaction` / `abortTransaction` use `maybePromise` now
  - `endSession` must wait for all the machinery behind the scenes to
     check out a connection and write a message before considering its
     job finished
  - internal calls to `kill` a cursor now await the the process of fully
    sending that command, even if they ignore the response

NODE-2803
  • Loading branch information
mbroadst committed Sep 9, 2020
1 parent 2a6faa6 commit 4e03dfa
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 65 deletions.
14 changes: 4 additions & 10 deletions lib/cmap/connection_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ class ConnectionPool extends EventEmitter {
return;
}

// add this request to the wait queue
const waitQueueMember = { callback };

const pool = this;
Expand All @@ -233,11 +232,8 @@ class ConnectionPool extends EventEmitter {
}, waitQueueTimeoutMS);
}

// place the member at the end of the wait queue
this[kWaitQueue].push(waitQueueMember);

// process the wait queue
processWaitQueue(this);
setImmediate(() => processWaitQueue(this));
}

/**
Expand All @@ -250,10 +246,8 @@ class ConnectionPool extends EventEmitter {
const stale = connectionIsStale(this, connection);
const willDestroy = !!(poolClosed || stale || connection.closed);

// Properly adjust state of connection
if (!willDestroy) {
connection.markAvailable();

this[kConnections].push(connection);
}

Expand All @@ -264,7 +258,7 @@ class ConnectionPool extends EventEmitter {
destroyConnection(this, connection, reason);
}

processWaitQueue(this);
setImmediate(() => processWaitQueue(this));
}

/**
Expand Down Expand Up @@ -434,7 +428,7 @@ function createConnection(pool, callback) {

// otherwise add it to the pool for later acquisition, and try to process the wait queue
pool[kConnections].push(connection);
processWaitQueue(pool);
setImmediate(() => processWaitQueue(pool));
});
}

Expand All @@ -445,7 +439,7 @@ function destroyConnection(pool, connection, reason) {
pool[kPermits]++;

// destroy the connection
process.nextTick(() => connection.destroy());
setImmediate(() => connection.destroy());
}

function processWaitQueue(pool) {
Expand Down
29 changes: 18 additions & 11 deletions lib/core/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,10 @@ function nextFunction(self, callback) {

if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, callback);
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, callback)
);
} else if (
self.cursorState.cursorIndex === self.cursorState.documents.length &&
!Long.ZERO.equals(self.cursorState.cursorId)
Expand Down Expand Up @@ -827,9 +828,12 @@ function nextFunction(self, callback) {
} else {
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, callback);
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, callback)
);

return;
}

// Increment the current cursor limit
Expand All @@ -841,11 +845,14 @@ function nextFunction(self, callback) {
// Doc overflow
if (!doc || doc.$err) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, function() {
handleCallback(callback, new MongoError(doc ? doc.$err : undefined));
});
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, function() {
handleCallback(callback, new MongoError(doc ? doc.$err : undefined));
})
);

return;
}

// Transform the doc with passed in transformation method if provided
Expand Down
64 changes: 29 additions & 35 deletions lib/core/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const Transaction = require('./transactions').Transaction;
const TxnState = require('./transactions').TxnState;
const isPromiseLike = require('./utils').isPromiseLike;
const ReadPreference = require('./topologies/read_preference');
const maybePromise = require('../utils').maybePromise;
const isTransactionCommand = require('./transactions').isTransactionCommand;
const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
const isSharded = require('./wireprotocol/shared').isSharded;
Expand Down Expand Up @@ -125,25 +126,36 @@ class ClientSession extends EventEmitter {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (this.hasEnded) {
if (typeof callback === 'function') callback(null, null);
return;
}
const session = this;
return maybePromise(this, callback, done => {
if (session.hasEnded) {
return done();
}

if (this.serverSession && this.inTransaction()) {
this.abortTransaction(); // pass in callback?
}
function completeEndSession() {
// release the server session back to the pool
session.sessionPool.release(session.serverSession);
session[kServerSession] = undefined;

// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;
// mark the session as ended, and emit a signal
session.hasEnded = true;
session.emit('ended', session);

// spec indicates that we should ignore all errors for `endSessions`
done();
}

if (session.serverSession && session.inTransaction()) {
session.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
});

// mark the session as ended, and emit a signal
this.hasEnded = true;
this.emit('ended', this);
return;
}

// spec indicates that we should ignore all errors for `endSessions`
if (typeof callback === 'function') callback(null, null);
completeEndSession();
});
}

/**
Expand Down Expand Up @@ -227,16 +239,7 @@ class ClientSession extends EventEmitter {
* @return {Promise} A promise is returned if no callback is provided
*/
commitTransaction(callback) {
if (typeof callback === 'function') {
endTransaction(this, 'commitTransaction', callback);
return;
}

return new Promise((resolve, reject) => {
endTransaction(this, 'commitTransaction', (err, reply) =>
err ? reject(err) : resolve(reply)
);
});
return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done));
}

/**
Expand All @@ -246,16 +249,7 @@ class ClientSession extends EventEmitter {
* @return {Promise} A promise is returned if no callback is provided
*/
abortTransaction(callback) {
if (typeof callback === 'function') {
endTransaction(this, 'abortTransaction', callback);
return;
}

return new Promise((resolve, reject) => {
endTransaction(this, 'abortTransaction', (err, reply) =>
err ? reject(err) : resolve(reply)
);
});
return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done));
}

/**
Expand Down
11 changes: 6 additions & 5 deletions test/functional/spec-runner/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,12 @@ function runTestSuiteTest(configuration, spec, context) {
throw err;
})
.then(() => {
if (session0) session0.endSession();
if (session1) session1.endSession();

return validateExpectations(context.commandEvents, spec, savedSessionData);
});
const promises = [];
if (session0) promises.push(session0.endSession());
if (session1) promises.push(session1.endSession());
return Promise.all(promises);
})
.then(() => validateExpectations(context.commandEvents, spec, savedSessionData));
});
}

Expand Down
10 changes: 6 additions & 4 deletions test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,13 @@ describe('Connection Pool', function() {
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
pool.checkIn(conn);

expect(pool)
.property('waitQueueSize')
.to.equal(0);
setImmediate(() => {
expect(pool)
.property('waitQueueSize')
.to.equal(0);

done();
done();
});
});
});
});
Expand Down

0 comments on commit 4e03dfa

Please sign in to comment.