Skip to content

Commit

Permalink
Fix pool.getConnection race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
dougwilson committed Sep 22, 2014
1 parent eb58eef commit 17209da
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 65 deletions.
1 change: 1 addition & 0 deletions Changes.md
Expand Up @@ -7,6 +7,7 @@ you spot any mistakes.
## HEAD

* Fix `pool.end` race conditions #915
* Fix `pool.getConnection` race conditions

## v2.5.0 (2014-09-07)

Expand Down
122 changes: 59 additions & 63 deletions lib/Pool.js
Expand Up @@ -12,10 +12,11 @@ function Pool(options) {
this.config = options.config;
this.config.connectionConfig.pool = this;

this._allConnections = [];
this._freeConnections = [];
this._connectionQueue = [];
this._closed = false;
this._acquiringConnections = [];
this._allConnections = [];
this._freeConnections = [];
this._connectionQueue = [];
this._closed = false;
}

Pool.prototype.getConnection = function (cb) {
Expand All @@ -38,24 +39,22 @@ Pool.prototype.getConnection = function (cb) {
if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
connection = new PoolConnection(this, { config: this.config.newConnectionConfig() });

this._acquiringConnections.push(connection);
this._allConnections.push(connection);

connection._pool = null;
return connection.connect({timeout: this.config.acquireTimeout}, function (err) {
return connection.connect({timeout: this.config.acquireTimeout}, function onConnect(err) {
spliceConnection(pool._acquiringConnections, connection);

if (pool._closed) {
connection.destroy();
pool._removeConnection(connection);
cb(new Error('Pool is closed.'));
return;
err = new Error('Pool is closed.');
}

if (err) {
pool._removeConnection(connection);
pool._purgeConnection(connection);
cb(err);
return;
}

connection._pool = pool;
pool.emit('connection', connection);
cb(null, connection);
});
Expand All @@ -77,30 +76,33 @@ Pool.prototype.acquireConnection = function acquireConnection(connection, cb) {

var pool = this;

connection._pool = null;
connection.ping({timeout: this.config.acquireTimeout}, function(err) {
if (!err && !pool._closed) {
connection._pool = pool;
cb(null, connection);
return;
}
this._acquiringConnections.push(connection);

connection.destroy();
connection.ping({timeout: this.config.acquireTimeout}, function onPing(err) {
spliceConnection(pool._acquiringConnections, connection);

if (pool._closed) {
pool._removeConnection(connection);
cb(new Error('Pool is closed.'));
err = new Error('Pool is closed.');
}

if (err) {
pool._connectionQueue.unshift(cb);
pool._purgeConnection(connection);
return;
}

pool._connectionQueue.unshift(cb);
pool._removeConnection(connection);
cb(null, connection);
});
};

Pool.prototype.releaseConnection = function releaseConnection(connection) {
var cb;

if (this._acquiringConnections.indexOf(connection) !== -1) {
// connection is being acquired
return;
}

if (connection._pool) {
if (connection._pool !== this) {
throw new Error('Connection released to wrong pool');
Expand Down Expand Up @@ -143,39 +145,26 @@ Pool.prototype.end = function (cb) {
};
}

var calledBack = false;
var closedConnections = 0;
var connection;
var calledBack = false;
var waitingClose = this._allConnections.length;

var endCB = function(err) {
function onEnd(err) {
if (calledBack) {
return;
}

if (err || this._allConnections.length === 0) {
if (err || --waitingClose === 0) {
calledBack = true;
return cb(err);
}
}.bind(this);

if (this._allConnections.length === 0) {
return process.nextTick(endCB);
}

while (this._allConnections.length) {
connection = this._allConnections[0];

if (connection._pool === this) {
closedConnections++;
connection._pool = null;
connection._realEnd(endCB);
}

this._removeConnection(connection);
if (waitingClose === 0) {
return process.nextTick(cb);
}

if (closedConnections === 0) {
return process.nextTick(endCB);
while (this._allConnections.length !== 0) {
this._purgeConnection(this._allConnections[0], onEnd);
}
};

Expand Down Expand Up @@ -229,32 +218,31 @@ Pool.prototype._enqueueCallback = function _enqueueCallback(callback) {
this.emit('enqueue');
};

Pool.prototype._purgeConnection = function _purgeConnection(connection) {
var pool = this;
Pool.prototype._purgeConnection = function _purgeConnection(connection, callback) {
var cb = callback || function () {};

connection._realEnd(function(err) {
if (err) {
connection.destroy();
}
if (connection.state === 'disconnected') {
connection.destroy();
}

pool._removeConnection(connection);
});
this._removeConnection(connection);

if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) {
connection._realEnd(cb);
return;
}

process.nextTick(cb);
};

Pool.prototype._removeConnection = function(connection) {
var index;

connection._pool = null;

if ((index = this._allConnections.indexOf(connection)) !== -1) {
// Remove connection from all connections
this._allConnections.splice(index, 1);
}
// Remove connection from all connections
spliceConnection(this._allConnections, connection);

if ((index = this._freeConnections.indexOf(connection)) !== -1) {
// Remove connection from free connections
this._freeConnections.splice(index, 1);
}
// Remove connection from free connections
spliceConnection(this._freeConnections, connection);

this.releaseConnection(connection);
};
Expand All @@ -266,3 +254,11 @@ Pool.prototype.escape = function(value) {
Pool.prototype.escapeId = function escapeId(value) {
return mysql.escapeId(value, false);
};

function spliceConnection(array, connection) {
var index;
if ((index = array.indexOf(connection)) !== -1) {
// Remove connection from all connections
array.splice(index, 1);
}
}
4 changes: 2 additions & 2 deletions test/unit/pool/test-change-user-eject.js
Expand Up @@ -34,11 +34,11 @@ server.listen(common.fakeServerPort, function(err) {

pool.getConnection(function(err, conn1) {
assert.ifError(err);
assert.strictEqual(conn1.threadId, 1);
assert.strictEqual(conn1.threadId, 3);

pool.getConnection(function(err, conn2) {
assert.ifError(err);
assert.strictEqual(conn2.threadId, 3);
assert.strictEqual(conn2.threadId, 1);
conn1.release();
conn2.release();

Expand Down

0 comments on commit 17209da

Please sign in to comment.