From 0206e842eb3b0953ec941cbca8b9326fa13446e7 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 14:51:15 -0400 Subject: [PATCH] refactor: keep waitqueue members in queue until connection ready --- src/cmap/connection_pool.ts | 39 +++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 4156c3fdb4..22e916ede0 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -66,6 +66,8 @@ const kMetrics = Symbol('metrics'); const kProcessingWaitQueue = Symbol('processingWaitQueue'); /** @internal */ const kPoolState = Symbol('poolState'); +/** @internal */ +const kWaitQueuePending = Symbol('waitQueuePending'); /** @public */ export interface ConnectionPoolOptions extends Omit { @@ -154,6 +156,8 @@ export class ConnectionPool extends TypedEventEmitter { [kMetrics]: ConnectionPoolMetrics; /** @internal */ [kProcessingWaitQueue]: boolean; + /** @internal */ + [kWaitQueuePending]: number; /** * Emitted when the connection pool is created. @@ -245,6 +249,7 @@ export class ConnectionPool extends TypedEventEmitter { this[kCancellationToken] = new CancellationToken(); this[kCancellationToken].setMaxListeners(Infinity); this[kWaitQueue] = new Denque(); + this[kWaitQueuePending] = 0; this[kMetrics] = new ConnectionPoolMetrics(); this[kProcessingWaitQueue] = false; @@ -396,6 +401,7 @@ export class ConnectionPool extends TypedEventEmitter { if (this.closed) { return; } + this[kWaitQueuePending] = 0; // handle load balanced case if (this.loadBalanced && serviceId) { @@ -453,6 +459,7 @@ export class ConnectionPool extends TypedEventEmitter { } this[kPoolState] = PoolState.closed; + this[kWaitQueuePending] = 0; this.clearMinPoolSizeTimer(); this.processWaitQueue(); @@ -591,8 +598,8 @@ export class ConnectionPool extends TypedEventEmitter { return; } - // The pool might have closed since we started trying to create a connection - if (this.closed) { + // The pool might have been cleared or closed since we started trying to create a connection + if (this[kPoolState] !== PoolState.ready) { this[kPending]--; connection.destroy({ force: true }); return; @@ -722,17 +729,29 @@ export class ConnectionPool extends TypedEventEmitter { } const { maxPoolSize, maxConnecting } = this.options; - while ( - this.waitQueueSize > 0 && - this.pendingConnectionCount < maxConnecting && - (maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize) + for ( + let waitQueueIndex = this[kWaitQueuePending]; + waitQueueIndex < this.waitQueueSize; + waitQueueIndex++ ) { - const waitQueueMember = this[kWaitQueue].shift(); - if (!waitQueueMember || waitQueueMember[kCancelled]) { - continue; + if ( + this.pendingConnectionCount >= maxConnecting || + (maxPoolSize > 0 && this.totalConnectionCount >= maxPoolSize) + ) { + break; } + this[kWaitQueuePending]++; this.createConnection((err, connection) => { - if (waitQueueMember[kCancelled]) { + // The > 0 guard is just a precaution against future refactors + // Currently, the callback is invoked sync from createConnection + // so we are guaranteed the pool is still ready at this point + // however, if this changes to async, then it will be possible for the + // queue to get cleared before we get here + if (this[kWaitQueuePending] > 0) { + this[kWaitQueuePending]--; + } + const waitQueueMember = this[kWaitQueue].shift(); + if (!waitQueueMember || waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); }