diff --git a/Changes.md b/Changes.md index 04b7acd51..05f217031 100644 --- a/Changes.md +++ b/Changes.md @@ -7,6 +7,7 @@ you spot any mistakes. ## HEAD * Fix `pool.end` race conditions #915 +* Fix `pool.getConnection` race conditions ## v2.5.0 (2014-09-07) diff --git a/lib/Pool.js b/lib/Pool.js index 6d30e35e3..0e3000c76 100644 --- a/lib/Pool.js +++ b/lib/Pool.js @@ -12,10 +12,11 @@ function Pool(options) { this.config = options.config; this.config.connectionConfig.pool = this; - this._allConnections = []; - this._freeConnections = []; - this._connectionQueue = []; - this._closed = false; + this._acquiringConnections = []; + this._allConnections = []; + this._freeConnections = []; + this._connectionQueue = []; + this._closed = false; } Pool.prototype.getConnection = function (cb) { @@ -38,24 +39,22 @@ Pool.prototype.getConnection = function (cb) { if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) { connection = new PoolConnection(this, { config: this.config.newConnectionConfig() }); + this._acquiringConnections.push(connection); this._allConnections.push(connection); - connection._pool = null; - return connection.connect({timeout: this.config.acquireTimeout}, function (err) { + return connection.connect({timeout: this.config.acquireTimeout}, function onConnect(err) { + spliceConnection(pool._acquiringConnections, connection); + if (pool._closed) { - connection.destroy(); - pool._removeConnection(connection); - cb(new Error('Pool is closed.')); - return; + err = new Error('Pool is closed.'); } if (err) { - pool._removeConnection(connection); + pool._purgeConnection(connection); cb(err); return; } - connection._pool = pool; pool.emit('connection', connection); cb(null, connection); }); @@ -77,30 +76,33 @@ Pool.prototype.acquireConnection = function acquireConnection(connection, cb) { var pool = this; - connection._pool = null; - connection.ping({timeout: this.config.acquireTimeout}, function(err) { - if (!err && !pool._closed) { - connection._pool = pool; - cb(null, connection); - return; - } + this._acquiringConnections.push(connection); - connection.destroy(); + connection.ping({timeout: this.config.acquireTimeout}, function onPing(err) { + spliceConnection(pool._acquiringConnections, connection); if (pool._closed) { - pool._removeConnection(connection); - cb(new Error('Pool is closed.')); + err = new Error('Pool is closed.'); + } + + if (err) { + pool._connectionQueue.unshift(cb); + pool._purgeConnection(connection); return; } - pool._connectionQueue.unshift(cb); - pool._removeConnection(connection); + cb(null, connection); }); }; Pool.prototype.releaseConnection = function releaseConnection(connection) { var cb; + if (this._acquiringConnections.indexOf(connection) !== -1) { + // connection is being acquired + return; + } + if (connection._pool) { if (connection._pool !== this) { throw new Error('Connection released to wrong pool'); @@ -143,39 +145,26 @@ Pool.prototype.end = function (cb) { }; } - var calledBack = false; - var closedConnections = 0; - var connection; + var calledBack = false; + var waitingClose = this._allConnections.length; - var endCB = function(err) { + function onEnd(err) { if (calledBack) { return; } - if (err || this._allConnections.length === 0) { + if (err || --waitingClose === 0) { calledBack = true; return cb(err); } - }.bind(this); - - if (this._allConnections.length === 0) { - return process.nextTick(endCB); } - while (this._allConnections.length) { - connection = this._allConnections[0]; - - if (connection._pool === this) { - closedConnections++; - connection._pool = null; - connection._realEnd(endCB); - } - - this._removeConnection(connection); + if (waitingClose === 0) { + return process.nextTick(cb); } - if (closedConnections === 0) { - return process.nextTick(endCB); + while (this._allConnections.length !== 0) { + this._purgeConnection(this._allConnections[0], onEnd); } }; @@ -229,32 +218,31 @@ Pool.prototype._enqueueCallback = function _enqueueCallback(callback) { this.emit('enqueue'); }; -Pool.prototype._purgeConnection = function _purgeConnection(connection) { - var pool = this; +Pool.prototype._purgeConnection = function _purgeConnection(connection, callback) { + var cb = callback || function () {}; - connection._realEnd(function(err) { - if (err) { - connection.destroy(); - } + if (connection.state === 'disconnected') { + connection.destroy(); + } - pool._removeConnection(connection); - }); + this._removeConnection(connection); + + if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) { + connection._realEnd(cb); + return; + } + + process.nextTick(cb); }; Pool.prototype._removeConnection = function(connection) { - var index; - connection._pool = null; - if ((index = this._allConnections.indexOf(connection)) !== -1) { - // Remove connection from all connections - this._allConnections.splice(index, 1); - } + // Remove connection from all connections + spliceConnection(this._allConnections, connection); - if ((index = this._freeConnections.indexOf(connection)) !== -1) { - // Remove connection from free connections - this._freeConnections.splice(index, 1); - } + // Remove connection from free connections + spliceConnection(this._freeConnections, connection); this.releaseConnection(connection); }; @@ -266,3 +254,11 @@ Pool.prototype.escape = function(value) { Pool.prototype.escapeId = function escapeId(value) { return mysql.escapeId(value, false); }; + +function spliceConnection(array, connection) { + var index; + if ((index = array.indexOf(connection)) !== -1) { + // Remove connection from all connections + array.splice(index, 1); + } +} diff --git a/test/unit/pool/test-change-user-eject.js b/test/unit/pool/test-change-user-eject.js index 161e72591..325b7d1de 100644 --- a/test/unit/pool/test-change-user-eject.js +++ b/test/unit/pool/test-change-user-eject.js @@ -14,19 +14,23 @@ server.listen(common.fakeServerPort, function(err) { assert.ifError(err); var conn0; + var threadId; pool.getConnection(function(err, conn) { assert.ifError(err); - assert.strictEqual(conn.threadId, 1); + assert.ok(conn.threadId === 1 || conn.threadId === 2); conn0 = conn; + threadId = conn.threadId; }); pool.getConnection(function(err, conn) { assert.ifError(err); - assert.strictEqual(conn.threadId, 2); + assert.ok(conn.threadId === 1 || conn.threadId === 2); + + var threadId = conn.threadId; conn.changeUser({user: 'user_2'}, function(err) { assert.ifError(err); - assert.strictEqual(conn.threadId, 2); + assert.strictEqual(conn.threadId, threadId); conn.release(); conn0.release(); }); @@ -34,11 +38,11 @@ server.listen(common.fakeServerPort, function(err) { pool.getConnection(function(err, conn1) { assert.ifError(err); - assert.strictEqual(conn1.threadId, 1); + assert.strictEqual(conn1.threadId, 3); pool.getConnection(function(err, conn2) { assert.ifError(err); - assert.strictEqual(conn2.threadId, 3); + assert.strictEqual(conn2.threadId, threadId); conn1.release(); conn2.release();