diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index d7f7e70406e..6b6966e5a80 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -41,7 +41,12 @@ import { ConnectionPoolReadyEvent, ConnectionReadyEvent } from './connection_pool_events'; -import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors'; +import { + PoolClearedError, + PoolClearedOnNetworkError, + PoolClosedError, + WaitQueueTimeoutError +} from './errors'; import { ConnectionPoolMetrics } from './metrics'; /** @internal */ @@ -391,10 +396,10 @@ export class ConnectionPool extends TypedEventEmitter { this[kConnections].unshift(connection); } - this[kCheckedOut].delete(connection); + const wasConnectionDeleted = this[kCheckedOut].delete(connection); this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection)); - if (willDestroy) { + if (wasConnectionDeleted && willDestroy) { const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale'; this.destroyConnection(connection, reason); } @@ -408,8 +413,9 @@ export class ConnectionPool extends TypedEventEmitter { * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a * previous generation will eventually be pruned during subsequent checkouts. */ - clear(options: { serviceId?: ObjectId } = {}): void { + clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void { const { serviceId } = options; + const interruptInUseConnections = options.interruptInUseConnections ?? false; if (this.closed) { return; } @@ -433,6 +439,8 @@ export class ConnectionPool extends TypedEventEmitter { return; } + const oldGeneration = this[kGeneration]; + // handle non load-balanced case this[kGeneration] += 1; const alreadyPaused = this[kPoolState] === PoolState.paused; @@ -440,11 +448,63 @@ export class ConnectionPool extends TypedEventEmitter { this.clearMinPoolSizeTimer(); if (!alreadyPaused) { - this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); + this.emit( + ConnectionPool.CONNECTION_POOL_CLEARED, + new ConnectionPoolClearedEvent(this, { interruptInUseConnections }) + ); } + + process.nextTick(() => + this.pruneConnections({ minGeneration: oldGeneration, interruptInUseConnections }) + ); + this.processWaitQueue(); } + /** + * Closes all checked in perished connections in the pool with a resumable PoolClearedOnNetworkError. + * + * If interruptInUseConnections is `true`, this method attempts to kill checked out connections as well. + * Only connections where `connection.generation <= minGeneration` are killed. Connections are closed with a + * resumable PoolClearedOnNetworkTimeoutError. + */ + private pruneConnections({ + interruptInUseConnections, + minGeneration + }: { + interruptInUseConnections: boolean; + minGeneration: number; + }) { + this[kConnections].prune(connection => { + if (connection.generation <= minGeneration) { + connection.onError(new PoolClearedOnNetworkError(this)); + this.emit( + ConnectionPool.CONNECTION_CLOSED, + new ConnectionClosedEvent(this, connection, 'stale') + ); + + return true; + } + return false; + }); + + if (interruptInUseConnections) { + for (const connection of this[kCheckedOut]) { + if (connection.generation <= minGeneration) { + this[kCheckedOut].delete(connection); + connection.onError(new PoolClearedOnNetworkError(this)); + this.emit( + ConnectionPool.CONNECTION_CLOSED, + new ConnectionClosedEvent(this, connection, 'stale') + ); + } + } + + // TODO(NODE-xxxx): track pending connections and cancel + // this[kCancellationToken].emit('cancel'); + } + } + /** Close the pool */ close(callback: Callback): void; close(options: CloseOptions, callback: Callback): void; @@ -573,7 +633,12 @@ export class ConnectionPool extends TypedEventEmitter { return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS); } - private connectionIsPerished(connection: Connection) { + /** + * Destroys a connection if the connection is perished. + * + * @returns `true` if the connection was destroyed, `false` otherwise. + */ + private destroyConnectionIfPerished(connection: Connection) { const isStale = this.connectionIsStale(connection); const isIdle = this.connectionIsIdle(connection); if (!isStale && !isIdle && !connection.closed) { @@ -659,7 +724,7 @@ export class ConnectionPool extends TypedEventEmitter { return; } - this[kConnections].prune(connection => this.connectionIsPerished(connection)); + this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection)); if ( this.totalConnectionCount < minPoolSize && @@ -735,7 +800,7 @@ export class ConnectionPool extends TypedEventEmitter { break; } - if (!this.connectionIsPerished(connection)) { + if (!this.destroyConnectionIfPerished(connection)) { this[kCheckedOut].add(connection); this.emit( ConnectionPool.CONNECTION_CHECKED_OUT, diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index e6d8c9256af..002333401ee 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,4 +1,4 @@ -import { MongoDriverError, MongoNetworkError } from '../error'; +import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error'; import type { ConnectionPool } from './connection_pool'; /** @@ -49,6 +49,8 @@ export class PoolClearedOnNetworkError extends MongoNetworkError { constructor(pool: ConnectionPool) { super(`Connection to ${pool.address} interrupted due to server monitor timeout`); this.address = pool.address; + + this.addErrorLabel(MongoErrorLabel.RetryableWriteError); } override get name(): string { diff --git a/src/error.ts b/src/error.ts index 800be95c8b8..1dd426cb4fa 100644 --- a/src/error.ts +++ b/src/error.ts @@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({ ResumableChangeStreamError: 'ResumableChangeStreamError', HandshakeError: 'HandshakeError', ResetPool: 'ResetPool', + InterruptInUseConnections: 'InterruptInUseConnections', NoWritesPerformed: 'NoWritesPerformed' } as const); diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 3711dc59ed4..b35093b435a 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -4,7 +4,7 @@ import { Document, Long } from '../bson'; import { connect } from '../cmap/connect'; import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; -import { MongoError, MongoErrorLabel } from '../error'; +import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Callback } from '../utils'; import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils'; @@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback) { const error = !(err instanceof MongoError) ? new MongoError(err) : err; error.addErrorLabel(MongoErrorLabel.ResetPool); + if (error instanceof MongoNetworkTimeoutError) { + error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections); + } monitor.emit('resetServer', error); callback(err); diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index d157ef63f69..cd2c48655d9 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -6,6 +6,7 @@ import { deserialize, serialize } from '../bson'; import type { MongoCredentials } from '../cmap/auth/mongo_credentials'; import type { ConnectionEvents, DestroyOptions } from '../cmap/connection'; import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool'; +import { PoolClearedOnNetworkError } from '../cmap/errors'; import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string'; import { CLOSE, @@ -839,7 +840,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes incomingServerDescription.error instanceof MongoError && incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) ) { - server.s.pool.clear(); + const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel( + MongoErrorLabel.InterruptInUseConnections + ); + + server.s.pool.clear({ interruptInUseConnections }); } else if (incomingServerDescription.error == null) { const newTopologyType = topology.s.description.type; const shouldMarkPoolReady = diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index d0eb20858db..8cd12bf2c5e 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -19,17 +19,28 @@ const LB_SKIP_TESTS: SkipDescription[] = [ skipReason: 'cannot run against a load balanced environment' })); +const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [ + { + description: 'clear with interruptInUseConnections = true closes pending connections', + skipIfCondition: 'always', + skipReason: 'TODO(NODE-xxxx): track and kill pending connections' + } +]; + describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling'); runCmapTestSuite(tests, { - testsToSkip: LB_SKIP_TESTS.concat([ - { - description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', - skipIfCondition: 'always', - skipReason: - 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' - } - ]) + testsToSkip: LB_SKIP_TESTS.concat( + [ + { + description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', + skipIfCondition: 'always', + skipReason: + 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' + } + ], + INTERRUPT_IN_USE_SKIPPED_TESTS + ) }); }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index ab3fe03bf8b..634e732f3e9 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -197,7 +197,7 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ return threadContext.pool.checkIn(connection); }, - clear: function (interruptInUseConnections: boolean) { + clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) { return threadContext.pool.clear({ interruptInUseConnections }); }, close: async function () {