Skip to content

Commit

Permalink
refactor: keep waitqueue members in queue until connection ready
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakp committed Aug 31, 2022
1 parent 88ef8a5 commit 0206e84
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions src/cmap/connection_pool.ts
Expand Up @@ -66,6 +66,8 @@ const kMetrics = Symbol('metrics');
const kProcessingWaitQueue = Symbol('processingWaitQueue');
/** @internal */
const kPoolState = Symbol('poolState');
/** @internal */
const kWaitQueuePending = Symbol('waitQueuePending');

/** @public */
export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'generation'> {
Expand Down Expand Up @@ -154,6 +156,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
[kMetrics]: ConnectionPoolMetrics;
/** @internal */
[kProcessingWaitQueue]: boolean;
/** @internal */
[kWaitQueuePending]: number;

/**
* Emitted when the connection pool is created.
Expand Down Expand Up @@ -245,6 +249,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kCancellationToken] = new CancellationToken();
this[kCancellationToken].setMaxListeners(Infinity);
this[kWaitQueue] = new Denque();
this[kWaitQueuePending] = 0;
this[kMetrics] = new ConnectionPoolMetrics();
this[kProcessingWaitQueue] = false;

Expand Down Expand Up @@ -396,6 +401,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (this.closed) {
return;
}
this[kWaitQueuePending] = 0;

// handle load balanced case
if (this.loadBalanced && serviceId) {
Expand Down Expand Up @@ -453,6 +459,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

this[kPoolState] = PoolState.closed;
this[kWaitQueuePending] = 0;
this.clearMinPoolSizeTimer();
this.processWaitQueue();

Expand Down Expand Up @@ -591,8 +598,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}

// The pool might have closed since we started trying to create a connection
if (this.closed) {
// The pool might have been cleared or closed since we started trying to create a connection
if (this[kPoolState] !== PoolState.ready) {
this[kPending]--;
connection.destroy({ force: true });
return;
Expand Down Expand Up @@ -722,17 +729,29 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

const { maxPoolSize, maxConnecting } = this.options;
while (
this.waitQueueSize > 0 &&
this.pendingConnectionCount < maxConnecting &&
(maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize)
for (
let waitQueueIndex = this[kWaitQueuePending];
waitQueueIndex < this.waitQueueSize;
waitQueueIndex++
) {
const waitQueueMember = this[kWaitQueue].shift();
if (!waitQueueMember || waitQueueMember[kCancelled]) {
continue;
if (
this.pendingConnectionCount >= maxConnecting ||
(maxPoolSize > 0 && this.totalConnectionCount >= maxPoolSize)
) {
break;
}
this[kWaitQueuePending]++;
this.createConnection((err, connection) => {
if (waitQueueMember[kCancelled]) {
// The > 0 guard is just a precaution against future refactors
// Currently, the callback is invoked sync from createConnection
// so we are guaranteed the pool is still ready at this point
// however, if this changes to async, then it will be possible for the
// queue to get cleared before we get here
if (this[kWaitQueuePending] > 0) {
this[kWaitQueuePending]--;
}
const waitQueueMember = this[kWaitQueue].shift();
if (!waitQueueMember || waitQueueMember[kCancelled]) {
if (!err && connection) {
this[kConnections].push(connection);
}
Expand Down

0 comments on commit 0206e84

Please sign in to comment.