Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pool adds gracefulExit to end gracefully #1810

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -2,3 +2,4 @@
coverage/
node_modules/
npm-debug.log
.DS_Store
dougwilson marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions Readme.md
Expand Up @@ -392,6 +392,7 @@ constructor. In addition to those options pools accept a few extras:
* `queueLimit`: The maximum number of connection requests the pool will queue
before returning an error from `getConnection`. If set to `0`, there is no
limit to the number of queued connection requests. (Default: `0`)
* `gracefulExit`: Determines whether to end gracefully. If `true`, every `pool.getConnection` or `pool.query` called before `pool.end` will success. If `false`, only commands / queries already in progress will complete, others will throw an error.
dougwilson marked this conversation as resolved.
Show resolved Hide resolved

## Pool events

Expand Down Expand Up @@ -462,6 +463,11 @@ all the connections have ended.
**Once `pool.end()` has been called, `pool.getConnection` and other operations
can no longer be performed**

If `gracefulExit` is set to `true`, the connections end _gracefully_, so all
-pending queries will still complete and the time to end the pool will vary.

The default `gracefulExit` is `false`, the following behavior will take effect.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the exact details of how the graceful end works here would be useful, as I remember that was the initial confusion here. As far as I can tell, this implementation will still let checked out connections queue new queries if there is at least one connection in the middle of being established, since the QUIT commands are not queued on all the connections until there is no connection in the aquiring state, is that intentional?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's intentional. I think graceful exit means finishing all queries called before end, so I make that happen.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but my comment was that your code is still sometimes allowing queries to be called after end. Is the call to end a hard stop, or should new queries still be allowed to get scheduled? The other issue is that under the acquiring condition, it will sometimes then stop the checked out connections from continuing to query and sometimes not.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but my comment was that your code is still sometimes allowing queries to be called after end. Is the call to end a hard stop, or should new queries still be allowed to get scheduled?

Only queries have been called before end would be allowed to execute, new queries after end would not be allowed no matter whether there is any checked out connection in the pool

The other issue is that under the acquiring condition, it will sometimes then stop the checked out connections from continuing to query and sometimes not.

When will the acquiring condition will happen?


This works by calling `connection.end()` on every active connection in the
pool, which queues a `QUIT` packet on the connection. And sets a flag to
prevent `pool.getConnection` from continuing to create any new connections.
Expand Down
45 changes: 35 additions & 10 deletions lib/Pool.js
Expand Up @@ -17,9 +17,10 @@ function Pool(options) {
this._freeConnections = [];
this._connectionQueue = [];
this._closed = false;
this._pendingClosing = false;
}

Pool.prototype.getConnection = function (cb) {
Pool.prototype.getConnection = function (cb, queued) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation for the new queued argument.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This queued argument is a private argument, is it appropriate to add documentation to README.md?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no such thing as a private argument -- the user can simply pass in the value. Just not documenting it doesn't make it private. If the argument is there, it needs to be documented.


if (this._closed) {
var err = new Error('Pool is closed.');
Expand All @@ -33,12 +34,21 @@ Pool.prototype.getConnection = function (cb) {
var connection;
var pool = this;

if (this._freeConnections.length > 0) {
if (this._freeConnections.length > 0 && (!this._pendingClosing || queued)) {
connection = this._freeConnections.shift();
this.acquireConnection(connection, cb);
return;
}

if (this._pendingClosing) {
var err = new Error('Pool is closed.');
err.code = 'POOL_CLOSED';
process.nextTick(function () {
cb(err);
});
return;
}

if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
connection = new PoolConnection(this, { config: this.config.newConnectionConfig() });

Expand Down Expand Up @@ -141,6 +151,10 @@ Pool.prototype.releaseConnection = function releaseConnection(connection) {
this._freeConnections.push(connection);
this.emit('release', connection);
}

if (this._pendingClosing) {
this.end(this._endCallback);
}
}

if (this._closed) {
Expand All @@ -154,19 +168,18 @@ Pool.prototype.releaseConnection = function releaseConnection(connection) {
});
} else if (this._connectionQueue.length) {
// get connection with next waiting callback
this.getConnection(this._connectionQueue.shift());
this.getConnection(this._connectionQueue.shift(), true);
}
};

Pool.prototype.end = function (cb) {
this._closed = true;

if (typeof cb !== 'function') {
cb = function (err) {
if (err) throw err;
};
}

var readyToEnd = false;
var calledBack = false;
var waitingClose = 0;

Expand All @@ -177,14 +190,26 @@ Pool.prototype.end = function (cb) {
}
}

while (this._allConnections.length !== 0) {
waitingClose++;
this._purgeConnection(this._allConnections[0], onEnd);
if (this._acquiringConnections.length === 0 && this._connectionQueue.length === 0) {
readyToEnd = true;
}

if (waitingClose === 0) {
process.nextTick(onEnd);
if (!this.config.gracefulExit || readyToEnd) {
this._closed = true;

while (this._allConnections.length !== 0) {
waitingClose++;
this._purgeConnection(this._allConnections[0], onEnd);
}

if (waitingClose === 0) {
process.nextTick(onEnd);
}
return;
}

this._pendingClosing = true;
this._endCallback = cb;
dougwilson marked this conversation as resolved.
Show resolved Hide resolved
};

Pool.prototype.query = function (sql, values, cb) {
Expand Down
3 changes: 3 additions & 0 deletions lib/PoolConfig.js
Expand Up @@ -20,6 +20,9 @@ function PoolConfig(options) {
this.queueLimit = (options.queueLimit === undefined)
? 0
: Number(options.queueLimit);
this.gracefulExit = (options.gracefulExit === undefined)
? false
: Boolean(options.gracefulExit);
}

PoolConfig.prototype.newConnectionConfig = function newConnectionConfig() {
Expand Down
46 changes: 46 additions & 0 deletions test/unit/pool/test-graceful-exit-ping.js
@@ -0,0 +1,46 @@
var common = require('../../common');
var assert = require('assert');
var pool = common.createPool({
connectionLimit : 1,
port : common.fakeServerPort,
queueLimit : 5,
waitForConnections : true,
gracefulExit : true
});

var server = common.createFakeServer();

server.listen(common.fakeServerPort, function (err) {
assert.ifError(err);

pool.getConnection(function (err, conn) {
assert.ifError(err);
conn.release();

pool.getConnection(function (err, conn) {
assert.ifError(err);
conn.release();
});

pool.end(function (err) {
assert.ifError(err);
server.destroy();
});

pool.getConnection(function (err) {
assert.ok(err);
assert.equal(err.message, 'Pool is closed.');
assert.equal(err.code, 'POOL_CLOSED');
});
});
});

server.on('connection', function (conn) {
conn.handshake();
conn.on('ping', function () {
setTimeout(function () {
conn._sendPacket(new common.Packets.OkPacket());
conn._parser.resetPacketNumber();
}, 100);
});
});
37 changes: 37 additions & 0 deletions test/unit/pool/test-graceful-exit-queued.js
@@ -0,0 +1,37 @@
var common = require('../../common');
var assert = require('assert');
var pool = common.createPool({
connectionLimit : 1,
port : common.fakeServerPort,
queueLimit : 5,
waitForConnections : true,
gracefulExit : true
});

var server = common.createFakeServer();

server.listen(common.fakeServerPort, function (err) {
assert.ifError(err);

pool.getConnection(function (err, conn) {
assert.ifError(err);

pool.end(function (err) {
assert.ifError(err);
server.destroy();
});

pool.getConnection(function (err) {
assert.ok(err);
assert.equal(err.message, 'Pool is closed.');
assert.equal(err.code, 'POOL_CLOSED');
});

conn.release();
});

pool.getConnection(function (err, conn) {
assert.ifError(err);
conn.release();
});
});