From 49dfc40307bd82bf3ec06aae9c95ee77ee36eaad Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 6 Jul 2022 18:24:06 -0400 Subject: [PATCH 01/55] feat: add connection pool ready event --- src/cmap/connection_pool.ts | 9 +++++++++ src/cmap/connection_pool_events.ts | 12 ++++++++++++ src/constants.ts | 1 + src/index.ts | 1 + 4 files changed, 23 insertions(+) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 9a13b7d486..f7cdfe2212 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -13,6 +13,7 @@ import { CONNECTION_POOL_CLEARED, CONNECTION_POOL_CLOSED, CONNECTION_POOL_CREATED, + CONNECTION_POOL_READY, CONNECTION_READY } from '../constants'; import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error'; @@ -31,6 +32,7 @@ import { ConnectionPoolClearedEvent, ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from './connection_pool_events'; import { PoolClosedError, WaitQueueTimeoutError } from './errors'; @@ -103,6 +105,7 @@ export interface CloseOptions { /** @public */ export type ConnectionPoolEvents = { connectionPoolCreated(event: ConnectionPoolCreatedEvent): void; + connectionPoolReady(event: ConnectionPoolReadyEvent): void; connectionPoolClosed(event: ConnectionPoolClosedEvent): void; connectionPoolCleared(event: ConnectionPoolClearedEvent): void; connectionCreated(event: ConnectionCreatedEvent): void; @@ -167,6 +170,11 @@ export class ConnectionPool extends TypedEventEmitter { * @event */ static readonly CONNECTION_POOL_CLEARED = CONNECTION_POOL_CLEARED; + /** + * Emitted each time the connection pool is marked ready + * @event + */ + static readonly CONNECTION_POOL_READY = CONNECTION_POOL_READY; /** * Emitted when a connection is created. * @event @@ -309,6 +317,7 @@ export class ConnectionPool extends TypedEventEmitter { */ ready(): void { this[kPoolState] = PoolState.ready; + this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this)); } /** diff --git a/src/cmap/connection_pool_events.ts b/src/cmap/connection_pool_events.ts index a98dc25d78..90c4826d58 100644 --- a/src/cmap/connection_pool_events.ts +++ b/src/cmap/connection_pool_events.ts @@ -37,6 +37,18 @@ export class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent { } } +/** + * An event published when a connection pool is ready + * @public + * @category Event + */ +export class ConnectionPoolReadyEvent extends ConnectionPoolMonitoringEvent { + /** @internal */ + constructor(pool: ConnectionPool) { + super(pool); + } +} + /** * An event published when a connection pool is closed * @public diff --git a/src/constants.ts b/src/constants.ts index 2326e8d2c1..9cc375730a 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -26,6 +26,7 @@ export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as cons export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const; export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const; export const CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const; +export const CONNECTION_POOL_READY = 'connectionPoolReady' as const; export const CONNECTION_CREATED = 'connectionCreated' as const; export const CONNECTION_READY = 'connectionReady' as const; export const CONNECTION_CLOSED = 'connectionClosed' as const; diff --git a/src/index.ts b/src/index.ts index 933be8772e..865947fdd3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -132,6 +132,7 @@ export { ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, ConnectionPoolMonitoringEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from './cmap/connection_pool_events'; export { From 46810f2c8c39c97ff0807e50db00ca838c2fd19d Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 7 Jul 2022 10:58:23 -0400 Subject: [PATCH 02/55] feat: add basic implementation of PoolClearedError --- src/cmap/connection_pool.ts | 11 +++++++---- src/cmap/errors.ts | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index f7cdfe2212..59a9acc85a 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -35,7 +35,7 @@ import { ConnectionPoolReadyEvent, ConnectionReadyEvent } from './connection_pool_events'; -import { PoolClosedError, WaitQueueTimeoutError } from './errors'; +import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors'; import { ConnectionPoolMetrics } from './metrics'; /** @internal */ @@ -331,12 +331,15 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionCheckOutStartedEvent(this) ); - if (this.closed) { + // TODO: combine these into check for !ready? + if (this.closed || this[kPoolState] === PoolState.paused) { + const reason = this.closed ? 'poolClosed' : 'connectionError'; + const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this); this.emit( ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, 'poolClosed') + new ConnectionCheckOutFailedEvent(this, reason) ); - callback(new PoolClosedError(this)); + callback(error); return; } diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 85e8dac798..a809863337 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -19,6 +19,28 @@ export class PoolClosedError extends MongoDriverError { } } +/** + * An error indicating a connection pool is currently paused + * @category Error + */ +export class PoolClearedError extends MongoDriverError { + // TODO: needs to extend RetryableError or be marked retryable in some other way per spec + /** The address of the connection pool */ + address: string; + + constructor(pool: ConnectionPool) { + // TODO: pass in original pool-clearing error and use in message + super( + `Connection pool for ${pool.address} was cleared because another operation failed with: ` + ); + this.address = pool.address; + } + + override get name(): string { + return 'MongoPoolClosedError'; + } +} + /** * An error thrown when a request to check out a connection times out * @category Error From 143d0086e6df4bb6f14359dab87936bcbb43c8ac Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 8 Jul 2022 16:24:16 -0400 Subject: [PATCH 03/55] feat: clear pauses pool --- src/cmap/connection_pool.ts | 79 ++++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 59a9acc85a..fa6d3a1a65 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -250,7 +250,6 @@ export class ConnectionPool extends TypedEventEmitter { process.nextTick(() => { this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this)); - this.ensureMinPoolSize(); }); } @@ -318,6 +317,7 @@ export class ConnectionPool extends TypedEventEmitter { ready(): void { this[kPoolState] = PoolState.ready; this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this)); + this.ensureMinPoolSize(); } /** @@ -402,6 +402,7 @@ export class ConnectionPool extends TypedEventEmitter { * previous generation will eventually be pruned during subsequent checkouts. */ clear(serviceId?: ObjectId): void { + // handle load balanced case if (this.loadBalanced && serviceId) { const sid = serviceId.toHexString(); const generation = this.serviceGenerations.get(sid); @@ -414,14 +415,25 @@ export class ConnectionPool extends TypedEventEmitter { // Increment the generation for the service id. this.serviceGenerations.set(sid, generation + 1); } - } else { - this[kGeneration] += 1; + this.emit( + ConnectionPool.CONNECTION_POOL_CLEARED, + new ConnectionPoolClearedEvent(this, serviceId) + ); + return; } - this.emit( - ConnectionPool.CONNECTION_POOL_CLEARED, - new ConnectionPoolClearedEvent(this, serviceId) - ); + // handle non load-balanced case + this[kGeneration] += 1; + const alreadyPaused = this[kPoolState] === PoolState.paused; + this[kPoolState] = PoolState.paused; + + this.clearWaitQueue(); + this.clearMinPoolSizeTimer(); + // TODO: should we also cancel in-flight connections? + + if (!alreadyPaused) { + this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); + } } /** Close the pool */ @@ -442,25 +454,9 @@ export class ConnectionPool extends TypedEventEmitter { // immediately cancel any in-flight connections this[kCancellationToken].emit('cancel'); - // drain the wait queue - while (this.waitQueueSize) { - const waitQueueMember = this[kWaitQueue].pop(); - if (waitQueueMember) { - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } - if (!waitQueueMember[kCancelled]) { - // TODO(NODE-3483): Replace with MongoConnectionPoolClosedError - waitQueueMember.callback(new MongoRuntimeError('Connection pool closed')); - } - } - } - - // clear the min pool size timer - const minPoolSizeTimer = this[kMinPoolSizeTimer]; - if (minPoolSizeTimer) { - clearTimeout(minPoolSizeTimer); - } + this[kPoolState] = PoolState.closed; + this.clearWaitQueue(); + this.clearMinPoolSizeTimer(); // end the connection counter if (typeof this[kConnectionCounter].return === 'function') { @@ -538,6 +534,34 @@ export class ConnectionPool extends TypedEventEmitter { }); } + /** Clear the waitqueue */ + private clearWaitQueue(): void { + while (this.waitQueueSize) { + const waitQueueMember = this[kWaitQueue].pop(); + if (waitQueueMember) { + if (waitQueueMember.timer) { + clearTimeout(waitQueueMember.timer); + } + if (!waitQueueMember[kCancelled]) { + const errorMessage = + this[kPoolState] === PoolState.paused + ? 'Connection pool cleared' + : 'Connection pool closed'; + // TODO(NODE-3483): Replace with more specific error type + waitQueueMember.callback(new MongoRuntimeError(errorMessage)); + } + } + } + } + + /** Clear the min pool size timer */ + private clearMinPoolSizeTimer(): void { + const minPoolSizeTimer = this[kMinPoolSizeTimer]; + if (minPoolSizeTimer) { + clearTimeout(minPoolSizeTimer); + } + } + private destroyConnection(connection: Connection, reason: string) { this.emit( ConnectionPool.CONNECTION_CLOSED, @@ -660,7 +684,8 @@ export class ConnectionPool extends TypedEventEmitter { } private processWaitQueue() { - if (this.closed || this[kProcessingWaitQueue]) { + // TODO: combine into ready check + if (this.closed || this[kProcessingWaitQueue] || this[kPoolState] === PoolState.paused) { return; } From b53e2c0c03a08c9c4dab1519c60fc1025c4b2c73 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 8 Jul 2022 17:51:48 -0400 Subject: [PATCH 04/55] fix: calling ready on ready pool --- src/cmap/connection_pool.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index fa6d3a1a65..cccb7ab733 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -315,6 +315,9 @@ export class ConnectionPool extends TypedEventEmitter { * Set the pool state to "ready" */ ready(): void { + if (this[kPoolState] === PoolState.ready) { + return; + } this[kPoolState] = PoolState.ready; this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this)); this.ensureMinPoolSize(); From e97757ea6ea05b2351214dbb7033b7cca0e68215 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 8 Jul 2022 17:52:27 -0400 Subject: [PATCH 05/55] feat: sdam marks pool ready on successful check --- src/sdam/topology.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 69b82befac..97c56530dd 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -813,7 +813,15 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) { const server = topology.s.servers.get(incomingServerDescription.address); if (server) { + const newTopologyType = topology.s.description.type; + const shouldMarkPoolReady = + incomingServerDescription.isDataBearing || + (incomingServerDescription.type !== ServerType.Unknown && + newTopologyType === TopologyType.Single); server.s.description = incomingServerDescription; + if (shouldMarkPoolReady) { + server.s.pool.ready(); + } } } From 7fd95371026f3ce0b152f2343f604f24e827c6a9 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 11 Jul 2022 16:57:25 -0400 Subject: [PATCH 06/55] fix: emit checkout failed event if pool paused --- src/cmap/connection_pool.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index cccb7ab733..9fbbe84fce 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -550,7 +550,12 @@ export class ConnectionPool extends TypedEventEmitter { this[kPoolState] === PoolState.paused ? 'Connection pool cleared' : 'Connection pool closed'; + const reason = this.closed ? 'poolClosed' : 'connectionError'; // TODO(NODE-3483): Replace with more specific error type + this.emit( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(this, reason) + ); waitQueueMember.callback(new MongoRuntimeError(errorMessage)); } } From 72977c1807de40d1e1d85478d50cceac0e03babc Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 11 Jul 2022 17:26:58 -0400 Subject: [PATCH 07/55] feat: ensureMinPoolSize prunes perished connections --- src/cmap/connection_pool.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 9fbbe84fce..ff6dd0d190 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -575,7 +575,6 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, connection, reason) ); - // destroy the connection process.nextTick(() => connection.destroy()); } @@ -671,6 +670,13 @@ export class ConnectionPool extends TypedEventEmitter { return; } + for (let i = 0; i < this[kConnections].length; i++) { + const connection = this[kConnections].peekAt(i); + if (connection && this.connectionIsPerished(connection)) { + this[kConnections].removeOne(i); + } + } + if ( this.totalConnectionCount < minPoolSize && this.pendingConnectionCount < this.options.maxConnecting From f0735a7e1a29c805f0e1c9c6a3c8de3290bbff36 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 11 Jul 2022 17:28:38 -0400 Subject: [PATCH 08/55] test: add cmap runner support for pool paused tests --- test/tools/cmap_spec_runner.ts | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index e4202a43f2..bf7ea96a6e 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -1,22 +1,24 @@ import { expect } from 'chai'; import { EventEmitter } from 'events'; +import { setTimeout } from 'timers'; import { promisify } from 'util'; import { Connection, HostAddress, MongoClient } from '../../src'; import { ConnectionPool, ConnectionPoolOptions } from '../../src/cmap/connection_pool'; -import { shuffle } from '../../src/utils'; +import { makeClientMetadata, shuffle } from '../../src/utils'; import { isAnyRequirementSatisfied } from './unified-spec-runner/unified-utils'; import { FailPoint, sleep } from './utils'; type CmapOperation = | { name: 'start' | 'waitForThread'; target: string } | { name: 'wait'; ms: number } - | { name: 'waitForEvent'; event: string; count: number } + | { name: 'waitForEvent'; event: string; count: number; timeout?: number } | { name: 'checkOut'; thread: string; label: string } | { name: 'checkIn'; connection: string } | { name: 'clear' | 'close' | 'ready' }; const CMAP_POOL_OPTION_NAMES: Array = [ + 'appName', 'backgroundThreadIntervalMS', 'maxPoolSize', 'minPoolSize', @@ -26,6 +28,7 @@ const CMAP_POOL_OPTION_NAMES: Array = [ ]; type CmapPoolOptions = { + appName?: string; backgroundThreadIntervalMS?: number; maxPoolSize?: number; minPoolSize?: number; @@ -81,6 +84,7 @@ const ALL_POOL_EVENTS = new Set([ ConnectionPool.CONNECTION_POOL_CREATED, ConnectionPool.CONNECTION_POOL_CLOSED, ConnectionPool.CONNECTION_POOL_CLEARED, + ConnectionPool.CONNECTION_POOL_READY, ConnectionPool.CONNECTION_CREATED, ConnectionPool.CONNECTION_READY, ConnectionPool.CONNECTION_CLOSED, @@ -210,8 +214,7 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ return await promisify(ConnectionPool.prototype.close).call(threadContext.pool); }, ready: function () { - // This is a no-op until pool pausing is implemented - return; + return threadContext.pool.ready(); }, wait: async function (options) { const ms = options.ms; @@ -237,9 +240,15 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ waitForEvent: function (options): Promise { const event = options.event; const count = options.count; - return new Promise(resolve => { + const timeout = options.timeout ?? 15000; + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error(`Timed out while waiting for event ${event}`)); + }, timeout); + function run() { if (threadContext.poolEvents.filter(ev => getEventType(ev) === event).length >= count) { + clearTimeout(timeoutId); // TODO: do we want to also always import this from timers? return resolve(); } @@ -351,12 +360,15 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { delete poolOptions.backgroundThreadIntervalMS; } + let metadata; + if (poolOptions.appName) { + metadata = makeClientMetadata({ appName: poolOptions.appName }); + delete poolOptions.appName; + } + const operations = test.operations; const expectedError = test.error; - const expectedEvents = test.events - ? // TODO(NODE-2994): remove filter once ready is implemented - test.events.filter(event => event.type !== 'ConnectionPoolReady') - : []; + const expectedEvents = test.events; const ignoreEvents = test.ignore || []; let actualError; @@ -365,7 +377,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { const mainThread = threadContext.getThread(MAIN_THREAD_KEY); mainThread.start(); - threadContext.createPool(poolOptions); + threadContext.createPool({ ...poolOptions, metadata }); // yield control back to the event loop so that the ConnectionPoolCreatedEvent // has a chance to be fired before any synchronously-emitted events from // the queued operations From fb210de48aa83689a7f1f887377f60c9d1e72f96 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 11 Jul 2022 17:30:55 -0400 Subject: [PATCH 09/55] test: unskip the passing pool paused tests --- .../connection_monitoring_and_pooling.spec.test.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index bff16bd245..d71dbe912b 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -13,13 +13,7 @@ const LB_SKIP_TESTS: SkipDescription[] = [ })); const POOL_PAUSED_SKIP_TESTS: SkipDescription[] = [ - 'clearing pool clears the WaitQueue', - 'pool clear halts background minPoolSize establishments', - 'clearing a paused pool emits no events', - 'after clear, cannot check out connections until pool ready', - 'error during minPoolSize population clears pool', - 'readying a ready pool emits no events', - 'pool starts as cleared and becomes ready' + 'error during minPoolSize population clears pool' ].map(description => ({ description, skipIfCondition: 'always', From 7fe8ce3279bd1be3c569b830ff6635b98708c82f Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 13 Jul 2022 16:34:41 -0400 Subject: [PATCH 10/55] fix: pool cleared error name --- src/cmap/errors.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index a809863337..6efd08c918 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -37,7 +37,7 @@ export class PoolClearedError extends MongoDriverError { } override get name(): string { - return 'MongoPoolClosedError'; + return 'MongoPoolClearedError'; } } From ae629f0c246cb206df5d25432407de38d29803f3 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 13 Jul 2022 16:38:27 -0400 Subject: [PATCH 11/55] fix: update placeholder message for PoolClearedError --- src/cmap/errors.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 6efd08c918..47d4ab9f5b 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -30,9 +30,8 @@ export class PoolClearedError extends MongoDriverError { constructor(pool: ConnectionPool) { // TODO: pass in original pool-clearing error and use in message - super( - `Connection pool for ${pool.address} was cleared because another operation failed with: ` - ); + // "failed with: " + super(`Connection pool for ${pool.address} was cleared because another operation failed`); this.address = pool.address; } From 53ee54ee237a27553562c70fc5611364138a1c9d Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 20 Jul 2022 16:40:01 -0400 Subject: [PATCH 12/55] refactor: update constants --- src/cmap/errors.ts | 4 +-- src/constants.ts | 5 ++-- test/tools/cmap_spec_runner.ts | 15 ++-------- test/tools/spec-runner/index.js | 51 ++++++++++++--------------------- 4 files changed, 26 insertions(+), 49 deletions(-) diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 47d4ab9f5b..74dd0de9f1 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,4 +1,4 @@ -import { MongoDriverError } from '../error'; +import { MongoDriverError, MongoNetworkError } from '../error'; import type { ConnectionPool } from './connection_pool'; /** @@ -23,7 +23,7 @@ export class PoolClosedError extends MongoDriverError { * An error indicating a connection pool is currently paused * @category Error */ -export class PoolClearedError extends MongoDriverError { +export class PoolClearedError extends MongoNetworkError { // TODO: needs to extend RetryableError or be marked retryable in some other way per spec /** The address of the connection pool */ address: string; diff --git a/src/constants.ts b/src/constants.ts index 9cc375730a..eec4b07571 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -58,6 +58,8 @@ export const HEARTBEAT_EVENTS = Object.freeze([ /** @public */ export const CMAP_EVENTS = Object.freeze([ CONNECTION_POOL_CREATED, + CONNECTION_POOL_READY, + CONNECTION_POOL_CLEARED, CONNECTION_POOL_CLOSED, CONNECTION_CREATED, CONNECTION_READY, @@ -65,8 +67,7 @@ export const CMAP_EVENTS = Object.freeze([ CONNECTION_CHECK_OUT_STARTED, CONNECTION_CHECK_OUT_FAILED, CONNECTION_CHECKED_OUT, - CONNECTION_CHECKED_IN, - CONNECTION_POOL_CLEARED + CONNECTION_CHECKED_IN ] as const); /** @public */ diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index bf7ea96a6e..3739f6f365 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -5,6 +5,7 @@ import { promisify } from 'util'; import { Connection, HostAddress, MongoClient } from '../../src'; import { ConnectionPool, ConnectionPoolOptions } from '../../src/cmap/connection_pool'; +import { CMAP_EVENTS } from '../../src/constants'; import { makeClientMetadata, shuffle } from '../../src/utils'; import { isAnyRequirementSatisfied } from './unified-spec-runner/unified-utils'; import { FailPoint, sleep } from './utils'; @@ -80,19 +81,7 @@ export type CmapTest = { failPoint?: FailPoint; }; -const ALL_POOL_EVENTS = new Set([ - ConnectionPool.CONNECTION_POOL_CREATED, - ConnectionPool.CONNECTION_POOL_CLOSED, - ConnectionPool.CONNECTION_POOL_CLEARED, - ConnectionPool.CONNECTION_POOL_READY, - ConnectionPool.CONNECTION_CREATED, - ConnectionPool.CONNECTION_READY, - ConnectionPool.CONNECTION_CLOSED, - ConnectionPool.CONNECTION_CHECK_OUT_STARTED, - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - ConnectionPool.CONNECTION_CHECKED_OUT, - ConnectionPool.CONNECTION_CHECKED_IN -]); +const ALL_POOL_EVENTS = new Set(CMAP_EVENTS); function getEventType(event) { const eventName = event.constructor.name; diff --git a/test/tools/spec-runner/index.js b/test/tools/spec-runner/index.js index 4deb60d99d..e85303777d 100644 --- a/test/tools/spec-runner/index.js +++ b/test/tools/spec-runner/index.js @@ -8,7 +8,12 @@ const { EJSON } = require('bson'); const { isRecord } = require('../../../src/utils'); const TestRunnerContext = require('./context').TestRunnerContext; const resolveConnectionString = require('./utils').resolveConnectionString; -const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); +const { + LEGACY_HELLO_COMMAND, + CMAP_EVENTS: SOURCE_CMAP_EVENTS, + TOPOLOGY_EVENTS, + HEARTBEAT_EVENTS +} = require('../../../src/constants'); const { isAnyRequirementSatisfied } = require('../unified-spec-runner/unified-utils'); const ClientSideEncryptionFilter = require('../runner/filters/client_encryption_filter'); @@ -164,16 +169,16 @@ function generateTopologyTests(testSuites, testContext, filter) { const { spec } = this.currentTest; - if ( - shouldRun && - spec.operations.some( - op => op.name === 'waitForEvent' && op.arguments.event === 'PoolReadyEvent' - ) - ) { - this.currentTest.skipReason = - 'TODO(NODE-2994): Connection storms work will add new events to connection pool'; - shouldRun = false; - } + // if ( + // shouldRun && + // spec.operations.some( + // op => op.name === 'waitForEvent' && op.arguments.event === 'PoolReadyEvent' + // ) + // ) { + // this.currentTest.skipReason = + // 'TODO(NODE-2994): Connection storms work will add new events to connection pool'; + // shouldRun = false; + // } if (shouldRun && spec.skipReason) { this.currentTest.skipReason = spec.skipReason; @@ -335,29 +340,11 @@ function parseSessionOptions(options) { const IGNORED_COMMANDS = new Set([LEGACY_HELLO_COMMAND, 'configureFailPoint', 'endSessions']); const SDAM_EVENTS = new Set([ - 'serverOpening', - 'serverClosed', - 'serverDescriptionChanged', - 'topologyOpening', - 'topologyClosed', - 'topologyDescriptionChanged', - 'serverHeartbeatStarted', - 'serverHeartbeatSucceeded', - 'serverHeartbeatFailed' + ...TOPOLOGY_EVENTS.filter(ev => !['error', 'timeout', 'close'].includes(ev)), + ...HEARTBEAT_EVENTS ]); -const CMAP_EVENTS = new Set([ - 'connectionPoolCreated', - 'connectionPoolClosed', - 'connectionCreated', - 'connectionReady', - 'connectionClosed', - 'connectionCheckOutStarted', - 'connectionCheckOutFailed', - 'connectionCheckedOut', - 'connectionCheckedIn', - 'connectionPoolCleared' -]); +const CMAP_EVENTS = new Set(SOURCE_CMAP_EVENTS); function runTestSuiteTest(configuration, spec, context) { context.commandEvents = []; From ccc60570ed021cfd4ab11088efee4fd8f0086a47 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 24 Aug 2022 14:14:09 -0400 Subject: [PATCH 13/55] test: skip failing cmap test --- .../server_discovery_and_monitoring.spec.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index 84e3289e62..d3eef4a8c2 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -19,6 +19,8 @@ const filter: TestFilter = ({ description }) => { return isAuthEnabled ? 'TODO(NODE-3891): fix tests broken when AUTH enabled' : false; case 'Network error on minPoolSize background creation': return 'TODO(NODE-4385): implement pool pausing and pool ready event'; + case 'PoolClearedError does not mark server unknown': + return 'TODO(NODE-3135): make CMAP SDAM-aware and ensure PoolClearError is retryable'; default: return false; } From e9405ce1037e85bbd36f2382b6fb13b3d253b577 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 20 Jul 2022 17:07:03 -0400 Subject: [PATCH 14/55] refactor: misc cleanup --- .../connection_monitoring_and_pooling.spec.test.ts | 2 +- test/tools/cmap_spec_runner.ts | 4 ++-- test/tools/spec-runner/index.js | 11 ----------- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index d71dbe912b..1aed220dba 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -17,7 +17,7 @@ const POOL_PAUSED_SKIP_TESTS: SkipDescription[] = [ ].map(description => ({ description, skipIfCondition: 'always', - skipReason: 'TODO(NODE-2994): implement pool pausing' + skipReason: 'TODO(NODE-3135): make connection pool SDAM aware' })); describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index 3739f6f365..54fc94cfd6 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { EventEmitter } from 'events'; -import { setTimeout } from 'timers'; +import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; import { Connection, HostAddress, MongoClient } from '../../src'; @@ -237,7 +237,7 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ function run() { if (threadContext.poolEvents.filter(ev => getEventType(ev) === event).length >= count) { - clearTimeout(timeoutId); // TODO: do we want to also always import this from timers? + clearTimeout(timeoutId); return resolve(); } diff --git a/test/tools/spec-runner/index.js b/test/tools/spec-runner/index.js index e85303777d..01eb8fa6e8 100644 --- a/test/tools/spec-runner/index.js +++ b/test/tools/spec-runner/index.js @@ -169,17 +169,6 @@ function generateTopologyTests(testSuites, testContext, filter) { const { spec } = this.currentTest; - // if ( - // shouldRun && - // spec.operations.some( - // op => op.name === 'waitForEvent' && op.arguments.event === 'PoolReadyEvent' - // ) - // ) { - // this.currentTest.skipReason = - // 'TODO(NODE-2994): Connection storms work will add new events to connection pool'; - // shouldRun = false; - // } - if (shouldRun && spec.skipReason) { this.currentTest.skipReason = spec.skipReason; shouldRun = false; From 941e8f038abb34716a23adb81dcee9110fd3f91e Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 20 Jul 2022 18:18:58 -0400 Subject: [PATCH 15/55] test: add sdam cmap prose test --- ...ver_discovery_and_monitoring.prose.test.ts | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts new file mode 100644 index 0000000000..72095ffb75 --- /dev/null +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts @@ -0,0 +1,104 @@ +import { expect } from 'chai'; +import { once } from 'events'; + +import { MongoClient } from '../../../src'; +import { + CONNECTION_POOL_CLEARED, + CONNECTION_POOL_READY, + SERVER_HEARTBEAT_FAILED, + SERVER_HEARTBEAT_SUCCEEDED +} from '../../../src/constants'; + +describe('Server Discover and Monitoring Prose Tests', function () { + context('Connection Pool Management', function () { + /* + This test will be used to ensure monitors properly create and unpause connection pools when they discover servers. + This test requires failCommand appName support which is only available in MongoDB 4.2.9+. + 1. Create a client with directConnection=true, appName="SDAMPoolManagementTest", and heartbeatFrequencyMS=500 (or lower if possible). + 2. Verify via SDAM and CMAP event monitoring that a ConnectionPoolReadyEvent occurs after the first ServerHeartbeatSucceededEvent event does. + 3. Enable the following failpoint: + { + configureFailPoint: "failCommand", + mode: { times: 2 }, + data: { + failCommands: ["hello"], // or legacy hello command + errorCode: 1234, + appName: "SDAMPoolManagementTest" + } + } + 4. Verify that a ServerHeartbeatFailedEvent and a ConnectionPoolClearedEvent (CMAP) are emitted. + 5. Then verify that a ServerHeartbeatSucceededEvent and a ConnectionPoolReadyEvent (CMAP) are emitted. + 6. Disable the failpoint. + */ + + let client: MongoClient; + const events = []; + beforeEach(async function () { + client = this.configuration.newClient({ + directConnection: true, + appName: 'SDAMPoolManagementTest', + heartbeatFrequencyMS: 500 + }); + + for (const event of [ + CONNECTION_POOL_READY, + SERVER_HEARTBEAT_SUCCEEDED, + SERVER_HEARTBEAT_FAILED, + CONNECTION_POOL_CLEARED + ]) { + client.on(event, () => { + events.push(event); + }); + } + }); + + afterEach(async function () { + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'off', + data: { + failCommands: ['hello'], + errorCode: 1234, + appName: 'SDAMPoolManagementTest' + } + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('ensure monitors properly create and unpause connection pools when they discover servers', { + metadata: { requires: { mongodb: '>=4.2.9' } }, + test: async function () { + await client.connect(); + expect(events.shift()).to.equal(SERVER_HEARTBEAT_SUCCEEDED); + expect(events.shift()).to.equal(CONNECTION_POOL_READY); + + expect(events).to.be.empty; + + const heartBeatFailedEvent = once(client, SERVER_HEARTBEAT_FAILED); + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + failCommands: ['hello'], + errorCode: 1234, + appName: 'SDAMPoolManagementTest' + } + }); + await heartBeatFailedEvent; + expect(events.shift()).to.equal(SERVER_HEARTBEAT_FAILED); + expect(events.shift()).to.equal(CONNECTION_POOL_CLEARED); + + expect(events).to.be.empty; + + await once(client, SERVER_HEARTBEAT_SUCCEEDED); + expect(events.shift()).to.equal(SERVER_HEARTBEAT_SUCCEEDED); + expect(events.shift()).to.equal(CONNECTION_POOL_READY); + + expect(events).to.be.empty; + } + }); + }); +}); From 6d1648b19d6ab825a415d4483c052aaad219c5e9 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 20 Jul 2022 18:41:41 -0400 Subject: [PATCH 16/55] test: update stat cmap tests --- .../cmap-node-specs/pool-checkin-stats.json | 4 ++++ .../cmap-node-specs/pool-checkout-stats.json | 4 ++++ .../pool-minPoolSize-population-stats.json | 4 ++++ ...minPoolSize-replace-removed-connections.json | 17 ++++++++++++----- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json index 57e71d8f3d..59d5fd302a 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json @@ -6,6 +6,9 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -66,6 +69,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionReady", "ConnectionClosed", "ConnectionCheckOutStarted" diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json index 3cc4229eb0..28715da2cb 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json @@ -6,6 +6,9 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -56,6 +59,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionReady", "ConnectionClosed", "ConnectionCheckOutStarted" diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json index cb619e2f75..890193706f 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json @@ -6,6 +6,9 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -54,6 +57,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionReady", "ConnectionClosed" ] diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json index 0238a09fad..182ce99b35 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json @@ -6,6 +6,9 @@ "minPoolSize": 2 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionReady", @@ -22,6 +25,9 @@ { "name": "clear" }, + { + "name": "ready" + }, { "name": "checkIn", "connection": "conn" @@ -29,7 +35,7 @@ { "name": "waitForEvent", "event": "ConnectionReady", - "count": 3 + "count": 4 } ], "events": [ @@ -54,11 +60,10 @@ "address": 42 }, { - "type": "ConnectionClosed", - "reason": "stale", + "type": "ConnectionReady", "address": 42, - "availableConnectionCount": 1, - "pendingConnectionCount": 0, + "availableConnectionCount": 0, + "pendingConnectionCount": 1, "totalConnectionCount": 1 }, { @@ -70,8 +75,10 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionPoolCreated", "ConnectionCreated", + "ConnectionClosed", "ConnectionCheckOutStarted" ] } From 0570932ab51a0b42f315eb7400c7657d56e1ef44 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 21 Jul 2022 18:31:41 -0400 Subject: [PATCH 17/55] test: update cmap unit tests to call ready before running --- test/unit/cmap/connection_pool.test.js | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index fdee3bde66..3f2b5176d7 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -27,6 +27,7 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ maxPoolSize: 1, hostAddress: server.hostAddress() }); + pool.ready(); const events = []; pool.on('connectionClosed', event => events.push(event)); @@ -74,6 +75,8 @@ describe('Connection Pool', function () { hostAddress: server.hostAddress() }); + pool.ready(); + pool.withConnection( (err, conn, cb) => { expect(err).to.not.exist; @@ -102,6 +105,8 @@ describe('Connection Pool', function () { hostAddress: server.hostAddress() }); + pool.ready(); + pool.checkOut((err, conn) => { expect(err).to.not.exist; expect(conn).to.exist; @@ -133,6 +138,8 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ hostAddress: server.hostAddress() }); + pool.ready(); + const callback = (err, result) => { expect(err).to.not.exist; expect(result).to.exist; @@ -167,6 +174,8 @@ describe('Connection Pool', function () { hostAddress: server.hostAddress() }); + pool.ready(); + const callback = err => { expect(err).to.exist; expect(err).to.match(/closed/); @@ -193,6 +202,8 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ hostAddress: server.hostAddress() }); + pool.ready(); + const callback = (err, result) => { expect(err).to.exist; expect(result).to.not.exist; @@ -219,6 +230,7 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ maxPoolSize: 1, hostAddress: server.hostAddress() }); + pool.ready(); const events = []; pool.on('connectionCheckedOut', event => events.push(event)); From 809eee551656c908261acb0795621c0dfa3365cb Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 22 Jul 2022 09:59:27 -0400 Subject: [PATCH 18/55] test: change order of operations to reduce flakiness --- .../pool-minPoolSize-replace-removed-connections.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json index 182ce99b35..7bb82221e1 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json @@ -25,13 +25,13 @@ { "name": "clear" }, - { - "name": "ready" - }, { "name": "checkIn", "connection": "conn" }, + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionReady", From 629afa01dc1b17f1ff57c878fe5914ea863055aa Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 22 Jul 2022 10:05:41 -0400 Subject: [PATCH 19/55] test: properly skip lb incompatible tests --- .../connection_monitoring_and_pooling.spec.test.ts | 8 +++++++- .../server_discovery_and_monitoring.prose.test.ts | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 1aed220dba..8ee4487673 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -4,8 +4,14 @@ import { CmapTest, runCmapTestSuite, SkipDescription } from '../../tools/cmap_sp // These tests rely on a simple "pool.clear()" command, which is not sufficient // to properly clear the pool in LB mode, since it requires a serviceId to be passed in const LB_SKIP_TESTS: SkipDescription[] = [ + 'must replace removed connections up to minPoolSize', 'must destroy checked in connection if it is stale', - 'must destroy and must not check out a stale connection if found while iterating available connections' + 'must destroy and must not check out a stale connection if found while iterating available connections', + 'clearing pool clears the WaitQueue', + 'pool clear halts background minPoolSize establishments', + 'clearing a paused pool emits no events', + 'after clear, cannot check out connections until pool ready', + 'readying a ready pool emits no events' ].map(description => ({ description, skipIfCondition: 'loadBalanced', diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts index 72095ffb75..8da3bd765a 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts @@ -9,7 +9,7 @@ import { SERVER_HEARTBEAT_SUCCEEDED } from '../../../src/constants'; -describe('Server Discover and Monitoring Prose Tests', function () { +describe('Server Discovery and Monitoring Prose Tests', function () { context('Connection Pool Management', function () { /* This test will be used to ensure monitors properly create and unpause connection pools when they discover servers. @@ -69,7 +69,7 @@ describe('Server Discover and Monitoring Prose Tests', function () { }); it('ensure monitors properly create and unpause connection pools when they discover servers', { - metadata: { requires: { mongodb: '>=4.2.9' } }, + metadata: { requires: { mongodb: '>=4.2.9', topology: '!load-balanced' } }, test: async function () { await client.connect(); expect(events.shift()).to.equal(SERVER_HEARTBEAT_SUCCEEDED); From 97d93ede2b831dd1425f304141ae9b0762a5ca01 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 24 Aug 2022 14:43:52 -0400 Subject: [PATCH 20/55] test: implement unified support for poolReadyEvent and unskip sdam test --- .../server_discovery_and_monitoring.spec.test.ts | 2 -- test/tools/unified-spec-runner/entities.ts | 4 ++++ test/tools/unified-spec-runner/match.ts | 2 ++ test/tools/unified-spec-runner/schema.ts | 1 + 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index d3eef4a8c2..9e5dc704e4 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -17,8 +17,6 @@ const filter: TestFilter = ({ description }) => { case 'Reset server and pool after shutdown error during authentication': // These tests time out waiting for the PoolCleared event return isAuthEnabled ? 'TODO(NODE-3891): fix tests broken when AUTH enabled' : false; - case 'Network error on minPoolSize background creation': - return 'TODO(NODE-4385): implement pool pausing and pool ready event'; case 'PoolClearedError does not mark server unknown': return 'TODO(NODE-3135): make CMAP SDAM-aware and ensure PoolClearError is retryable'; default: diff --git a/test/tools/unified-spec-runner/entities.ts b/test/tools/unified-spec-runner/entities.ts index 8173a8cd4e..0ccced041e 100644 --- a/test/tools/unified-spec-runner/entities.ts +++ b/test/tools/unified-spec-runner/entities.ts @@ -18,6 +18,7 @@ import { ConnectionPoolClearedEvent, ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from '../../../src/cmap/connection_pool_events'; import { @@ -86,6 +87,7 @@ export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | Command export type CmapEvent = | ConnectionPoolCreatedEvent | ConnectionPoolClosedEvent + | ConnectionPoolReadyEvent | ConnectionCreatedEvent | ConnectionReadyEvent | ConnectionClosedEvent @@ -110,6 +112,7 @@ export class UnifiedMongoClient extends MongoClient { observedCmapEvents: ( | 'connectionPoolCreated' | 'connectionPoolClosed' + | 'connectionPoolReady' | 'connectionPoolCleared' | 'connectionCreated' | 'connectionReady' @@ -132,6 +135,7 @@ export class UnifiedMongoClient extends MongoClient { static CMAP_EVENT_NAME_LOOKUP = { poolCreatedEvent: 'connectionPoolCreated', poolClosedEvent: 'connectionPoolClosed', + poolReadyEvent: 'connectionPoolReady', poolClearedEvent: 'connectionPoolCleared', connectionCreatedEvent: 'connectionCreated', connectionReadyEvent: 'connectionReady', diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index 123c57f50a..93791f5335 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -28,6 +28,7 @@ import { ConnectionPoolClearedEvent, ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from '../../../src/cmap/connection_pool_events'; import { ejson } from '../utils'; @@ -331,6 +332,7 @@ export function specialCheck( // CMAP events where the payload does not matter. const EMPTY_CMAP_EVENTS = { poolCreatedEvent: ConnectionPoolCreatedEvent, + poolReadyEvent: ConnectionPoolReadyEvent, poolClosedEvent: ConnectionPoolClosedEvent, connectionCreatedEvent: ConnectionCreatedEvent, connectionReadyEvent: ConnectionReadyEvent, diff --git a/test/tools/unified-spec-runner/schema.ts b/test/tools/unified-spec-runner/schema.ts index 05562c0ba1..94b5e2f4f5 100644 --- a/test/tools/unified-spec-runner/schema.ts +++ b/test/tools/unified-spec-runner/schema.ts @@ -114,6 +114,7 @@ export type ObservableCommandEventId = export type ObservableCmapEventId = | 'connectionPoolCreatedEvent' | 'connectionPoolClosedEvent' + | 'connectionPoolReadyEvent' | 'connectionPoolClearedEvent' | 'connectionCreatedEvent' | 'connectionReadyEvent' From a089bf647f97c5f0270c4b763b3e5dae6118ba33 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 24 Aug 2022 15:53:41 -0400 Subject: [PATCH 21/55] test: update skips --- .../server_discovery_and_monitoring.spec.test.ts | 4 +++- .../unified_test_format.spec.test.ts | 14 -------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index 9e5dc704e4..bf80d89b15 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -16,7 +16,9 @@ const filter: TestFilter = ({ description }) => { case 'Reset server and pool after network timeout error during authentication': case 'Reset server and pool after shutdown error during authentication': // These tests time out waiting for the PoolCleared event - return isAuthEnabled ? 'TODO(NODE-3891): fix tests broken when AUTH enabled' : false; + return isAuthEnabled + ? 'TODO(NODE-3135): handle auth errors, also see NODE-3891: fix tests broken when AUTH enabled' + : false; case 'PoolClearedError does not mark server unknown': return 'TODO(NODE-3135): make CMAP SDAM-aware and ensure PoolClearError is retryable'; default: diff --git a/test/integration/unified-test-format/unified_test_format.spec.test.ts b/test/integration/unified-test-format/unified_test_format.spec.test.ts index 2258cbefea..0a4ba56935 100644 --- a/test/integration/unified-test-format/unified_test_format.spec.test.ts +++ b/test/integration/unified-test-format/unified_test_format.spec.test.ts @@ -23,20 +23,6 @@ const filter: TestFilter = ({ description }) => { return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0'; } - if ( - process.env.AUTH === 'auth' && - [ - 'FindOneAndUpdate is committed on first attempt', - 'FindOneAndUpdate is not committed on first attempt', - 'FindOneAndUpdate is never committed', - 'eventType defaults to command if unset', - 'events are captured during an operation', - 'eventType can be set to command and cmap' - ].includes(description) - ) { - return 'TODO(NODE-3891): fix tests broken when AUTH enabled'; - } - return false; }; From 6e0863366e139882150ef3e5273dba937a24ba15 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 25 Aug 2022 19:02:58 -0400 Subject: [PATCH 22/55] fix: handle closed state for clear and minpoolsize --- src/cmap/connection_pool.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index ff6dd0d190..7fa941411c 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -405,6 +405,11 @@ export class ConnectionPool extends TypedEventEmitter { * previous generation will eventually be pruned during subsequent checkouts. */ clear(serviceId?: ObjectId): void { + // TODO: confirm that clearing a paused pool should still increment the generation + if (this.closed) { + return; + } + // handle load balanced case if (this.loadBalanced && serviceId) { const sid = serviceId.toHexString(); @@ -666,7 +671,9 @@ export class ConnectionPool extends TypedEventEmitter { private ensureMinPoolSize() { const minPoolSize = this.options.minPoolSize; - if (this.closed || minPoolSize === 0) { + // TODO: combine into ready check + // TODO: add extra guards to not do setTimeout at all if it's not ready? + if (this.closed || this[kPoolState] === PoolState.paused || minPoolSize === 0) { return; } From 154a60fdd456a1ce42a999e50dc8b186c4242b9a Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 10:42:23 -0400 Subject: [PATCH 23/55] test: add back skips for failing auth tests --- .../unified_test_format.spec.test.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/integration/unified-test-format/unified_test_format.spec.test.ts b/test/integration/unified-test-format/unified_test_format.spec.test.ts index 0a4ba56935..0af0a0fe68 100644 --- a/test/integration/unified-test-format/unified_test_format.spec.test.ts +++ b/test/integration/unified-test-format/unified_test_format.spec.test.ts @@ -23,6 +23,17 @@ const filter: TestFilter = ({ description }) => { return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0'; } + if ( + process.env.AUTH === 'auth' && + [ + 'FindOneAndUpdate is committed on first attempt', + 'FindOneAndUpdate is not committed on first attempt', + 'FindOneAndUpdate is never committed' + ].includes(description) + ) { + return 'TODO(NODE-3891): fix tests broken when AUTH enabled'; + } + return false; }; From ad5437bb34020125539de8691dfe09001837c79a Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 11:22:10 -0400 Subject: [PATCH 24/55] fix: tighten up conditions for ensureMinPoolSize --- src/cmap/connection_pool.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 7fa941411c..d1c2b49f72 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -671,9 +671,7 @@ export class ConnectionPool extends TypedEventEmitter { private ensureMinPoolSize() { const minPoolSize = this.options.minPoolSize; - // TODO: combine into ready check - // TODO: add extra guards to not do setTimeout at all if it's not ready? - if (this.closed || this[kPoolState] === PoolState.paused || minPoolSize === 0) { + if (this[kPoolState] !== PoolState.ready || minPoolSize === 0) { return; } @@ -697,7 +695,9 @@ export class ConnectionPool extends TypedEventEmitter { this[kConnections].push(connection); process.nextTick(() => this.processWaitQueue()); } - this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + if (this[kPoolState] === PoolState.ready) { + this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + } }); } else { this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); From 5aa4805b462c31e857e52575094235d5f585d21e Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 11:37:20 -0400 Subject: [PATCH 25/55] refactor: minor pool cleanup --- src/cmap/connection_pool.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index d1c2b49f72..ecceeaec72 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -334,8 +334,7 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionCheckOutStartedEvent(this) ); - // TODO: combine these into check for !ready? - if (this.closed || this[kPoolState] === PoolState.paused) { + if (this[kPoolState] !== PoolState.ready) { const reason = this.closed ? 'poolClosed' : 'connectionError'; const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this); this.emit( @@ -417,7 +416,6 @@ export class ConnectionPool extends TypedEventEmitter { // Only need to worry if the generation exists, since it should // always be there but typescript needs the check. if (generation == null) { - // TODO(NODE-3483) throw new MongoRuntimeError('Service generations are required in load balancer mode.'); } else { // Increment the generation for the service id. @@ -437,7 +435,6 @@ export class ConnectionPool extends TypedEventEmitter { this.clearWaitQueue(); this.clearMinPoolSizeTimer(); - // TODO: should we also cancel in-flight connections? if (!alreadyPaused) { this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); @@ -556,7 +553,6 @@ export class ConnectionPool extends TypedEventEmitter { ? 'Connection pool cleared' : 'Connection pool closed'; const reason = this.closed ? 'poolClosed' : 'connectionError'; - // TODO(NODE-3483): Replace with more specific error type this.emit( ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, reason) @@ -706,7 +702,8 @@ export class ConnectionPool extends TypedEventEmitter { private processWaitQueue() { // TODO: combine into ready check - if (this.closed || this[kProcessingWaitQueue] || this[kPoolState] === PoolState.paused) { + // actually, need to adjust this behavior to return the appropriate error + if (this.closed || this[kProcessingWaitQueue] || this[kPoolState] !== PoolState.ready) { return; } From d8466d06016aa1ddd091391e5f5409dc9088b157 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 11:46:30 -0400 Subject: [PATCH 26/55] fix: make sure pending queue members get the correct checkout error --- src/cmap/connection_pool.ts | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index ecceeaec72..2f2a7619e8 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -334,17 +334,6 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionCheckOutStartedEvent(this) ); - if (this[kPoolState] !== PoolState.ready) { - const reason = this.closed ? 'poolClosed' : 'connectionError'; - const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this); - this.emit( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, reason) - ); - callback(error); - return; - } - const waitQueueMember: WaitQueueMember = { callback }; const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; if (waitQueueTimeoutMS) { @@ -701,12 +690,9 @@ export class ConnectionPool extends TypedEventEmitter { } private processWaitQueue() { - // TODO: combine into ready check - // actually, need to adjust this behavior to return the appropriate error - if (this.closed || this[kProcessingWaitQueue] || this[kPoolState] !== PoolState.ready) { + if (this[kProcessingWaitQueue]) { return; } - this[kProcessingWaitQueue] = true; while (this.waitQueueSize) { @@ -721,6 +707,21 @@ export class ConnectionPool extends TypedEventEmitter { continue; } + if (this[kPoolState] !== PoolState.ready) { + const reason = this.closed ? 'poolClosed' : 'connectionError'; + const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this); + this.emit( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(this, reason) + ); + if (waitQueueMember.timer) { + clearTimeout(waitQueueMember.timer); + } + this[kWaitQueue].shift(); + waitQueueMember.callback(error); + continue; + } + if (!this.availableConnectionCount) { break; } From 763d7dbb3b29a6d823620e1efb56d251b8e95db9 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 11:57:20 -0400 Subject: [PATCH 27/55] refactor: handle clearing waitqueue via processWaitQueue --- src/cmap/connection_pool.ts | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 2f2a7619e8..216e21abb1 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -422,8 +422,8 @@ export class ConnectionPool extends TypedEventEmitter { const alreadyPaused = this[kPoolState] === PoolState.paused; this[kPoolState] = PoolState.paused; - this.clearWaitQueue(); this.clearMinPoolSizeTimer(); + this.processWaitQueue(); if (!alreadyPaused) { this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); @@ -449,8 +449,8 @@ export class ConnectionPool extends TypedEventEmitter { this[kCancellationToken].emit('cancel'); this[kPoolState] = PoolState.closed; - this.clearWaitQueue(); this.clearMinPoolSizeTimer(); + this.processWaitQueue(); // end the connection counter if (typeof this[kConnectionCounter].return === 'function') { @@ -528,30 +528,6 @@ export class ConnectionPool extends TypedEventEmitter { }); } - /** Clear the waitqueue */ - private clearWaitQueue(): void { - while (this.waitQueueSize) { - const waitQueueMember = this[kWaitQueue].pop(); - if (waitQueueMember) { - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } - if (!waitQueueMember[kCancelled]) { - const errorMessage = - this[kPoolState] === PoolState.paused - ? 'Connection pool cleared' - : 'Connection pool closed'; - const reason = this.closed ? 'poolClosed' : 'connectionError'; - this.emit( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, reason) - ); - waitQueueMember.callback(new MongoRuntimeError(errorMessage)); - } - } - } - } - /** Clear the min pool size timer */ private clearMinPoolSizeTimer(): void { const minPoolSizeTimer = this[kMinPoolSizeTimer]; From 6e30e6f0330a978df96982ba5b8e803ee8f13804 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 12:07:10 -0400 Subject: [PATCH 28/55] refactor: clean up close logic --- src/cmap/connection_pool.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 216e21abb1..a42d15d231 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -448,17 +448,15 @@ export class ConnectionPool extends TypedEventEmitter { // immediately cancel any in-flight connections this[kCancellationToken].emit('cancel'); - this[kPoolState] = PoolState.closed; - this.clearMinPoolSizeTimer(); - this.processWaitQueue(); - // end the connection counter if (typeof this[kConnectionCounter].return === 'function') { this[kConnectionCounter].return(undefined); } - // mark the pool as closed immediately this[kPoolState] = PoolState.closed; + this.clearMinPoolSizeTimer(); + this.processWaitQueue(); + eachAsync( this[kConnections].toArray(), (conn, cb) => { From ae006a701cbdf285778a88340c9b83ecc878ea63 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 12:19:08 -0400 Subject: [PATCH 29/55] lint: remove todo --- src/cmap/connection_pool.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index a42d15d231..3016b9235e 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -393,7 +393,6 @@ export class ConnectionPool extends TypedEventEmitter { * previous generation will eventually be pruned during subsequent checkouts. */ clear(serviceId?: ObjectId): void { - // TODO: confirm that clearing a paused pool should still increment the generation if (this.closed) { return; } From aa98693914aebe4b7aeba9062994a2ed79e9f42e Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 15:31:09 -0400 Subject: [PATCH 30/55] refactor: more pool cleanup --- src/cmap/connection_pool.ts | 2 +- src/cmap/errors.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 3016b9235e..2214c05ce8 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -315,7 +315,7 @@ export class ConnectionPool extends TypedEventEmitter { * Set the pool state to "ready" */ ready(): void { - if (this[kPoolState] === PoolState.ready) { + if (this[kPoolState] !== PoolState.paused) { return; } this[kPoolState] = PoolState.ready; diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 74dd0de9f1..c8355ec562 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -24,12 +24,12 @@ export class PoolClosedError extends MongoDriverError { * @category Error */ export class PoolClearedError extends MongoNetworkError { - // TODO: needs to extend RetryableError or be marked retryable in some other way per spec + // TODO(NODE-3144): needs to extend RetryableError or be marked retryable in some other way per spec /** The address of the connection pool */ address: string; constructor(pool: ConnectionPool) { - // TODO: pass in original pool-clearing error and use in message + // TODO(NODE-3135): pass in original pool-clearing error and use in message // "failed with: " super(`Connection pool for ${pool.address} was cleared because another operation failed`); this.address = pool.address; From 080a19a20e85f6fea49ba7d95d509f2059e624a7 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 26 Aug 2022 17:40:18 -0400 Subject: [PATCH 31/55] fix: make retryability async and handle PoolClearedError --- src/operations/execute_operation.ts | 34 +++++++++++-------- src/sdam/server.ts | 7 +++- ...rver_discovery_and_monitoring.spec.test.ts | 2 -- test/tools/unified-spec-runner/entities.ts | 1 + 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 54fc3b99d6..a3e9f3d139 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -1,3 +1,5 @@ +import { setTimeout } from 'timers'; + import type { Document } from '../bson'; import { isRetryableReadError, @@ -224,23 +226,25 @@ function executeWithServerSelection( } // select a new server, and attempt to retry the operation - topology.selectServer(selector, serverSelectionOptions, (error?: Error, server?: Server) => { - if (!error && isWriteOperation && !supportsRetryableWrites(server)) { - return callback( - new MongoUnexpectedServerResponseError( - 'Selected server does not support retryable writes' - ) - ); - } + setTimeout(() => { + topology.selectServer(selector, serverSelectionOptions, (error?: Error, server?: Server) => { + if (!error && isWriteOperation && !supportsRetryableWrites(server)) { + return callback( + new MongoUnexpectedServerResponseError( + 'Selected server does not support retryable writes' + ) + ); + } - if (error || !server) { - return callback( - error ?? new MongoUnexpectedServerResponseError('Server selection failed without error') - ); - } + if (error || !server) { + return callback( + error ?? new MongoUnexpectedServerResponseError('Server selection failed without error') + ); + } - operation.execute(server, session, callback); - }); + operation.execute(server, session, callback); + }); + }, 1); } if ( diff --git a/src/sdam/server.ts b/src/sdam/server.ts index ac18d46756..dd8767bc70 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -5,6 +5,7 @@ import { ConnectionPoolEvents, ConnectionPoolOptions } from '../cmap/connection_pool'; +import { PoolClearedError } from '../cmap/errors'; import { APM_EVENTS, CLOSED, @@ -358,7 +359,11 @@ export class Server extends TypedEventEmitter { (err, conn, cb) => { if (err || !conn) { this.s.operationCount -= 1; - markServerUnknown(this, err); + if (!(err instanceof PoolClearedError)) { + markServerUnknown(this, err); + } else { + err.addErrorLabel(MongoErrorLabel.RetryableWriteError); + } return cb(err); } diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index bf80d89b15..be9e9c3cdb 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -19,8 +19,6 @@ const filter: TestFilter = ({ description }) => { return isAuthEnabled ? 'TODO(NODE-3135): handle auth errors, also see NODE-3891: fix tests broken when AUTH enabled' : false; - case 'PoolClearedError does not mark server unknown': - return 'TODO(NODE-3135): make CMAP SDAM-aware and ensure PoolClearError is retryable'; default: return false; } diff --git a/test/tools/unified-spec-runner/entities.ts b/test/tools/unified-spec-runner/entities.ts index 0ccced041e..95a93f2cbe 100644 --- a/test/tools/unified-spec-runner/entities.ts +++ b/test/tools/unified-spec-runner/entities.ts @@ -78,6 +78,7 @@ export class UnifiedThread { this.#killed = true; await this.#promise; if (this.#error) { + this.#error.message = `: ${this.#error.message}`; throw this.#error; } } From 4ffa108bf0b374864592cae294430712f1aec180 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 15:41:32 -0400 Subject: [PATCH 32/55] refactor: check pool state and modify pending count in createConnection --- src/cmap/connection_pool.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 2214c05ce8..ccedd8dcbf 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -586,12 +586,14 @@ export class ConnectionPool extends TypedEventEmitter { connect(connectOptions, (err, connection) => { if (err || !connection) { this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); + this[kPending]--; callback(err); return; } // The pool might have closed since we started trying to create a connection - if (this.closed) { + // TODO: unit test + if (this[kPoolState] !== PoolState.ready) { this[kPending]--; connection.destroy({ force: true }); return; @@ -622,6 +624,7 @@ export class ConnectionPool extends TypedEventEmitter { connection.markAvailable(); this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection)); + this[kPending]--; callback(undefined, connection); return; }); @@ -648,14 +651,12 @@ export class ConnectionPool extends TypedEventEmitter { // connection permits because that potentially delays the availability of // the connection to a checkout request this.createConnection((err, connection) => { - this[kPending]--; + // NOTE: createConnection guarantees that we cannot enter this block unless the pool is ready if (!err && connection) { this[kConnections].push(connection); process.nextTick(() => this.processWaitQueue()); } - if (this[kPoolState] === PoolState.ready) { - this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); - } + this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); }); } else { this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); @@ -730,7 +731,6 @@ export class ConnectionPool extends TypedEventEmitter { continue; } this.createConnection((err, connection) => { - this[kPending]--; if (waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); From 37650842ba4206c19995494d3b8d331c0f3b7bd8 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 17:52:20 -0400 Subject: [PATCH 33/55] refactor: synchronize pool clearing and mark unknown --- src/operations/execute_operation.ts | 32 ++++++++++++++--------------- src/sdam/server.ts | 7 +++++-- src/sdam/topology.ts | 7 ++++++- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index a3e9f3d139..687f7998eb 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -226,25 +226,23 @@ function executeWithServerSelection( } // select a new server, and attempt to retry the operation - setTimeout(() => { - topology.selectServer(selector, serverSelectionOptions, (error?: Error, server?: Server) => { - if (!error && isWriteOperation && !supportsRetryableWrites(server)) { - return callback( - new MongoUnexpectedServerResponseError( - 'Selected server does not support retryable writes' - ) - ); - } + topology.selectServer(selector, serverSelectionOptions, (error?: Error, server?: Server) => { + if (!error && isWriteOperation && !supportsRetryableWrites(server)) { + return callback( + new MongoUnexpectedServerResponseError( + 'Selected server does not support retryable writes' + ) + ); + } - if (error || !server) { - return callback( - error ?? new MongoUnexpectedServerResponseError('Server selection failed without error') - ); - } + if (error || !server) { + return callback( + error ?? new MongoUnexpectedServerResponseError('Server selection failed without error') + ); + } - operation.execute(server, session, callback); - }); - }, 1); + operation.execute(server, session, callback); + }); } if ( diff --git a/src/sdam/server.ts b/src/sdam/server.ts index dd8767bc70..0d4cd90be3 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -499,9 +499,10 @@ function makeOperationHandler( // In load balanced mode we never mark the server as unknown and always // clear for the specific service id. - server.s.pool.clear(connection.serviceId); if (!server.loadBalanced) { markServerUnknown(server, error); + } else { + server.s.pool.clear(connection.serviceId); } } } else { @@ -516,7 +517,9 @@ function makeOperationHandler( if (isSDAMUnrecoverableError(error)) { if (shouldHandleStateChangeError(server, error)) { if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(error)) { - server.s.pool.clear(connection.serviceId); + if (server.loadBalanced) { + server.s.pool.clear(connection.serviceId); + } } if (!server.loadBalanced) { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 97c56530dd..82c87eff32 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -813,12 +813,17 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) { const server = topology.s.servers.get(incomingServerDescription.address); if (server) { + server.s.description = incomingServerDescription; + if (incomingServerDescription.error) { + server.s.pool.clear(); + return; + } + const newTopologyType = topology.s.description.type; const shouldMarkPoolReady = incomingServerDescription.isDataBearing || (incomingServerDescription.type !== ServerType.Unknown && newTopologyType === TopologyType.Single); - server.s.description = incomingServerDescription; if (shouldMarkPoolReady) { server.s.pool.ready(); } From ded355f783732b4618c955fa0fed1d6a75e6d1fa Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 18:14:03 -0400 Subject: [PATCH 34/55] fix: accidental early return --- src/sdam/topology.ts | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 82c87eff32..35ad3ee18b 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -816,16 +816,15 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes server.s.description = incomingServerDescription; if (incomingServerDescription.error) { server.s.pool.clear(); - return; - } - - const newTopologyType = topology.s.description.type; - const shouldMarkPoolReady = - incomingServerDescription.isDataBearing || - (incomingServerDescription.type !== ServerType.Unknown && - newTopologyType === TopologyType.Single); - if (shouldMarkPoolReady) { - server.s.pool.ready(); + } else { + const newTopologyType = topology.s.description.type; + const shouldMarkPoolReady = + incomingServerDescription.isDataBearing || + (incomingServerDescription.type !== ServerType.Unknown && + newTopologyType === TopologyType.Single); + if (shouldMarkPoolReady) { + server.s.pool.ready(); + } } } } From 6f4c0e95a4e07f1c87666c5bf5c3a59f7a511ea3 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 17:55:04 -0400 Subject: [PATCH 35/55] test: skip afterEach if test is skipped --- .../server_discovery_and_monitoring.spec.test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index be9e9c3cdb..da27b3cc29 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -26,6 +26,9 @@ const filter: TestFilter = ({ description }) => { describe('SDAM Unified Tests', function () { afterEach(async function () { + if (this.currentTest!.pending) { + return; + } // TODO(NODE-4573): fix socket leaks const LEAKY_TESTS = [ 'Command error on Monitor handshake', From 146dafb1e273b3bef2b7ddf8d6781b1c2f4d3b1e Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 18:10:58 -0400 Subject: [PATCH 36/55] refactor: remove redundant pool clear calls --- src/sdam/monitor.ts | 6 ------ src/sdam/server.ts | 4 ---- 2 files changed, 10 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index b54cc20dfb..62ea882a06 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -227,7 +227,6 @@ function checkServer(monitor: Monitor, callback: Callback) { ); monitor.emit('resetServer', err); - monitor.emit('resetConnectionPool'); callback(err); } @@ -306,11 +305,6 @@ function checkServer(monitor: Monitor, callback: Callback) { if (err) { monitor[kConnection] = undefined; - // we already reset the connection pool on network errors in all cases - if (!(err instanceof MongoNetworkError)) { - monitor.emit('resetConnectionPool'); - } - failureHandler(err); return; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 0d4cd90be3..06cf8947c8 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -178,10 +178,6 @@ export class Server extends TypedEventEmitter { monitor.on(event, (e: any) => this.emit(event, e)); } - monitor.on('resetConnectionPool', () => { - this.s.pool.clear(); - }); - monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error)); monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => { this.emit( From 398136d98a7a59c6cf5387f5bebd10475e354763 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 18:36:18 -0400 Subject: [PATCH 37/55] lint --- src/operations/execute_operation.ts | 2 -- src/sdam/monitor.ts | 1 - test/tools/runner/hooks/leak_checker.ts | 1 - 3 files changed, 4 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 687f7998eb..54fc3b99d6 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -1,5 +1,3 @@ -import { setTimeout } from 'timers'; - import type { Document } from '../bson'; import { isRetryableReadError, diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 62ea882a06..25cec444fe 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -4,7 +4,6 @@ import { Document, Long } from '../bson'; import { connect } from '../cmap/connect'; import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; -import { MongoNetworkError } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Callback, InterruptibleAsyncInterval } from '../utils'; import { diff --git a/test/tools/runner/hooks/leak_checker.ts b/test/tools/runner/hooks/leak_checker.ts index 62b8a459f8..07d97d239c 100644 --- a/test/tools/runner/hooks/leak_checker.ts +++ b/test/tools/runner/hooks/leak_checker.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-this-alias */ import { expect } from 'chai'; import * as chalk from 'chalk'; From 525c29fcb5ada6343b80fb4a239e7558cec81727 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 18:59:05 -0400 Subject: [PATCH 38/55] Revert "refactor: check pool state and modify pending count in createConnection" This reverts commit e2940bf55c8e8b850e81ddb8ed07e10367028673. --- src/cmap/connection_pool.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index ccedd8dcbf..2214c05ce8 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -586,14 +586,12 @@ export class ConnectionPool extends TypedEventEmitter { connect(connectOptions, (err, connection) => { if (err || !connection) { this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); - this[kPending]--; callback(err); return; } // The pool might have closed since we started trying to create a connection - // TODO: unit test - if (this[kPoolState] !== PoolState.ready) { + if (this.closed) { this[kPending]--; connection.destroy({ force: true }); return; @@ -624,7 +622,6 @@ export class ConnectionPool extends TypedEventEmitter { connection.markAvailable(); this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection)); - this[kPending]--; callback(undefined, connection); return; }); @@ -651,12 +648,14 @@ export class ConnectionPool extends TypedEventEmitter { // connection permits because that potentially delays the availability of // the connection to a checkout request this.createConnection((err, connection) => { - // NOTE: createConnection guarantees that we cannot enter this block unless the pool is ready + this[kPending]--; if (!err && connection) { this[kConnections].push(connection); process.nextTick(() => this.processWaitQueue()); } - this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + if (this[kPoolState] === PoolState.ready) { + this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + } }); } else { this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); @@ -731,6 +730,7 @@ export class ConnectionPool extends TypedEventEmitter { continue; } this.createConnection((err, connection) => { + this[kPending]--; if (waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); From 88ef8a5b7c669ce5b84dd1db132e0eeab62e4e3c Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Mon, 29 Aug 2022 19:20:01 -0400 Subject: [PATCH 39/55] refactor: decrement pending connections only in createConnection --- src/cmap/connection_pool.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 2214c05ce8..4156c3fdb4 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -586,6 +586,7 @@ export class ConnectionPool extends TypedEventEmitter { connect(connectOptions, (err, connection) => { if (err || !connection) { this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); + this[kPending]--; callback(err); return; } @@ -622,6 +623,7 @@ export class ConnectionPool extends TypedEventEmitter { connection.markAvailable(); this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection)); + this[kPending]--; callback(undefined, connection); return; }); @@ -648,7 +650,6 @@ export class ConnectionPool extends TypedEventEmitter { // connection permits because that potentially delays the availability of // the connection to a checkout request this.createConnection((err, connection) => { - this[kPending]--; if (!err && connection) { this[kConnections].push(connection); process.nextTick(() => this.processWaitQueue()); @@ -656,6 +657,7 @@ export class ConnectionPool extends TypedEventEmitter { if (this[kPoolState] === PoolState.ready) { this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); } + // TODO: destroy connection if pool paused? => this.connectionIsPerished(...) }); } else { this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); @@ -730,7 +732,6 @@ export class ConnectionPool extends TypedEventEmitter { continue; } this.createConnection((err, connection) => { - this[kPending]--; if (waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); From 0206e842eb3b0953ec941cbca8b9326fa13446e7 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 14:51:15 -0400 Subject: [PATCH 40/55] refactor: keep waitqueue members in queue until connection ready --- src/cmap/connection_pool.ts | 39 +++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 4156c3fdb4..22e916ede0 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -66,6 +66,8 @@ const kMetrics = Symbol('metrics'); const kProcessingWaitQueue = Symbol('processingWaitQueue'); /** @internal */ const kPoolState = Symbol('poolState'); +/** @internal */ +const kWaitQueuePending = Symbol('waitQueuePending'); /** @public */ export interface ConnectionPoolOptions extends Omit { @@ -154,6 +156,8 @@ export class ConnectionPool extends TypedEventEmitter { [kMetrics]: ConnectionPoolMetrics; /** @internal */ [kProcessingWaitQueue]: boolean; + /** @internal */ + [kWaitQueuePending]: number; /** * Emitted when the connection pool is created. @@ -245,6 +249,7 @@ export class ConnectionPool extends TypedEventEmitter { this[kCancellationToken] = new CancellationToken(); this[kCancellationToken].setMaxListeners(Infinity); this[kWaitQueue] = new Denque(); + this[kWaitQueuePending] = 0; this[kMetrics] = new ConnectionPoolMetrics(); this[kProcessingWaitQueue] = false; @@ -396,6 +401,7 @@ export class ConnectionPool extends TypedEventEmitter { if (this.closed) { return; } + this[kWaitQueuePending] = 0; // handle load balanced case if (this.loadBalanced && serviceId) { @@ -453,6 +459,7 @@ export class ConnectionPool extends TypedEventEmitter { } this[kPoolState] = PoolState.closed; + this[kWaitQueuePending] = 0; this.clearMinPoolSizeTimer(); this.processWaitQueue(); @@ -591,8 +598,8 @@ export class ConnectionPool extends TypedEventEmitter { return; } - // The pool might have closed since we started trying to create a connection - if (this.closed) { + // The pool might have been cleared or closed since we started trying to create a connection + if (this[kPoolState] !== PoolState.ready) { this[kPending]--; connection.destroy({ force: true }); return; @@ -722,17 +729,29 @@ export class ConnectionPool extends TypedEventEmitter { } const { maxPoolSize, maxConnecting } = this.options; - while ( - this.waitQueueSize > 0 && - this.pendingConnectionCount < maxConnecting && - (maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize) + for ( + let waitQueueIndex = this[kWaitQueuePending]; + waitQueueIndex < this.waitQueueSize; + waitQueueIndex++ ) { - const waitQueueMember = this[kWaitQueue].shift(); - if (!waitQueueMember || waitQueueMember[kCancelled]) { - continue; + if ( + this.pendingConnectionCount >= maxConnecting || + (maxPoolSize > 0 && this.totalConnectionCount >= maxPoolSize) + ) { + break; } + this[kWaitQueuePending]++; this.createConnection((err, connection) => { - if (waitQueueMember[kCancelled]) { + // The > 0 guard is just a precaution against future refactors + // Currently, the callback is invoked sync from createConnection + // so we are guaranteed the pool is still ready at this point + // however, if this changes to async, then it will be possible for the + // queue to get cleared before we get here + if (this[kWaitQueuePending] > 0) { + this[kWaitQueuePending]--; + } + const waitQueueMember = this[kWaitQueue].shift(); + if (!waitQueueMember || waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); } From 4e0ada23a958520103fd5abf3fe66e0027a56dfd Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 17:25:04 -0400 Subject: [PATCH 41/55] fix: maybe fix sdam to only clear pool when appropriate --- src/error.ts | 3 ++- src/sdam/monitor.ts | 11 ++++++++++- src/sdam/server.ts | 10 ++++++---- src/sdam/topology.ts | 3 ++- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/error.ts b/src/error.ts index 7a788b923b..66a34450fd 100644 --- a/src/error.ts +++ b/src/error.ts @@ -89,7 +89,8 @@ export const MongoErrorLabel = Object.freeze({ TransientTransactionError: 'TransientTransactionError', UnknownTransactionCommitResult: 'UnknownTransactionCommitResult', ResumableChangeStreamError: 'ResumableChangeStreamError', - HandshakeError: 'HandshakeError' + HandshakeError: 'HandshakeError', + ResetPool: 'ResetPool' } as const); /** @public */ diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 25cec444fe..d9d2c3e76b 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -4,6 +4,7 @@ import { Document, Long } from '../bson'; import { connect } from '../cmap/connect'; import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; +import { MongoError, MongoErrorLabel } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Callback, InterruptibleAsyncInterval } from '../utils'; import { @@ -225,7 +226,15 @@ function checkServer(monitor: Monitor, callback: Callback) { new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err) ); - monitor.emit('resetServer', err); + let error: MongoError; + if (!(err instanceof MongoError)) { + error = new MongoError(err); + } else { + error = err; + } + error.addErrorLabel(MongoErrorLabel.ResetPool); + + monitor.emit('resetServer', error); callback(err); } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 06cf8947c8..1b5a0de9c5 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -512,13 +512,15 @@ function makeOperationHandler( if (isSDAMUnrecoverableError(error)) { if (shouldHandleStateChangeError(server, error)) { - if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(error)) { - if (server.loadBalanced) { - server.s.pool.clear(connection.serviceId); - } + const shouldClearPool = maxWireVersion(server) <= 7 || isNodeShuttingDownError(error); + if (server.loadBalanced && shouldClearPool) { + server.s.pool.clear(connection.serviceId); } if (!server.loadBalanced) { + if (shouldClearPool) { + error.addErrorLabel(MongoErrorLabel.ResetPool); + } markServerUnknown(server, error); process.nextTick(() => server.requestCheck()); } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 35ad3ee18b..ed23caa7d8 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -26,6 +26,7 @@ import { import { MongoCompatibilityError, MongoDriverError, + MongoErrorLabel, MongoRuntimeError, MongoServerSelectionError, MongoTopologyClosedError @@ -814,7 +815,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes const server = topology.s.servers.get(incomingServerDescription.address); if (server) { server.s.description = incomingServerDescription; - if (incomingServerDescription.error) { + if (incomingServerDescription.error?.hasErrorLabel(MongoErrorLabel.ResetPool)) { server.s.pool.clear(); } else { const newTopologyType = topology.s.description.type; From d91423497559176770a6234a7405ddb4bc3befc2 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 17:31:55 -0400 Subject: [PATCH 42/55] fix: prevent parallel ensureminpoolsize timers --- src/cmap/connection_pool.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 22e916ede0..45b0c13476 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -325,6 +325,7 @@ export class ConnectionPool extends TypedEventEmitter { } this[kPoolState] = PoolState.ready; this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this)); + clearTimeout(this[kMinPoolSizeTimer]); this.ensureMinPoolSize(); } @@ -594,7 +595,7 @@ export class ConnectionPool extends TypedEventEmitter { if (err || !connection) { this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); this[kPending]--; - callback(err); + callback(err ?? new MongoRuntimeError('Connection creation failed without error')); return; } @@ -662,11 +663,12 @@ export class ConnectionPool extends TypedEventEmitter { process.nextTick(() => this.processWaitQueue()); } if (this[kPoolState] === PoolState.ready) { + clearTimeout(this[kMinPoolSizeTimer]); this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); } - // TODO: destroy connection if pool paused? => this.connectionIsPerished(...) }); } else { + clearTimeout(this[kMinPoolSizeTimer]); this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); } } From 223cf1107e6627b6de13f0e6f088d501b29c8f6a Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 17:37:17 -0400 Subject: [PATCH 43/55] lint: unremove eslint disable rule --- test/tools/runner/hooks/leak_checker.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/tools/runner/hooks/leak_checker.ts b/test/tools/runner/hooks/leak_checker.ts index 07d97d239c..62b8a459f8 100644 --- a/test/tools/runner/hooks/leak_checker.ts +++ b/test/tools/runner/hooks/leak_checker.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-this-alias */ import { expect } from 'chai'; import * as chalk from 'chalk'; From ea9e742791ea8189e7b35ec0e8ea9bd9199b71be Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 17:48:10 -0400 Subject: [PATCH 44/55] fix: missed a spot --- src/sdam/server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 1b5a0de9c5..3560ef9cfe 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -496,6 +496,7 @@ function makeOperationHandler( // clear for the specific service id. if (!server.loadBalanced) { + error.addErrorLabel(MongoErrorLabel.ResetPool); markServerUnknown(server, error); } else { server.s.pool.clear(connection.serviceId); From 92a7c542aa6d4a63e4f1b7d8612080333fe8a258 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 18:39:07 -0400 Subject: [PATCH 45/55] fix: add extra guard for error type --- src/sdam/server_description.ts | 1 + src/sdam/topology.ts | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 1c3feeb629..9d548d266b 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -53,6 +53,7 @@ export class ServerDescription { passives: string[]; arbiters: string[]; tags: TagSet; + // TODO: this error type is not actually guaranteed error: MongoServerError | null; topologyVersion: TopologyVersion | null; minWireVersion: number; diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index ed23caa7d8..7f86eb73de 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -26,6 +26,7 @@ import { import { MongoCompatibilityError, MongoDriverError, + MongoError, MongoErrorLabel, MongoRuntimeError, MongoServerSelectionError, @@ -815,7 +816,10 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes const server = topology.s.servers.get(incomingServerDescription.address); if (server) { server.s.description = incomingServerDescription; - if (incomingServerDescription.error?.hasErrorLabel(MongoErrorLabel.ResetPool)) { + if ( + incomingServerDescription.error instanceof MongoError && + incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) + ) { server.s.pool.clear(); } else { const newTopologyType = topology.s.description.type; From ffcdbb4ff7c9e0905504d42933399983b033a7ef Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 18:50:05 -0400 Subject: [PATCH 46/55] Revert "refactor: keep waitqueue members in queue until connection ready" This reverts commit 0206e842eb3b0953ec941cbca8b9326fa13446e7. --- src/cmap/connection_pool.ts | 39 ++++++++++--------------------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 45b0c13476..53a9356d91 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -66,8 +66,6 @@ const kMetrics = Symbol('metrics'); const kProcessingWaitQueue = Symbol('processingWaitQueue'); /** @internal */ const kPoolState = Symbol('poolState'); -/** @internal */ -const kWaitQueuePending = Symbol('waitQueuePending'); /** @public */ export interface ConnectionPoolOptions extends Omit { @@ -156,8 +154,6 @@ export class ConnectionPool extends TypedEventEmitter { [kMetrics]: ConnectionPoolMetrics; /** @internal */ [kProcessingWaitQueue]: boolean; - /** @internal */ - [kWaitQueuePending]: number; /** * Emitted when the connection pool is created. @@ -249,7 +245,6 @@ export class ConnectionPool extends TypedEventEmitter { this[kCancellationToken] = new CancellationToken(); this[kCancellationToken].setMaxListeners(Infinity); this[kWaitQueue] = new Denque(); - this[kWaitQueuePending] = 0; this[kMetrics] = new ConnectionPoolMetrics(); this[kProcessingWaitQueue] = false; @@ -402,7 +397,6 @@ export class ConnectionPool extends TypedEventEmitter { if (this.closed) { return; } - this[kWaitQueuePending] = 0; // handle load balanced case if (this.loadBalanced && serviceId) { @@ -460,7 +454,6 @@ export class ConnectionPool extends TypedEventEmitter { } this[kPoolState] = PoolState.closed; - this[kWaitQueuePending] = 0; this.clearMinPoolSizeTimer(); this.processWaitQueue(); @@ -599,8 +592,8 @@ export class ConnectionPool extends TypedEventEmitter { return; } - // The pool might have been cleared or closed since we started trying to create a connection - if (this[kPoolState] !== PoolState.ready) { + // The pool might have closed since we started trying to create a connection + if (this.closed) { this[kPending]--; connection.destroy({ force: true }); return; @@ -731,29 +724,17 @@ export class ConnectionPool extends TypedEventEmitter { } const { maxPoolSize, maxConnecting } = this.options; - for ( - let waitQueueIndex = this[kWaitQueuePending]; - waitQueueIndex < this.waitQueueSize; - waitQueueIndex++ + while ( + this.waitQueueSize > 0 && + this.pendingConnectionCount < maxConnecting && + (maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize) ) { - if ( - this.pendingConnectionCount >= maxConnecting || - (maxPoolSize > 0 && this.totalConnectionCount >= maxPoolSize) - ) { - break; + const waitQueueMember = this[kWaitQueue].shift(); + if (!waitQueueMember || waitQueueMember[kCancelled]) { + continue; } - this[kWaitQueuePending]++; this.createConnection((err, connection) => { - // The > 0 guard is just a precaution against future refactors - // Currently, the callback is invoked sync from createConnection - // so we are guaranteed the pool is still ready at this point - // however, if this changes to async, then it will be possible for the - // queue to get cleared before we get here - if (this[kWaitQueuePending] > 0) { - this[kWaitQueuePending]--; - } - const waitQueueMember = this[kWaitQueue].shift(); - if (!waitQueueMember || waitQueueMember[kCancelled]) { + if (waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); } From a09ad2357089ef31c5022114c703d974eff22a77 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Wed, 31 Aug 2022 21:58:29 -0400 Subject: [PATCH 47/55] fix: do not leave hanging callbacks in createConnection --- src/cmap/connection_pool.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 53a9356d91..648f5a7c76 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -593,9 +593,10 @@ export class ConnectionPool extends TypedEventEmitter { } // The pool might have closed since we started trying to create a connection - if (this.closed) { + if (this[kPoolState] !== PoolState.ready) { this[kPending]--; connection.destroy({ force: true }); + callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this)); return; } @@ -742,7 +743,8 @@ export class ConnectionPool extends TypedEventEmitter { if (err) { this.emit( ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, err) + // TODO: this should be connectionError in the reason, not err + new ConnectionCheckOutFailedEvent(this, 'connectionError') ); } else if (connection) { this[kCheckedOut]++; From 6d5edd95b701e746bc9fe27ff039176e8389c81e Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 1 Sep 2022 17:33:49 -0400 Subject: [PATCH 48/55] fix: remove duplicate resetServer --- src/sdam/monitor.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index d9d2c3e76b..f3d9d2295e 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -353,7 +353,6 @@ function monitorServer(monitor: Monitor) { if (err) { // otherwise an error occurred on initial discovery, also bail if (monitor[kServer].description.type === ServerType.Unknown) { - monitor.emit('resetServer', err); return done(); } } From 24a18602c084c73ca5a96fb0ad06804a96ce0d83 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 1 Sep 2022 18:17:09 -0400 Subject: [PATCH 49/55] test: skip flaky tests --- .../server_discovery_and_monitoring.spec.test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index da27b3cc29..3a95f9493b 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -19,6 +19,9 @@ const filter: TestFilter = ({ description }) => { return isAuthEnabled ? 'TODO(NODE-3135): handle auth errors, also see NODE-3891: fix tests broken when AUTH enabled' : false; + case 'Network error on Monitor check': + case 'Network timeout on Monitor check': + return 'TODO(NODE-4608): Disallow parallel monitor checks'; default: return false; } From e91e0bba6e96ad34bd9f1d04e47bebed4cd9a202 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 2 Sep 2022 16:59:18 -0400 Subject: [PATCH 50/55] lint: remove stray comment --- src/cmap/connection_pool.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 648f5a7c76..723b4c8f67 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -743,7 +743,6 @@ export class ConnectionPool extends TypedEventEmitter { if (err) { this.emit( ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - // TODO: this should be connectionError in the reason, not err new ConnectionCheckOutFailedEvent(this, 'connectionError') ); } else if (connection) { From c33d7310358b7c3cfa20a7f6ceb254cab3f99fdb Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 2 Sep 2022 17:03:10 -0400 Subject: [PATCH 51/55] fix: type cleanup --- src/sdam/monitor.ts | 9 ++------- src/sdam/server_description.ts | 5 ++--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index f3d9d2295e..47268a4a41 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -69,7 +69,7 @@ export type MonitorEvents = { serverHeartbeatStarted(event: ServerHeartbeatStartedEvent): void; serverHeartbeatSucceeded(event: ServerHeartbeatSucceededEvent): void; serverHeartbeatFailed(event: ServerHeartbeatFailedEvent): void; - resetServer(error?: Error): void; + resetServer(error?: MongoError): void; resetConnectionPool(): void; close(): void; } & EventEmitterWithState; @@ -226,12 +226,7 @@ function checkServer(monitor: Monitor, callback: Callback) { new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err) ); - let error: MongoError; - if (!(err instanceof MongoError)) { - error = new MongoError(err); - } else { - error = err; - } + const error = !(err instanceof MongoError) ? new MongoError(err) : err; error.addErrorLabel(MongoErrorLabel.ResetPool); monitor.emit('resetServer', error); diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 9d548d266b..541752cb43 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -1,5 +1,5 @@ import { Document, Long, ObjectId } from '../bson'; -import { MongoRuntimeError, MongoServerError } from '../error'; +import { MongoError, MongoRuntimeError, MongoServerError } from '../error'; import { arrayStrictEqual, compareObjectId, errorStrictEqual, HostAddress, now } from '../utils'; import type { ClusterTime } from './common'; import { ServerType } from './common'; @@ -53,8 +53,7 @@ export class ServerDescription { passives: string[]; arbiters: string[]; tags: TagSet; - // TODO: this error type is not actually guaranteed - error: MongoServerError | null; + error: MongoError | null; topologyVersion: TopologyVersion | null; minWireVersion: number; maxWireVersion: number; From 61aa0b3a5ca366bef832f16632f5adb7273e2750 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 2 Sep 2022 17:05:32 -0400 Subject: [PATCH 52/55] refactor: extra guard for pool ready --- src/sdam/topology.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 7f86eb73de..8c33535014 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -821,7 +821,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) ) { server.s.pool.clear(); - } else { + } else if (incomingServerDescription.error == null) { const newTopologyType = topology.s.description.type; const shouldMarkPoolReady = incomingServerDescription.isDataBearing || From a69c7966354be1d87707fac8d3284ac4a7523b1e Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 2 Sep 2022 17:14:46 -0400 Subject: [PATCH 53/55] feat: preserve original error on cause property in MongoError --- src/error.ts | 2 ++ test/unit/error.test.ts | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/error.ts b/src/error.ts index 66a34450fd..757c4505f0 100644 --- a/src/error.ts +++ b/src/error.ts @@ -122,10 +122,12 @@ export class MongoError extends Error { */ code?: number | string; topologyVersion?: TopologyVersion; + cause?: Error; constructor(message: string | Error) { if (message instanceof Error) { super(message.message); + this.cause = message; } else { super(message); } diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index 8f5d10fb14..3ae43e35ea 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -90,10 +90,12 @@ describe('MongoErrors', () => { it('should accept an Error object', () => { const errorMessage = 'A test error'; - const err = new MongoError(new Error(errorMessage)); + const inputError = new Error(errorMessage); + const err = new MongoError(inputError); expect(err).to.be.an.instanceof(Error); expect(err.name).to.equal('MongoError'); expect(err.message).to.equal(errorMessage); + expect(err).to.have.property('cause', inputError); }); }); From 10c384b9ce838e952c2f2fe41270db7ae9e84621 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 2 Sep 2022 17:17:23 -0400 Subject: [PATCH 54/55] lint: test type fix --- .../server_discovery_and_monitoring.prose.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts index 8da3bd765a..8d41348dbf 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts @@ -32,7 +32,7 @@ describe('Server Discovery and Monitoring Prose Tests', function () { */ let client: MongoClient; - const events = []; + const events: string[] = []; beforeEach(async function () { client = this.configuration.newClient({ directConnection: true, From fe381dc9288db142e86106a9f3385d55779f2846 Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Fri, 2 Sep 2022 17:26:42 -0400 Subject: [PATCH 55/55] ts: ignore node version dependent error --- src/error.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/error.ts b/src/error.ts index 757c4505f0..2a387a0d47 100644 --- a/src/error.ts +++ b/src/error.ts @@ -122,7 +122,9 @@ export class MongoError extends Error { */ code?: number | string; topologyVersion?: TopologyVersion; - cause?: Error; + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + cause?: Error; // depending on the node version, this may or may not exist on the base constructor(message: string | Error) { if (message instanceof Error) {