diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 9a13b7d486..723b4c8f67 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -13,6 +13,7 @@ import { CONNECTION_POOL_CLEARED, CONNECTION_POOL_CLOSED, CONNECTION_POOL_CREATED, + CONNECTION_POOL_READY, CONNECTION_READY } from '../constants'; import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error'; @@ -31,9 +32,10 @@ import { ConnectionPoolClearedEvent, ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from './connection_pool_events'; -import { PoolClosedError, WaitQueueTimeoutError } from './errors'; +import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors'; import { ConnectionPoolMetrics } from './metrics'; /** @internal */ @@ -103,6 +105,7 @@ export interface CloseOptions { /** @public */ export type ConnectionPoolEvents = { connectionPoolCreated(event: ConnectionPoolCreatedEvent): void; + connectionPoolReady(event: ConnectionPoolReadyEvent): void; connectionPoolClosed(event: ConnectionPoolClosedEvent): void; connectionPoolCleared(event: ConnectionPoolClearedEvent): void; connectionCreated(event: ConnectionCreatedEvent): void; @@ -167,6 +170,11 @@ export class ConnectionPool extends TypedEventEmitter { * @event */ static readonly CONNECTION_POOL_CLEARED = CONNECTION_POOL_CLEARED; + /** + * Emitted each time the connection pool is marked ready + * @event + */ + static readonly CONNECTION_POOL_READY = CONNECTION_POOL_READY; /** * Emitted when a connection is created. * @event @@ -242,7 +250,6 @@ export class ConnectionPool extends TypedEventEmitter { process.nextTick(() => { this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this)); - this.ensureMinPoolSize(); }); } @@ -308,7 +315,13 @@ export class ConnectionPool extends TypedEventEmitter { * Set the pool state to "ready" */ ready(): void { + if (this[kPoolState] !== PoolState.paused) { + return; + } this[kPoolState] = PoolState.ready; + this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this)); + clearTimeout(this[kMinPoolSizeTimer]); + this.ensureMinPoolSize(); } /** @@ -322,15 +335,6 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionCheckOutStartedEvent(this) ); - if (this.closed) { - this.emit( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, 'poolClosed') - ); - callback(new PoolClosedError(this)); - return; - } - const waitQueueMember: WaitQueueMember = { callback }; const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; if (waitQueueTimeoutMS) { @@ -390,26 +394,40 @@ export class ConnectionPool extends TypedEventEmitter { * previous generation will eventually be pruned during subsequent checkouts. */ clear(serviceId?: ObjectId): void { + if (this.closed) { + return; + } + + // handle load balanced case if (this.loadBalanced && serviceId) { const sid = serviceId.toHexString(); const generation = this.serviceGenerations.get(sid); // Only need to worry if the generation exists, since it should // always be there but typescript needs the check. if (generation == null) { - // TODO(NODE-3483) throw new MongoRuntimeError('Service generations are required in load balancer mode.'); } else { // Increment the generation for the service id. this.serviceGenerations.set(sid, generation + 1); } - } else { - this[kGeneration] += 1; + this.emit( + ConnectionPool.CONNECTION_POOL_CLEARED, + new ConnectionPoolClearedEvent(this, serviceId) + ); + return; } - this.emit( - ConnectionPool.CONNECTION_POOL_CLEARED, - new ConnectionPoolClearedEvent(this, serviceId) - ); + // handle non load-balanced case + this[kGeneration] += 1; + const alreadyPaused = this[kPoolState] === PoolState.paused; + this[kPoolState] = PoolState.paused; + + this.clearMinPoolSizeTimer(); + this.processWaitQueue(); + + if (!alreadyPaused) { + this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); + } } /** Close the pool */ @@ -430,33 +448,15 @@ export class ConnectionPool extends TypedEventEmitter { // immediately cancel any in-flight connections this[kCancellationToken].emit('cancel'); - // drain the wait queue - while (this.waitQueueSize) { - const waitQueueMember = this[kWaitQueue].pop(); - if (waitQueueMember) { - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); - } - if (!waitQueueMember[kCancelled]) { - // TODO(NODE-3483): Replace with MongoConnectionPoolClosedError - waitQueueMember.callback(new MongoRuntimeError('Connection pool closed')); - } - } - } - - // clear the min pool size timer - const minPoolSizeTimer = this[kMinPoolSizeTimer]; - if (minPoolSizeTimer) { - clearTimeout(minPoolSizeTimer); - } - // end the connection counter if (typeof this[kConnectionCounter].return === 'function') { this[kConnectionCounter].return(undefined); } - // mark the pool as closed immediately this[kPoolState] = PoolState.closed; + this.clearMinPoolSizeTimer(); + this.processWaitQueue(); + eachAsync( this[kConnections].toArray(), (conn, cb) => { @@ -526,12 +526,19 @@ export class ConnectionPool extends TypedEventEmitter { }); } + /** Clear the min pool size timer */ + private clearMinPoolSizeTimer(): void { + const minPoolSizeTimer = this[kMinPoolSizeTimer]; + if (minPoolSizeTimer) { + clearTimeout(minPoolSizeTimer); + } + } + private destroyConnection(connection: Connection, reason: string) { this.emit( ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, connection, reason) ); - // destroy the connection process.nextTick(() => connection.destroy()); } @@ -580,14 +587,16 @@ export class ConnectionPool extends TypedEventEmitter { connect(connectOptions, (err, connection) => { if (err || !connection) { this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); - callback(err); + this[kPending]--; + callback(err ?? new MongoRuntimeError('Connection creation failed without error')); return; } // The pool might have closed since we started trying to create a connection - if (this.closed) { + if (this[kPoolState] !== PoolState.ready) { this[kPending]--; connection.destroy({ force: true }); + callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this)); return; } @@ -616,6 +625,7 @@ export class ConnectionPool extends TypedEventEmitter { connection.markAvailable(); this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection)); + this[kPending]--; callback(undefined, connection); return; }); @@ -623,10 +633,17 @@ export class ConnectionPool extends TypedEventEmitter { private ensureMinPoolSize() { const minPoolSize = this.options.minPoolSize; - if (this.closed || minPoolSize === 0) { + if (this[kPoolState] !== PoolState.ready || minPoolSize === 0) { return; } + for (let i = 0; i < this[kConnections].length; i++) { + const connection = this[kConnections].peekAt(i); + if (connection && this.connectionIsPerished(connection)) { + this[kConnections].removeOne(i); + } + } + if ( this.totalConnectionCount < minPoolSize && this.pendingConnectionCount < this.options.maxConnecting @@ -635,23 +652,25 @@ export class ConnectionPool extends TypedEventEmitter { // connection permits because that potentially delays the availability of // the connection to a checkout request this.createConnection((err, connection) => { - this[kPending]--; if (!err && connection) { this[kConnections].push(connection); process.nextTick(() => this.processWaitQueue()); } - this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + if (this[kPoolState] === PoolState.ready) { + clearTimeout(this[kMinPoolSizeTimer]); + this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10); + } }); } else { + clearTimeout(this[kMinPoolSizeTimer]); this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100); } } private processWaitQueue() { - if (this.closed || this[kProcessingWaitQueue]) { + if (this[kProcessingWaitQueue]) { return; } - this[kProcessingWaitQueue] = true; while (this.waitQueueSize) { @@ -666,6 +685,21 @@ export class ConnectionPool extends TypedEventEmitter { continue; } + if (this[kPoolState] !== PoolState.ready) { + const reason = this.closed ? 'poolClosed' : 'connectionError'; + const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this); + this.emit( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(this, reason) + ); + if (waitQueueMember.timer) { + clearTimeout(waitQueueMember.timer); + } + this[kWaitQueue].shift(); + waitQueueMember.callback(error); + continue; + } + if (!this.availableConnectionCount) { break; } @@ -701,7 +735,6 @@ export class ConnectionPool extends TypedEventEmitter { continue; } this.createConnection((err, connection) => { - this[kPending]--; if (waitQueueMember[kCancelled]) { if (!err && connection) { this[kConnections].push(connection); @@ -710,7 +743,7 @@ export class ConnectionPool extends TypedEventEmitter { if (err) { this.emit( ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, err) + new ConnectionCheckOutFailedEvent(this, 'connectionError') ); } else if (connection) { this[kCheckedOut]++; diff --git a/src/cmap/connection_pool_events.ts b/src/cmap/connection_pool_events.ts index a98dc25d78..90c4826d58 100644 --- a/src/cmap/connection_pool_events.ts +++ b/src/cmap/connection_pool_events.ts @@ -37,6 +37,18 @@ export class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent { } } +/** + * An event published when a connection pool is ready + * @public + * @category Event + */ +export class ConnectionPoolReadyEvent extends ConnectionPoolMonitoringEvent { + /** @internal */ + constructor(pool: ConnectionPool) { + super(pool); + } +} + /** * An event published when a connection pool is closed * @public diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 85e8dac798..c8355ec562 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,4 +1,4 @@ -import { MongoDriverError } from '../error'; +import { MongoDriverError, MongoNetworkError } from '../error'; import type { ConnectionPool } from './connection_pool'; /** @@ -19,6 +19,27 @@ export class PoolClosedError extends MongoDriverError { } } +/** + * An error indicating a connection pool is currently paused + * @category Error + */ +export class PoolClearedError extends MongoNetworkError { + // TODO(NODE-3144): needs to extend RetryableError or be marked retryable in some other way per spec + /** The address of the connection pool */ + address: string; + + constructor(pool: ConnectionPool) { + // TODO(NODE-3135): pass in original pool-clearing error and use in message + // "failed with: " + super(`Connection pool for ${pool.address} was cleared because another operation failed`); + this.address = pool.address; + } + + override get name(): string { + return 'MongoPoolClearedError'; + } +} + /** * An error thrown when a request to check out a connection times out * @category Error diff --git a/src/constants.ts b/src/constants.ts index 2326e8d2c1..eec4b07571 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -26,6 +26,7 @@ export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as cons export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const; export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const; export const CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const; +export const CONNECTION_POOL_READY = 'connectionPoolReady' as const; export const CONNECTION_CREATED = 'connectionCreated' as const; export const CONNECTION_READY = 'connectionReady' as const; export const CONNECTION_CLOSED = 'connectionClosed' as const; @@ -57,6 +58,8 @@ export const HEARTBEAT_EVENTS = Object.freeze([ /** @public */ export const CMAP_EVENTS = Object.freeze([ CONNECTION_POOL_CREATED, + CONNECTION_POOL_READY, + CONNECTION_POOL_CLEARED, CONNECTION_POOL_CLOSED, CONNECTION_CREATED, CONNECTION_READY, @@ -64,8 +67,7 @@ export const CMAP_EVENTS = Object.freeze([ CONNECTION_CHECK_OUT_STARTED, CONNECTION_CHECK_OUT_FAILED, CONNECTION_CHECKED_OUT, - CONNECTION_CHECKED_IN, - CONNECTION_POOL_CLEARED + CONNECTION_CHECKED_IN ] as const); /** @public */ diff --git a/src/error.ts b/src/error.ts index 7a788b923b..2a387a0d47 100644 --- a/src/error.ts +++ b/src/error.ts @@ -89,7 +89,8 @@ export const MongoErrorLabel = Object.freeze({ TransientTransactionError: 'TransientTransactionError', UnknownTransactionCommitResult: 'UnknownTransactionCommitResult', ResumableChangeStreamError: 'ResumableChangeStreamError', - HandshakeError: 'HandshakeError' + HandshakeError: 'HandshakeError', + ResetPool: 'ResetPool' } as const); /** @public */ @@ -121,10 +122,14 @@ export class MongoError extends Error { */ code?: number | string; topologyVersion?: TopologyVersion; + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + cause?: Error; // depending on the node version, this may or may not exist on the base constructor(message: string | Error) { if (message instanceof Error) { super(message.message); + this.cause = message; } else { super(message); } diff --git a/src/index.ts b/src/index.ts index 933be8772e..865947fdd3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -132,6 +132,7 @@ export { ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, ConnectionPoolMonitoringEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from './cmap/connection_pool_events'; export { diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index b54cc20dfb..47268a4a41 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -4,7 +4,7 @@ import { Document, Long } from '../bson'; import { connect } from '../cmap/connect'; import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; -import { MongoNetworkError } from '../error'; +import { MongoError, MongoErrorLabel } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Callback, InterruptibleAsyncInterval } from '../utils'; import { @@ -69,7 +69,7 @@ export type MonitorEvents = { serverHeartbeatStarted(event: ServerHeartbeatStartedEvent): void; serverHeartbeatSucceeded(event: ServerHeartbeatSucceededEvent): void; serverHeartbeatFailed(event: ServerHeartbeatFailedEvent): void; - resetServer(error?: Error): void; + resetServer(error?: MongoError): void; resetConnectionPool(): void; close(): void; } & EventEmitterWithState; @@ -226,8 +226,10 @@ function checkServer(monitor: Monitor, callback: Callback) { new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err) ); - monitor.emit('resetServer', err); - monitor.emit('resetConnectionPool'); + const error = !(err instanceof MongoError) ? new MongoError(err) : err; + error.addErrorLabel(MongoErrorLabel.ResetPool); + + monitor.emit('resetServer', error); callback(err); } @@ -306,11 +308,6 @@ function checkServer(monitor: Monitor, callback: Callback) { if (err) { monitor[kConnection] = undefined; - // we already reset the connection pool on network errors in all cases - if (!(err instanceof MongoNetworkError)) { - monitor.emit('resetConnectionPool'); - } - failureHandler(err); return; } @@ -351,7 +348,6 @@ function monitorServer(monitor: Monitor) { if (err) { // otherwise an error occurred on initial discovery, also bail if (monitor[kServer].description.type === ServerType.Unknown) { - monitor.emit('resetServer', err); return done(); } } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index ac18d46756..3560ef9cfe 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -5,6 +5,7 @@ import { ConnectionPoolEvents, ConnectionPoolOptions } from '../cmap/connection_pool'; +import { PoolClearedError } from '../cmap/errors'; import { APM_EVENTS, CLOSED, @@ -177,10 +178,6 @@ export class Server extends TypedEventEmitter { monitor.on(event, (e: any) => this.emit(event, e)); } - monitor.on('resetConnectionPool', () => { - this.s.pool.clear(); - }); - monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error)); monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => { this.emit( @@ -358,7 +355,11 @@ export class Server extends TypedEventEmitter { (err, conn, cb) => { if (err || !conn) { this.s.operationCount -= 1; - markServerUnknown(this, err); + if (!(err instanceof PoolClearedError)) { + markServerUnknown(this, err); + } else { + err.addErrorLabel(MongoErrorLabel.RetryableWriteError); + } return cb(err); } @@ -494,9 +495,11 @@ function makeOperationHandler( // In load balanced mode we never mark the server as unknown and always // clear for the specific service id. - server.s.pool.clear(connection.serviceId); if (!server.loadBalanced) { + error.addErrorLabel(MongoErrorLabel.ResetPool); markServerUnknown(server, error); + } else { + server.s.pool.clear(connection.serviceId); } } } else { @@ -510,11 +513,15 @@ function makeOperationHandler( if (isSDAMUnrecoverableError(error)) { if (shouldHandleStateChangeError(server, error)) { - if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(error)) { + const shouldClearPool = maxWireVersion(server) <= 7 || isNodeShuttingDownError(error); + if (server.loadBalanced && shouldClearPool) { server.s.pool.clear(connection.serviceId); } if (!server.loadBalanced) { + if (shouldClearPool) { + error.addErrorLabel(MongoErrorLabel.ResetPool); + } markServerUnknown(server, error); process.nextTick(() => server.requestCheck()); } diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 1c3feeb629..541752cb43 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -1,5 +1,5 @@ import { Document, Long, ObjectId } from '../bson'; -import { MongoRuntimeError, MongoServerError } from '../error'; +import { MongoError, MongoRuntimeError, MongoServerError } from '../error'; import { arrayStrictEqual, compareObjectId, errorStrictEqual, HostAddress, now } from '../utils'; import type { ClusterTime } from './common'; import { ServerType } from './common'; @@ -53,7 +53,7 @@ export class ServerDescription { passives: string[]; arbiters: string[]; tags: TagSet; - error: MongoServerError | null; + error: MongoError | null; topologyVersion: TopologyVersion | null; minWireVersion: number; maxWireVersion: number; diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 69b82befac..8c33535014 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -26,6 +26,8 @@ import { import { MongoCompatibilityError, MongoDriverError, + MongoError, + MongoErrorLabel, MongoRuntimeError, MongoServerSelectionError, MongoTopologyClosedError @@ -814,6 +816,21 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes const server = topology.s.servers.get(incomingServerDescription.address); if (server) { server.s.description = incomingServerDescription; + if ( + incomingServerDescription.error instanceof MongoError && + incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) + ) { + server.s.pool.clear(); + } else if (incomingServerDescription.error == null) { + const newTopologyType = topology.s.description.type; + const shouldMarkPoolReady = + incomingServerDescription.isDataBearing || + (incomingServerDescription.type !== ServerType.Unknown && + newTopologyType === TopologyType.Single); + if (shouldMarkPoolReady) { + server.s.pool.ready(); + } + } } } diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json index 57e71d8f3d..59d5fd302a 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkin-stats.json @@ -6,6 +6,9 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -66,6 +69,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionReady", "ConnectionClosed", "ConnectionCheckOutStarted" diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json index 3cc4229eb0..28715da2cb 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-checkout-stats.json @@ -6,6 +6,9 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -56,6 +59,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionReady", "ConnectionClosed", "ConnectionCheckOutStarted" diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json index cb619e2f75..890193706f 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json @@ -6,6 +6,9 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -54,6 +57,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionReady", "ConnectionClosed" ] diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json index 0238a09fad..7bb82221e1 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json @@ -6,6 +6,9 @@ "minPoolSize": 2 }, "operations": [ + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionReady", @@ -26,10 +29,13 @@ "name": "checkIn", "connection": "conn" }, + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionReady", - "count": 3 + "count": 4 } ], "events": [ @@ -54,11 +60,10 @@ "address": 42 }, { - "type": "ConnectionClosed", - "reason": "stale", + "type": "ConnectionReady", "address": 42, - "availableConnectionCount": 1, - "pendingConnectionCount": 0, + "availableConnectionCount": 0, + "pendingConnectionCount": 1, "totalConnectionCount": 1 }, { @@ -70,8 +75,10 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionPoolCreated", "ConnectionCreated", + "ConnectionClosed", "ConnectionCheckOutStarted" ] } diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index bff16bd245..8ee4487673 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -4,8 +4,14 @@ import { CmapTest, runCmapTestSuite, SkipDescription } from '../../tools/cmap_sp // These tests rely on a simple "pool.clear()" command, which is not sufficient // to properly clear the pool in LB mode, since it requires a serviceId to be passed in const LB_SKIP_TESTS: SkipDescription[] = [ + 'must replace removed connections up to minPoolSize', 'must destroy checked in connection if it is stale', - 'must destroy and must not check out a stale connection if found while iterating available connections' + 'must destroy and must not check out a stale connection if found while iterating available connections', + 'clearing pool clears the WaitQueue', + 'pool clear halts background minPoolSize establishments', + 'clearing a paused pool emits no events', + 'after clear, cannot check out connections until pool ready', + 'readying a ready pool emits no events' ].map(description => ({ description, skipIfCondition: 'loadBalanced', @@ -13,17 +19,11 @@ const LB_SKIP_TESTS: SkipDescription[] = [ })); const POOL_PAUSED_SKIP_TESTS: SkipDescription[] = [ - 'clearing pool clears the WaitQueue', - 'pool clear halts background minPoolSize establishments', - 'clearing a paused pool emits no events', - 'after clear, cannot check out connections until pool ready', - 'error during minPoolSize population clears pool', - 'readying a ready pool emits no events', - 'pool starts as cleared and becomes ready' + 'error during minPoolSize population clears pool' ].map(description => ({ description, skipIfCondition: 'always', - skipReason: 'TODO(NODE-2994): implement pool pausing' + skipReason: 'TODO(NODE-3135): make connection pool SDAM aware' })); describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts new file mode 100644 index 0000000000..8d41348dbf --- /dev/null +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts @@ -0,0 +1,104 @@ +import { expect } from 'chai'; +import { once } from 'events'; + +import { MongoClient } from '../../../src'; +import { + CONNECTION_POOL_CLEARED, + CONNECTION_POOL_READY, + SERVER_HEARTBEAT_FAILED, + SERVER_HEARTBEAT_SUCCEEDED +} from '../../../src/constants'; + +describe('Server Discovery and Monitoring Prose Tests', function () { + context('Connection Pool Management', function () { + /* + This test will be used to ensure monitors properly create and unpause connection pools when they discover servers. + This test requires failCommand appName support which is only available in MongoDB 4.2.9+. + 1. Create a client with directConnection=true, appName="SDAMPoolManagementTest", and heartbeatFrequencyMS=500 (or lower if possible). + 2. Verify via SDAM and CMAP event monitoring that a ConnectionPoolReadyEvent occurs after the first ServerHeartbeatSucceededEvent event does. + 3. Enable the following failpoint: + { + configureFailPoint: "failCommand", + mode: { times: 2 }, + data: { + failCommands: ["hello"], // or legacy hello command + errorCode: 1234, + appName: "SDAMPoolManagementTest" + } + } + 4. Verify that a ServerHeartbeatFailedEvent and a ConnectionPoolClearedEvent (CMAP) are emitted. + 5. Then verify that a ServerHeartbeatSucceededEvent and a ConnectionPoolReadyEvent (CMAP) are emitted. + 6. Disable the failpoint. + */ + + let client: MongoClient; + const events: string[] = []; + beforeEach(async function () { + client = this.configuration.newClient({ + directConnection: true, + appName: 'SDAMPoolManagementTest', + heartbeatFrequencyMS: 500 + }); + + for (const event of [ + CONNECTION_POOL_READY, + SERVER_HEARTBEAT_SUCCEEDED, + SERVER_HEARTBEAT_FAILED, + CONNECTION_POOL_CLEARED + ]) { + client.on(event, () => { + events.push(event); + }); + } + }); + + afterEach(async function () { + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'off', + data: { + failCommands: ['hello'], + errorCode: 1234, + appName: 'SDAMPoolManagementTest' + } + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('ensure monitors properly create and unpause connection pools when they discover servers', { + metadata: { requires: { mongodb: '>=4.2.9', topology: '!load-balanced' } }, + test: async function () { + await client.connect(); + expect(events.shift()).to.equal(SERVER_HEARTBEAT_SUCCEEDED); + expect(events.shift()).to.equal(CONNECTION_POOL_READY); + + expect(events).to.be.empty; + + const heartBeatFailedEvent = once(client, SERVER_HEARTBEAT_FAILED); + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + failCommands: ['hello'], + errorCode: 1234, + appName: 'SDAMPoolManagementTest' + } + }); + await heartBeatFailedEvent; + expect(events.shift()).to.equal(SERVER_HEARTBEAT_FAILED); + expect(events.shift()).to.equal(CONNECTION_POOL_CLEARED); + + expect(events).to.be.empty; + + await once(client, SERVER_HEARTBEAT_SUCCEEDED); + expect(events.shift()).to.equal(SERVER_HEARTBEAT_SUCCEEDED); + expect(events.shift()).to.equal(CONNECTION_POOL_READY); + + expect(events).to.be.empty; + } + }); + }); +}); diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index 84e3289e62..3a95f9493b 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -16,9 +16,12 @@ const filter: TestFilter = ({ description }) => { case 'Reset server and pool after network timeout error during authentication': case 'Reset server and pool after shutdown error during authentication': // These tests time out waiting for the PoolCleared event - return isAuthEnabled ? 'TODO(NODE-3891): fix tests broken when AUTH enabled' : false; - case 'Network error on minPoolSize background creation': - return 'TODO(NODE-4385): implement pool pausing and pool ready event'; + return isAuthEnabled + ? 'TODO(NODE-3135): handle auth errors, also see NODE-3891: fix tests broken when AUTH enabled' + : false; + case 'Network error on Monitor check': + case 'Network timeout on Monitor check': + return 'TODO(NODE-4608): Disallow parallel monitor checks'; default: return false; } @@ -26,6 +29,9 @@ const filter: TestFilter = ({ description }) => { describe('SDAM Unified Tests', function () { afterEach(async function () { + if (this.currentTest!.pending) { + return; + } // TODO(NODE-4573): fix socket leaks const LEAKY_TESTS = [ 'Command error on Monitor handshake', diff --git a/test/integration/unified-test-format/unified_test_format.spec.test.ts b/test/integration/unified-test-format/unified_test_format.spec.test.ts index 2258cbefea..0af0a0fe68 100644 --- a/test/integration/unified-test-format/unified_test_format.spec.test.ts +++ b/test/integration/unified-test-format/unified_test_format.spec.test.ts @@ -28,10 +28,7 @@ const filter: TestFilter = ({ description }) => { [ 'FindOneAndUpdate is committed on first attempt', 'FindOneAndUpdate is not committed on first attempt', - 'FindOneAndUpdate is never committed', - 'eventType defaults to command if unset', - 'events are captured during an operation', - 'eventType can be set to command and cmap' + 'FindOneAndUpdate is never committed' ].includes(description) ) { return 'TODO(NODE-3891): fix tests broken when AUTH enabled'; diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index e4202a43f2..54fc94cfd6 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -1,22 +1,25 @@ import { expect } from 'chai'; import { EventEmitter } from 'events'; +import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; import { Connection, HostAddress, MongoClient } from '../../src'; import { ConnectionPool, ConnectionPoolOptions } from '../../src/cmap/connection_pool'; -import { shuffle } from '../../src/utils'; +import { CMAP_EVENTS } from '../../src/constants'; +import { makeClientMetadata, shuffle } from '../../src/utils'; import { isAnyRequirementSatisfied } from './unified-spec-runner/unified-utils'; import { FailPoint, sleep } from './utils'; type CmapOperation = | { name: 'start' | 'waitForThread'; target: string } | { name: 'wait'; ms: number } - | { name: 'waitForEvent'; event: string; count: number } + | { name: 'waitForEvent'; event: string; count: number; timeout?: number } | { name: 'checkOut'; thread: string; label: string } | { name: 'checkIn'; connection: string } | { name: 'clear' | 'close' | 'ready' }; const CMAP_POOL_OPTION_NAMES: Array = [ + 'appName', 'backgroundThreadIntervalMS', 'maxPoolSize', 'minPoolSize', @@ -26,6 +29,7 @@ const CMAP_POOL_OPTION_NAMES: Array = [ ]; type CmapPoolOptions = { + appName?: string; backgroundThreadIntervalMS?: number; maxPoolSize?: number; minPoolSize?: number; @@ -77,18 +81,7 @@ export type CmapTest = { failPoint?: FailPoint; }; -const ALL_POOL_EVENTS = new Set([ - ConnectionPool.CONNECTION_POOL_CREATED, - ConnectionPool.CONNECTION_POOL_CLOSED, - ConnectionPool.CONNECTION_POOL_CLEARED, - ConnectionPool.CONNECTION_CREATED, - ConnectionPool.CONNECTION_READY, - ConnectionPool.CONNECTION_CLOSED, - ConnectionPool.CONNECTION_CHECK_OUT_STARTED, - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - ConnectionPool.CONNECTION_CHECKED_OUT, - ConnectionPool.CONNECTION_CHECKED_IN -]); +const ALL_POOL_EVENTS = new Set(CMAP_EVENTS); function getEventType(event) { const eventName = event.constructor.name; @@ -210,8 +203,7 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ return await promisify(ConnectionPool.prototype.close).call(threadContext.pool); }, ready: function () { - // This is a no-op until pool pausing is implemented - return; + return threadContext.pool.ready(); }, wait: async function (options) { const ms = options.ms; @@ -237,9 +229,15 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ waitForEvent: function (options): Promise { const event = options.event; const count = options.count; - return new Promise(resolve => { + const timeout = options.timeout ?? 15000; + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error(`Timed out while waiting for event ${event}`)); + }, timeout); + function run() { if (threadContext.poolEvents.filter(ev => getEventType(ev) === event).length >= count) { + clearTimeout(timeoutId); return resolve(); } @@ -351,12 +349,15 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { delete poolOptions.backgroundThreadIntervalMS; } + let metadata; + if (poolOptions.appName) { + metadata = makeClientMetadata({ appName: poolOptions.appName }); + delete poolOptions.appName; + } + const operations = test.operations; const expectedError = test.error; - const expectedEvents = test.events - ? // TODO(NODE-2994): remove filter once ready is implemented - test.events.filter(event => event.type !== 'ConnectionPoolReady') - : []; + const expectedEvents = test.events; const ignoreEvents = test.ignore || []; let actualError; @@ -365,7 +366,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { const mainThread = threadContext.getThread(MAIN_THREAD_KEY); mainThread.start(); - threadContext.createPool(poolOptions); + threadContext.createPool({ ...poolOptions, metadata }); // yield control back to the event loop so that the ConnectionPoolCreatedEvent // has a chance to be fired before any synchronously-emitted events from // the queued operations diff --git a/test/tools/spec-runner/index.js b/test/tools/spec-runner/index.js index 4deb60d99d..01eb8fa6e8 100644 --- a/test/tools/spec-runner/index.js +++ b/test/tools/spec-runner/index.js @@ -8,7 +8,12 @@ const { EJSON } = require('bson'); const { isRecord } = require('../../../src/utils'); const TestRunnerContext = require('./context').TestRunnerContext; const resolveConnectionString = require('./utils').resolveConnectionString; -const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); +const { + LEGACY_HELLO_COMMAND, + CMAP_EVENTS: SOURCE_CMAP_EVENTS, + TOPOLOGY_EVENTS, + HEARTBEAT_EVENTS +} = require('../../../src/constants'); const { isAnyRequirementSatisfied } = require('../unified-spec-runner/unified-utils'); const ClientSideEncryptionFilter = require('../runner/filters/client_encryption_filter'); @@ -164,17 +169,6 @@ function generateTopologyTests(testSuites, testContext, filter) { const { spec } = this.currentTest; - if ( - shouldRun && - spec.operations.some( - op => op.name === 'waitForEvent' && op.arguments.event === 'PoolReadyEvent' - ) - ) { - this.currentTest.skipReason = - 'TODO(NODE-2994): Connection storms work will add new events to connection pool'; - shouldRun = false; - } - if (shouldRun && spec.skipReason) { this.currentTest.skipReason = spec.skipReason; shouldRun = false; @@ -335,29 +329,11 @@ function parseSessionOptions(options) { const IGNORED_COMMANDS = new Set([LEGACY_HELLO_COMMAND, 'configureFailPoint', 'endSessions']); const SDAM_EVENTS = new Set([ - 'serverOpening', - 'serverClosed', - 'serverDescriptionChanged', - 'topologyOpening', - 'topologyClosed', - 'topologyDescriptionChanged', - 'serverHeartbeatStarted', - 'serverHeartbeatSucceeded', - 'serverHeartbeatFailed' + ...TOPOLOGY_EVENTS.filter(ev => !['error', 'timeout', 'close'].includes(ev)), + ...HEARTBEAT_EVENTS ]); -const CMAP_EVENTS = new Set([ - 'connectionPoolCreated', - 'connectionPoolClosed', - 'connectionCreated', - 'connectionReady', - 'connectionClosed', - 'connectionCheckOutStarted', - 'connectionCheckOutFailed', - 'connectionCheckedOut', - 'connectionCheckedIn', - 'connectionPoolCleared' -]); +const CMAP_EVENTS = new Set(SOURCE_CMAP_EVENTS); function runTestSuiteTest(configuration, spec, context) { context.commandEvents = []; diff --git a/test/tools/unified-spec-runner/entities.ts b/test/tools/unified-spec-runner/entities.ts index 8173a8cd4e..95a93f2cbe 100644 --- a/test/tools/unified-spec-runner/entities.ts +++ b/test/tools/unified-spec-runner/entities.ts @@ -18,6 +18,7 @@ import { ConnectionPoolClearedEvent, ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from '../../../src/cmap/connection_pool_events'; import { @@ -77,6 +78,7 @@ export class UnifiedThread { this.#killed = true; await this.#promise; if (this.#error) { + this.#error.message = `: ${this.#error.message}`; throw this.#error; } } @@ -86,6 +88,7 @@ export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | Command export type CmapEvent = | ConnectionPoolCreatedEvent | ConnectionPoolClosedEvent + | ConnectionPoolReadyEvent | ConnectionCreatedEvent | ConnectionReadyEvent | ConnectionClosedEvent @@ -110,6 +113,7 @@ export class UnifiedMongoClient extends MongoClient { observedCmapEvents: ( | 'connectionPoolCreated' | 'connectionPoolClosed' + | 'connectionPoolReady' | 'connectionPoolCleared' | 'connectionCreated' | 'connectionReady' @@ -132,6 +136,7 @@ export class UnifiedMongoClient extends MongoClient { static CMAP_EVENT_NAME_LOOKUP = { poolCreatedEvent: 'connectionPoolCreated', poolClosedEvent: 'connectionPoolClosed', + poolReadyEvent: 'connectionPoolReady', poolClearedEvent: 'connectionPoolCleared', connectionCreatedEvent: 'connectionCreated', connectionReadyEvent: 'connectionReady', diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index 123c57f50a..93791f5335 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -28,6 +28,7 @@ import { ConnectionPoolClearedEvent, ConnectionPoolClosedEvent, ConnectionPoolCreatedEvent, + ConnectionPoolReadyEvent, ConnectionReadyEvent } from '../../../src/cmap/connection_pool_events'; import { ejson } from '../utils'; @@ -331,6 +332,7 @@ export function specialCheck( // CMAP events where the payload does not matter. const EMPTY_CMAP_EVENTS = { poolCreatedEvent: ConnectionPoolCreatedEvent, + poolReadyEvent: ConnectionPoolReadyEvent, poolClosedEvent: ConnectionPoolClosedEvent, connectionCreatedEvent: ConnectionCreatedEvent, connectionReadyEvent: ConnectionReadyEvent, diff --git a/test/tools/unified-spec-runner/schema.ts b/test/tools/unified-spec-runner/schema.ts index 05562c0ba1..94b5e2f4f5 100644 --- a/test/tools/unified-spec-runner/schema.ts +++ b/test/tools/unified-spec-runner/schema.ts @@ -114,6 +114,7 @@ export type ObservableCommandEventId = export type ObservableCmapEventId = | 'connectionPoolCreatedEvent' | 'connectionPoolClosedEvent' + | 'connectionPoolReadyEvent' | 'connectionPoolClearedEvent' | 'connectionCreatedEvent' | 'connectionReadyEvent' diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index fdee3bde66..3f2b5176d7 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -27,6 +27,7 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ maxPoolSize: 1, hostAddress: server.hostAddress() }); + pool.ready(); const events = []; pool.on('connectionClosed', event => events.push(event)); @@ -74,6 +75,8 @@ describe('Connection Pool', function () { hostAddress: server.hostAddress() }); + pool.ready(); + pool.withConnection( (err, conn, cb) => { expect(err).to.not.exist; @@ -102,6 +105,8 @@ describe('Connection Pool', function () { hostAddress: server.hostAddress() }); + pool.ready(); + pool.checkOut((err, conn) => { expect(err).to.not.exist; expect(conn).to.exist; @@ -133,6 +138,8 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ hostAddress: server.hostAddress() }); + pool.ready(); + const callback = (err, result) => { expect(err).to.not.exist; expect(result).to.exist; @@ -167,6 +174,8 @@ describe('Connection Pool', function () { hostAddress: server.hostAddress() }); + pool.ready(); + const callback = err => { expect(err).to.exist; expect(err).to.match(/closed/); @@ -193,6 +202,8 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ hostAddress: server.hostAddress() }); + pool.ready(); + const callback = (err, result) => { expect(err).to.exist; expect(result).to.not.exist; @@ -219,6 +230,7 @@ describe('Connection Pool', function () { }); const pool = new ConnectionPool({ maxPoolSize: 1, hostAddress: server.hostAddress() }); + pool.ready(); const events = []; pool.on('connectionCheckedOut', event => events.push(event)); diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index 8f5d10fb14..3ae43e35ea 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -90,10 +90,12 @@ describe('MongoErrors', () => { it('should accept an Error object', () => { const errorMessage = 'A test error'; - const err = new MongoError(new Error(errorMessage)); + const inputError = new Error(errorMessage); + const err = new MongoError(inputError); expect(err).to.be.an.instanceof(Error); expect(err.name).to.equal('MongoError'); expect(err.message).to.equal(errorMessage); + expect(err).to.have.property('cause', inputError); }); });