From c554a7a0d132437078a4c9d5e9ed828cce982455 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 3 Aug 2021 22:12:50 +0200 Subject: [PATCH] feat(NODE-3011): Load Balancer Support (#2909) --- .evergreen/config.yml | 53 +++++ .evergreen/config.yml.in | 41 ++++ .evergreen/generate_evergreen_tasks.js | 29 ++- .evergreen/run-tests.sh | 2 +- package.json | 1 + src/cmap/command_monitoring_events.ts | 40 +++- src/cmap/connect.ts | 25 +- src/cmap/connection.ts | 74 +++--- src/cmap/connection_pool.ts | 153 ++++++++++-- src/cmap/connection_pool_events.ts | 9 +- src/cmap/errors.ts | 7 +- src/cmap/metrics.ts | 58 +++++ src/cmap/stream_description.ts | 5 + src/connection_string.ts | 56 ++++- src/cursor/abstract_cursor.ts | 136 ++++++----- src/error.ts | 16 +- src/index.ts | 4 +- src/mongo_client.ts | 3 + src/operations/aggregate.ts | 7 +- src/operations/estimated_document_count.ts | 6 +- src/operations/execute_operation.ts | 41 +++- src/operations/find.ts | 7 +- src/operations/find_one.ts | 2 +- src/operations/indexes.ts | 6 +- src/operations/list_collections.ts | 6 +- src/operations/operation.ts | 8 +- src/sdam/common.ts | 6 +- src/sdam/server.ts | 219 +++++++++++++----- src/sdam/server_description.ts | 20 +- src/sdam/server_selection.ts | 8 + src/sdam/topology.ts | 74 ++++-- src/sdam/topology_description.ts | 8 +- src/sessions.ts | 142 +++++++++++- src/transactions.ts | 21 +- src/utils.ts | 23 ++ test/functional/retryable_writes.test.js | 8 +- test/functional/spec-runner/context.js | 9 +- test/functional/spec-runner/index.js | 6 +- test/functional/transactions.test.js | 4 +- .../unified-spec-runner/entities.ts | 14 +- .../unified-spec-runner/operations.ts | 20 +- test/functional/unified-spec-runner/runner.ts | 41 +++- test/functional/unified-spec-runner/schema.ts | 1 + .../unified-spec-runner/unified-utils.ts | 5 +- test/manual/load-balancer.test.js | 32 +++ test/tools/runner/config.js | 26 ++- .../runner/filters/mongodb_topology_filter.js | 2 + 
test/unit/cmap/connection.test.js | 48 +++- test/unit/cmap/connection_pool.test.js | 29 ++- test/unit/cmap/metrics.test.js | 134 +++++++++++ test/unit/cmap/stream_description.test.js | 67 ++++++ test/unit/core/mongodb_srv.test.js | 7 +- test/unit/core/response_test.js.test.js | 2 + test/unit/mongo_client_options.test.js | 44 ++++ test/unit/sdam/server_selection/spec.test.js | 24 +- test/unit/sdam/spec.test.js | 20 +- 56 files changed, 1514 insertions(+), 345 deletions(-) create mode 100644 src/cmap/metrics.ts create mode 100644 test/manual/load-balancer.test.js create mode 100644 test/unit/cmap/metrics.test.js create mode 100644 test/unit/cmap/stream_description.test.js diff --git a/.evergreen/config.yml b/.evergreen/config.yml index ac1e404bac..cf47a678b5 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -134,6 +134,44 @@ functions: MONGODB_API_VERSION="${MONGODB_API_VERSION}" \ NODE_VERSION=${NODE_VERSION} SKIP_DEPS=${SKIP_DEPS|1} NO_EXIT=${NO_EXIT|1} \ bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh + start-load-balancer: + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start + - command: expansions.update + params: + file: lb-expansion.yml + stop-load-balancer: + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop + run-lb-tests: + - command: shell.exec + type: test + params: + working_dir: src + timeout_secs: 60 + script: | + ${PREPARE_SHELL} + + MONGODB_URI="${MONGODB_URI}" \ + AUTH=${AUTH} \ + SSL=${SSL} \ + UNIFIED=${UNIFIED} \ + MONGODB_API_VERSION="${MONGODB_API_VERSION}" \ + NODE_VERSION=${NODE_VERSION} \ + SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}" \ + MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}" \ + TOPOLOGY="${TOPOLOGY}" \ + SKIP_DEPS=${SKIP_DEPS|1} \ + NO_EXIT=${NO_EXIT|1} \ + 
TEST_NPM_SCRIPT="check:load-balancer" \ + FAKE_MONGODB_SERVICE_ID="true" \ + bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh run checks: - command: shell.exec type: test @@ -937,6 +975,20 @@ tasks: - func: install dependencies - func: bootstrap mongohoused - func: run data lake tests + - name: test-load-balancer + tags: + - latest + - sharded_cluster + - load_balancer + commands: + - func: install dependencies + - func: bootstrap mongo-orchestration + vars: + VERSION: '5.0' + TOPOLOGY: sharded_cluster + - func: start-load-balancer + - func: run-lb-tests + - func: stop-load-balancer - name: test-auth-kerberos tags: - auth @@ -1760,6 +1812,7 @@ buildvariants: - test-latest-server-v1-api - test-atlas-connectivity - test-atlas-data-lake + - test-load-balancer - test-auth-kerberos - test-auth-ldap - test-ocsp-valid-cert-server-staples diff --git a/.evergreen/config.yml.in b/.evergreen/config.yml.in index e9fa0553ef..dbb05f1cc3 100644 --- a/.evergreen/config.yml.in +++ b/.evergreen/config.yml.in @@ -155,6 +155,47 @@ functions: NODE_VERSION=${NODE_VERSION} SKIP_DEPS=${SKIP_DEPS|1} NO_EXIT=${NO_EXIT|1} \ bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh + "start-load-balancer": + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start + - command: expansions.update + params: + file: lb-expansion.yml + + "stop-load-balancer": + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop + + "run-lb-tests": + - command: shell.exec + type: test + params: + working_dir: src + timeout_secs: 60 + script: | + ${PREPARE_SHELL} + + MONGODB_URI="${MONGODB_URI}" \ + AUTH=${AUTH} \ + SSL=${SSL} \ + UNIFIED=${UNIFIED} \ + MONGODB_API_VERSION="${MONGODB_API_VERSION}" \ + NODE_VERSION=${NODE_VERSION} \ + SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}" \ + 
MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}" \ + TOPOLOGY="${TOPOLOGY}" \ + SKIP_DEPS=${SKIP_DEPS|1} \ + NO_EXIT=${NO_EXIT|1} \ + TEST_NPM_SCRIPT="check:load-balancer" \ + FAKE_MONGODB_SERVICE_ID="true" \ + bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh + "run checks": - command: shell.exec type: test diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index 4a48f67161..45b877e3c4 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -42,7 +42,8 @@ const OPERATING_SYSTEMS = [ })); // TODO: NODE-3060: enable skipped tests on windows -const WINDOWS_SKIP_TAGS = new Set(['atlas-connect', 'auth']); +const WINDOWS_SKIP_TAGS = new Set(['atlas-connect', 'auth', 'load_balancer']); +const MACOS_SKIP_TAGS = new Set(['load_balancer']); const TASKS = []; const SINGLETON_TASKS = []; @@ -107,6 +108,23 @@ TASKS.push(...[ { func: 'run data lake tests' } ] }, + { + name: 'test-load-balancer', + tags: ['latest', 'sharded_cluster', 'load_balancer'], + commands: [ + { func: 'install dependencies' }, + { + func: 'bootstrap mongo-orchestration', + vars: { + VERSION: '5.0', + TOPOLOGY: 'sharded_cluster' + } + }, + { func: 'start-load-balancer' }, + { func: 'run-lb-tests' }, + { func: 'stop-load-balancer' } + ] + }, { name: 'test-auth-kerberos', tags: ['auth', 'kerberos'], @@ -429,11 +447,12 @@ const getTaskList = (() => { .filter(task => { if (task.name.match(/^aws/)) return false; - // skip unsupported tasks on windows + // skip unsupported tasks on windows or macos if ( - os.match(/^windows/) && - task.tags && - task.tags.filter(tag => WINDOWS_SKIP_TAGS.has(tag)).length + task.tags && ( + (os.match(/^windows/) && task.tags.filter(tag => WINDOWS_SKIP_TAGS.has(tag)).length) || + (os.match(/^macos/) && task.tags.filter(tag => MACOS_SKIP_TAGS.has(tag)).length) + ) ) { return false; } diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 2f2739f7b4..8a4fef1271 100755 --- 
a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -49,4 +49,4 @@ else . $DRIVERS_TOOLS/.evergreen/csfle/set-temp-creds.sh fi -MONGODB_API_VERSION=${MONGODB_API_VERSION} MONGODB_UNIFIED_TOPOLOGY=${UNIFIED} MONGODB_URI=${MONGODB_URI} npm run ${TEST_NPM_SCRIPT} +SINGLE_MONGOS_LB_URI=${SINGLE_MONGOS_LB_URI} MULTI_MONGOS_LB_URI=${MULTI_MONGOS_LB_URI} MONGODB_API_VERSION=${MONGODB_API_VERSION} MONGODB_UNIFIED_TOPOLOGY=${UNIFIED} MONGODB_URI=${MONGODB_URI} npm run ${TEST_NPM_SCRIPT} diff --git a/package.json b/package.json index 8f8b19e87d..4dada2d69c 100644 --- a/package.json +++ b/package.json @@ -121,6 +121,7 @@ "check:ts": "tsc -v && tsc --noEmit", "check:atlas": "mocha --config \"test/manual/mocharc.json\" test/manual/atlas_connectivity.test.js", "check:adl": "mocha test/manual/data_lake.test.js", + "check:load-balancer": "mocha test/manual/load-balancer.test.js", "check:ocsp": "mocha --config \"test/manual/mocharc.json\" test/manual/ocsp_support.test.js", "check:kerberos": "mocha --config \"test/manual/mocharc.json\" test/manual/kerberos.test.js", "check:tls": "mocha --config \"test/manual/mocharc.json\" test/manual/tls_support.test.js", diff --git a/src/cmap/command_monitoring_events.ts b/src/cmap/command_monitoring_events.ts index 35e138ca29..2793c16dac 100644 --- a/src/cmap/command_monitoring_events.ts +++ b/src/cmap/command_monitoring_events.ts @@ -1,8 +1,7 @@ import { GetMore, KillCursor, Msg, WriteProtocolMessageType } from './commands'; import { calculateDurationInMs, deepCopy } from '../utils'; -import type { ConnectionPool } from './connection_pool'; import type { Connection } from './connection'; -import type { Document } from '../bson'; +import type { Document, ObjectId } from '../bson'; /** * An event indicating the start of a given @@ -17,6 +16,7 @@ export class CommandStartedEvent { command: Document; address: string; connectionId?: string | number; + serviceId?: ObjectId; /** * Create a started event @@ -25,10 +25,10 @@ export class 
CommandStartedEvent { * @param pool - the pool that originated the command * @param command - the command */ - constructor(pool: Connection | ConnectionPool, command: WriteProtocolMessageType) { + constructor(connection: Connection, command: WriteProtocolMessageType) { const cmd = extractCommand(command); const commandName = extractCommandName(cmd); - const { address, connectionId } = extractConnectionDetails(pool); + const { address, connectionId, serviceId } = extractConnectionDetails(connection); // TODO: remove in major revision, this is not spec behavior if (SENSITIVE_COMMANDS.has(commandName)) { @@ -38,11 +38,17 @@ export class CommandStartedEvent { this.address = address; this.connectionId = connectionId; + this.serviceId = serviceId; this.requestId = command.requestId; this.databaseName = databaseName(command); this.commandName = commandName; this.command = maybeRedact(commandName, cmd, cmd); } + + /* @internal */ + get hasServiceId(): boolean { + return !!this.serviceId; + } } /** @@ -57,6 +63,7 @@ export class CommandSucceededEvent { duration: number; commandName: string; reply: unknown; + serviceId?: ObjectId; /** * Create a succeeded event @@ -68,22 +75,28 @@ export class CommandSucceededEvent { * @param started - a high resolution tuple timestamp of when the command was first sent, to calculate duration */ constructor( - pool: Connection | ConnectionPool, + connection: Connection, command: WriteProtocolMessageType, reply: Document | undefined, started: number ) { const cmd = extractCommand(command); const commandName = extractCommandName(cmd); - const { address, connectionId } = extractConnectionDetails(pool); + const { address, connectionId, serviceId } = extractConnectionDetails(connection); this.address = address; this.connectionId = connectionId; + this.serviceId = serviceId; this.requestId = command.requestId; this.commandName = commandName; this.duration = calculateDurationInMs(started); this.reply = maybeRedact(commandName, cmd, 
extractReply(command, reply)); } + + /* @internal */ + get hasServiceId(): boolean { + return !!this.serviceId; + } } /** @@ -98,6 +111,8 @@ export class CommandFailedEvent { duration: number; commandName: string; failure: Error; + serviceId?: ObjectId; + /** * Create a failure event * @@ -108,23 +123,29 @@ export class CommandFailedEvent { * @param started - a high resolution tuple timestamp of when the command was first sent, to calculate duration */ constructor( - pool: Connection | ConnectionPool, + connection: Connection, command: WriteProtocolMessageType, error: Error | Document, started: number ) { const cmd = extractCommand(command); const commandName = extractCommandName(cmd); - const { address, connectionId } = extractConnectionDetails(pool); + const { address, connectionId, serviceId } = extractConnectionDetails(connection); this.address = address; this.connectionId = connectionId; + this.serviceId = serviceId; this.requestId = command.requestId; this.commandName = commandName; this.duration = calculateDurationInMs(started); this.failure = maybeRedact(commandName, cmd, error) as Error; } + + /* @internal */ + get hasServiceId(): boolean { + return !!this.serviceId; + } } /** Commands that we want to redact because of the sensitive nature of their contents */ @@ -300,13 +321,14 @@ function extractReply(command: WriteProtocolMessageType, reply?: Document) { return deepCopy(reply.result ? 
reply.result : reply); } -function extractConnectionDetails(connection: Connection | ConnectionPool) { +function extractConnectionDetails(connection: Connection) { let connectionId; if ('id' in connection) { connectionId = connection.id; } return { address: connection.address, + serviceId: connection.serviceId, connectionId }; } diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index e9b2c21f6b..3cb4ea73f9 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -20,10 +20,14 @@ import { MIN_SUPPORTED_SERVER_VERSION } from './wire_protocol/constants'; import type { Document } from '../bson'; +import { Int32 } from '../bson'; import type { Socket, SocketConnectOpts } from 'net'; import type { TLSSocket, ConnectionOptions as TLSConnectionOpts } from 'tls'; -import { Int32 } from '../bson'; + +const FAKE_MONGODB_SERVICE_ID = + typeof process.env.FAKE_MONGODB_SERVICE_ID === 'string' && + process.env.FAKE_MONGODB_SERVICE_ID.toLowerCase() === 'true'; /** @public */ export type Stream = Socket | TLSSocket; @@ -133,6 +137,21 @@ function performInitialHandshake( return; } + if (options.loadBalanced) { + // TODO: Durran: Remove when server support exists. (NODE-3431) + if (FAKE_MONGODB_SERVICE_ID) { + response.serviceId = response.topologyVersion.processId; + } + if (!response.serviceId) { + return callback( + new MongoDriverError( + 'Driver attempted to initialize in load balancing mode, ' + + 'but the server does not support this mode.' + ) + ); + } + } + // NOTE: This is metadata attached to the connection while porting away from // handshake being done in the `Server` class. Likely, it should be // relocated, or at very least restructured. 
@@ -172,6 +191,7 @@ export interface HandshakeDocument extends Document { client: ClientMetadata; compression: string[]; saslSupportedMechs?: string; + loadBalanced: boolean; } function prepareHandshakeDocument(authContext: AuthContext, callback: Callback) { @@ -183,7 +203,8 @@ function prepareHandshakeDocument(authContext: AuthContext, callback: Callback { static readonly CLOSE = 'close' as const; /** @event */ static readonly MESSAGE = 'message' as const; + /** @event */ + static readonly PINNED = 'pinned' as const; + /** @event */ + static readonly UNPINNED = 'unpinned' as const; constructor(stream: Stream, options: ConnectionOptions) { super(); @@ -249,10 +252,22 @@ export class Connection extends TypedEventEmitter { this[kIsMaster] = response; } + get serviceId(): ObjectId | undefined { + return this.ismaster?.serviceId; + } + + get loadBalanced(): boolean { + return this.description.loadBalanced; + } + get generation(): number { return this[kGeneration] || 0; } + set generation(generation: number) { + this[kGeneration] = generation; + } + get idleTime(): number { return calculateDurationInMs(this[kLastUseTime]); } @@ -308,6 +323,9 @@ export class Connection extends TypedEventEmitter { options = { force: false }; } + this.removeAllListeners(Connection.PINNED); + this.removeAllListeners(Connection.UNPINNED); + options = Object.assign({ force: false }, options); if (this[kStream] == null || this.destroyed) { this.destroyed = true; @@ -354,7 +372,6 @@ export class Connection extends TypedEventEmitter { let clusterTime = this.clusterTime; let finalCmd = Object.assign({}, cmd); - const inTransaction = session && (session.inTransaction() || isTransactionCommand(finalCmd)); if (this.serverApi) { const { version, strict, deprecationErrors } = this.serverApi; @@ -372,18 +389,6 @@ export class Connection extends TypedEventEmitter { clusterTime = session.clusterTime; } - // We need to unpin any read or write commands that happen outside of a pinned - // transaction, so 
we check if we have a pinned transaction that is no longer - // active, and unpin for all except start or commit. - if ( - !session.transaction.isActive && - session.transaction.isPinned && - !finalCmd.startTransaction && - !finalCmd.commitTransaction - ) { - session.transaction.unpinServer(); - } - const err = applySession(session, finalCmd, options as CommandOptions); if (err) { return callback(err); @@ -419,35 +424,10 @@ export class Connection extends TypedEventEmitter { ? new Msg(cmdNs, finalCmd, commandOptions) : new Query(cmdNs, finalCmd, commandOptions); - const commandResponseHandler = inTransaction - ? (err?: AnyError, ...args: Document[]) => { - // We need to add a TransientTransactionError errorLabel, as stated in the transaction spec. - if ( - err && - err instanceof MongoNetworkError && - !err.hasErrorLabel('TransientTransactionError') - ) { - err.addErrorLabel('TransientTransactionError'); - } - - if ( - session && - !cmd.commitTransaction && - err && - err instanceof MongoError && - err.hasErrorLabel('TransientTransactionError') - ) { - session.transaction.unpinServer(); - } - - return callback(err, ...args); - } - : callback; - try { - write(this, message, commandOptions, commandResponseHandler); + write(this, message, commandOptions, callback); } catch (err) { - commandResponseHandler(err); + callback(err); } } @@ -687,8 +667,10 @@ export class CryptoConnection extends Connection { } } -function hasSessionSupport(conn: Connection) { - return conn.description.logicalSessionTimeoutMinutes != null; +/** @internal */ +export function hasSessionSupport(conn: Connection): boolean { + const description = conn.description; + return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced; } function supportsOpMsg(conn: Connection) { diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 3961f479bc..d63c10d8b9 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -1,6 +1,8 @@ import Denque 
= require('denque'); -import { Logger } from '../logger'; import { APM_EVENTS, Connection, ConnectionEvents, ConnectionOptions } from './connection'; +import type { ObjectId } from 'bson'; +import { Logger } from '../logger'; +import { ConnectionPoolMetrics } from './metrics'; import { connect } from './connect'; import { eachAsync, makeCounter, Callback } from '../utils'; import { MongoDriverError, MongoError, MongoInvalidArgumentError } from '../error'; @@ -30,6 +32,8 @@ const kMinPoolSizeTimer = Symbol('minPoolSizeTimer'); /** @internal */ const kGeneration = Symbol('generation'); /** @internal */ +const kServiceGenerations = Symbol('serviceGenerations'); +/** @internal */ const kConnectionCounter = Symbol('connectionCounter'); /** @internal */ const kCancellationToken = Symbol('cancellationToken'); @@ -37,6 +41,12 @@ const kCancellationToken = Symbol('cancellationToken'); const kWaitQueue = Symbol('waitQueue'); /** @internal */ const kCancelled = Symbol('cancelled'); +/** @internal */ +const kMetrics = Symbol('metrics'); +/** @internal */ +const kCheckedOut = Symbol('checkedOut'); +/** @internal */ +const kProcessingWaitQueue = Symbol('processingWaitQueue'); /** @public */ export interface ConnectionPoolOptions extends Omit { @@ -48,6 +58,8 @@ export interface ConnectionPoolOptions extends Omit { * @internal */ [kGeneration]: number; + /** A map of generations to service ids + * @internal + */ + [kServiceGenerations]: Map; /** @internal */ [kConnectionCounter]: Generator; /** @internal */ [kCancellationToken]: CancellationToken; /** @internal */ [kWaitQueue]: Denque; + /** @internal */ + [kMetrics]: ConnectionPoolMetrics; + /** @internal */ + [kCheckedOut]: number; + /** @internal */ + [kProcessingWaitQueue]: boolean; /** * Emitted when the connection pool is created. 
@@ -184,10 +206,14 @@ export class ConnectionPool extends TypedEventEmitter { this[kPermits] = this.options.maxPoolSize; this[kMinPoolSizeTimer] = undefined; this[kGeneration] = 0; + this[kServiceGenerations] = new Map(); this[kConnectionCounter] = makeCounter(1); this[kCancellationToken] = new CancellationToken(); this[kCancellationToken].setMaxListeners(Infinity); this[kWaitQueue] = new Denque(); + this[kMetrics] = new ConnectionPoolMetrics(); + this[kCheckedOut] = 0; + this[kProcessingWaitQueue] = false; process.nextTick(() => { this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this)); @@ -219,6 +245,25 @@ export class ConnectionPool extends TypedEventEmitter { return this[kWaitQueue].length; } + get loadBalanced(): boolean { + return this.options.loadBalanced; + } + + get serviceGenerations(): Map { + return this[kServiceGenerations]; + } + + get currentCheckedOutCount(): number { + return this[kCheckedOut]; + } + + /** + * Get the metrics information for the pool when a wait queue timeout occurs. + */ + private waitQueueErrorMetrics(): string { + return this[kMetrics].info(this.options.maxPoolSize); + } + /** * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or @@ -250,10 +295,18 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, 'timeout') ); - waitQueueMember.callback(new WaitQueueTimeoutError(this)); + waitQueueMember.callback( + new WaitQueueTimeoutError( + this.loadBalanced + ? 
this.waitQueueErrorMetrics() + : 'Timed out while checking out a connection from connection pool', + this.address + ) + ); }, waitQueueTimeoutMS); } + this[kCheckedOut] = this[kCheckedOut] + 1; this[kWaitQueue].push(waitQueueMember); process.nextTick(processWaitQueue, this); } @@ -270,9 +323,10 @@ export class ConnectionPool extends TypedEventEmitter { if (!willDestroy) { connection.markAvailable(); - this[kConnections].push(connection); + this[kConnections].unshift(connection); } + this[kCheckedOut] = this[kCheckedOut] - 1; this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection)); if (willDestroy) { @@ -289,9 +343,23 @@ export class ConnectionPool extends TypedEventEmitter { * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a * previous generation will eventually be pruned during subsequent checkouts. */ - clear(): void { - this[kGeneration] += 1; - this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this)); + clear(serviceId?: ObjectId): void { + if (this.loadBalanced && serviceId) { + const sid = serviceId.toHexString(); + const generation = this.serviceGenerations.get(sid); + // Only need to worry if the generation exists, since it should + // always be there but typescript needs the check. + if (generation == null) { + throw new MongoDriverError('Service generations are required in load balancer mode.'); + } else { + // Increment the generation for the service id. 
+ this.serviceGenerations.set(sid, generation + 1); + } + } else { + this[kGeneration] += 1; + } + + this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this, serviceId)); } /** Close the pool */ @@ -338,7 +406,6 @@ export class ConnectionPool extends TypedEventEmitter { // mark the pool as closed immediately this.closed = true; - eachAsync( this[kConnections].toArray(), (conn, cb) => { @@ -362,10 +429,34 @@ export class ConnectionPool extends TypedEventEmitter { * * NOTE: please note the required signature of `fn` * + * @remarks When in load balancer mode, connections can be pinned to cursors or transactions. + * In these cases we pass the connection in to this method to ensure it is used and a new + * connection is not checked out. + * + * @param conn - A pinned connection for use in load balancing mode. * @param fn - A function which operates on a managed connection * @param callback - The original callback */ - withConnection(fn: WithConnectionCallback, callback?: Callback): void { + withConnection( + conn: Connection | undefined, + fn: WithConnectionCallback, + callback?: Callback + ): void { + if (conn) { + // use the provided connection, and do _not_ check it in after execution + fn(undefined, conn, (fnErr, result) => { + if (typeof callback === 'function') { + if (fnErr) { + callback(fnErr); + } else { + callback(undefined, result); + } + } + }); + + return; + } + this.checkOut((err, conn) => { // don't callback with `err` here, we might want to act upon it inside `fn` fn(err as MongoError, conn, (fnErr, result) => { @@ -399,6 +490,13 @@ function ensureMinPoolSize(pool: ConnectionPool) { } function connectionIsStale(pool: ConnectionPool, connection: Connection) { + const serviceId = connection.serviceId; + if (pool.loadBalanced && serviceId) { + const sid = serviceId.toHexString(); + const generation = pool.serviceGenerations.get(sid); + return connection.generation !== generation; + } + return connection.generation !== pool[kGeneration]; } @@ 
-437,7 +535,24 @@ function createConnection(pool: ConnectionPool, callback?: Callback) connection.on(event, (e: any) => pool.emit(event, e)); } - pool.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionCreatedEvent(pool, connection)); + pool.emit(ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(pool, connection)); + + if (pool.loadBalanced) { + connection.on(Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType)); + connection.on(Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType)); + + const serviceId = connection.serviceId; + if (serviceId) { + let generation; + const sid = serviceId.toHexString(); + if ((generation = pool.serviceGenerations.get(sid))) { + connection.generation = generation; + } else { + pool.serviceGenerations.set(sid, 0); + connection.generation = 0; + } + } + } connection.markAvailable(); pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection)); @@ -465,10 +580,11 @@ function destroyConnection(pool: ConnectionPool, connection: Connection, reason: } function processWaitQueue(pool: ConnectionPool) { - if (pool.closed) { + if (pool.closed || pool[kProcessingWaitQueue]) { return; } + pool[kProcessingWaitQueue] = true; while (pool.waitQueueSize) { const waitQueueMember = pool[kWaitQueue].peekFront(); if (!waitQueueMember) { @@ -502,11 +618,11 @@ function processWaitQueue(pool: ConnectionPool) { } pool[kWaitQueue].shift(); - return waitQueueMember.callback(undefined, connection); + waitQueueMember.callback(undefined, connection); + } else { + const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle'; + destroyConnection(pool, connection, reason); } - - const reason = connection.closed ? 'error' : isStale ? 
'stale' : 'idle'; - destroyConnection(pool, connection, reason); } const maxPoolSize = pool.options.maxPoolSize; @@ -518,6 +634,7 @@ function processWaitQueue(pool: ConnectionPool) { pool[kConnections].push(connection); } + pool[kProcessingWaitQueue] = false; return; } @@ -537,9 +654,11 @@ function processWaitQueue(pool: ConnectionPool) { clearTimeout(waitQueueMember.timer); } waitQueueMember.callback(err, connection); + pool[kProcessingWaitQueue] = false; + process.nextTick(() => processWaitQueue(pool)); }); - - return; + } else { + pool[kProcessingWaitQueue] = false; } } @@ -565,7 +684,7 @@ export const CMAP_EVENTS = [ * @param callback - A function to call back after connection management is complete */ export type WithConnectionCallback = ( - error: MongoError, + error: MongoError | undefined, connection: Connection | undefined, callback: Callback ) => void; diff --git a/src/cmap/connection_pool_events.ts b/src/cmap/connection_pool_events.ts index 757d213c57..8083f6d2cd 100644 --- a/src/cmap/connection_pool_events.ts +++ b/src/cmap/connection_pool_events.ts @@ -1,3 +1,4 @@ +import type { ObjectId } from '../bson'; import type { Connection } from './connection'; import type { ConnectionPool, ConnectionPoolOptions } from './connection_pool'; import type { AnyError } from '../error'; @@ -90,12 +91,14 @@ export class ConnectionClosedEvent extends ConnectionPoolMonitoringEvent { connectionId: number | ''; /** The reason the connection was closed */ reason: string; + serviceId?: ObjectId; /** @internal */ constructor(pool: ConnectionPool, connection: Connection, reason: string) { super(pool); this.connectionId = connection.id; this.reason = reason || 'unknown'; + this.serviceId = connection.serviceId; } } @@ -166,7 +169,11 @@ export class ConnectionCheckedInEvent extends ConnectionPoolMonitoringEvent { */ export class ConnectionPoolClearedEvent extends ConnectionPoolMonitoringEvent { /** @internal */ - constructor(pool: ConnectionPool) { + serviceId?: ObjectId; + + 
/** @internal */ + constructor(pool: ConnectionPool, serviceId?: ObjectId) { super(pool); + this.serviceId = serviceId; } } diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index d0a0463874..7d0319829f 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,5 +1,4 @@ import { MongoDriverError } from '../error'; -import type { Connection } from './connection'; import type { ConnectionPool } from './connection_pool'; /** @@ -28,9 +27,9 @@ export class WaitQueueTimeoutError extends MongoDriverError { /** The address of the connection pool */ address: string; - constructor(pool: Connection | ConnectionPool) { - super('Timed out while checking out a connection from connection pool'); - this.address = pool.address; + constructor(message: string, address: string) { + super(message); + this.address = address; } get name(): string { diff --git a/src/cmap/metrics.ts b/src/cmap/metrics.ts new file mode 100644 index 0000000000..b825b53936 --- /dev/null +++ b/src/cmap/metrics.ts @@ -0,0 +1,58 @@ +/** @internal */ +export class ConnectionPoolMetrics { + static readonly TXN = 'txn' as const; + static readonly CURSOR = 'cursor' as const; + static readonly OTHER = 'other' as const; + + txnConnections = 0; + cursorConnections = 0; + otherConnections = 0; + + /** + * Mark a connection as pinned for a specific operation. + */ + markPinned(pinType: string): void { + if (pinType === ConnectionPoolMetrics.TXN) { + this.txnConnections += 1; + } else if (pinType === ConnectionPoolMetrics.CURSOR) { + this.cursorConnections += 1; + } else { + this.otherConnections += 1; + } + } + + /** + * Unmark a connection as pinned for an operation. + */ + markUnpinned(pinType: string): void { + if (pinType === ConnectionPoolMetrics.TXN) { + this.txnConnections -= 1; + } else if (pinType === ConnectionPoolMetrics.CURSOR) { + this.cursorConnections -= 1; + } else { + this.otherConnections -= 1; + } + } + + /** + * Return information about the cmap metrics as a string. 
+ */ + info(maxPoolSize: number): string { + return ( + 'Timed out while checking out a connection from connection pool: ' + + `maxPoolSize: ${maxPoolSize}, ` + + `connections in use by cursors: ${this.cursorConnections}, ` + + `connections in use by transactions: ${this.txnConnections}, ` + + `connections in use by other operations: ${this.otherConnections}` + ); + } + + /** + * Reset the metrics to the initial values. + */ + reset(): void { + this.txnConnections = 0; + this.cursorConnections = 0; + this.otherConnections = 0; + } +} diff --git a/src/cmap/stream_description.ts b/src/cmap/stream_description.ts index 5109b7e35d..fd0ce6102d 100644 --- a/src/cmap/stream_description.ts +++ b/src/cmap/stream_description.ts @@ -15,6 +15,8 @@ const RESPONSE_FIELDS = [ /** @public */ export interface StreamDescriptionOptions { compressors?: CompressorName[]; + logicalSessionTimeoutMinutes?: number; + loadBalanced: boolean; } /** @public */ @@ -29,6 +31,7 @@ export class StreamDescription { compressors: CompressorName[]; compressor?: CompressorName; logicalSessionTimeoutMinutes?: number; + loadBalanced: boolean; __nodejs_mock_server__?: boolean; @@ -42,6 +45,8 @@ export class StreamDescription { this.maxBsonObjectSize = 16777216; this.maxMessageSizeBytes = 48000000; this.maxWriteBatchSize = 100000; + this.logicalSessionTimeoutMinutes = options?.logicalSessionTimeoutMinutes; + this.loadBalanced = !!options?.loadBalanced; this.compressors = options && options.compressors && Array.isArray(options.compressors) ? 
options.compressors diff --git a/src/connection_string.ts b/src/connection_string.ts index 76f2775da0..dc661b4fae 100644 --- a/src/connection_string.ts +++ b/src/connection_string.ts @@ -33,6 +33,13 @@ import { Logger, LoggerLevel } from './logger'; import { PromiseProvider } from './promise_provider'; import { Encrypter } from './encrypter'; +const VALID_TXT_RECORDS = ['authSource', 'replicaSet', 'loadBalanced']; + +const LB_SINGLE_HOST_ERROR = 'loadBalanced option only supported with a single host in the URI'; +const LB_REPLICA_SET_ERROR = 'loadBalanced option not supported with a replicaSet option'; +const LB_DIRECT_CONNECTION_ERROR = + 'loadBalanced option not supported when directConnection is provided'; + /** * Determines whether a provided address matches the provided parent domain in order * to avoid certain attack vectors. @@ -85,6 +92,11 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback { if (err) { @@ -98,14 +110,15 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback key !== 'authSource' && key !== 'replicaSet')) { + if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) { return callback( - new MongoParseError('Text record must only set `authSource` or `replicaSet`') + new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`) ); } const source = txtRecordOptions.get('authSource') ?? undefined; const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined; + const loadBalanced = txtRecordOptions.get('loadBalanced') ?? 
undefined; if (source === '' || replicaSet === '') { return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record')); @@ -118,6 +131,15 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback v != null) ); + if (objectOptions.has('loadBalanced')) { + throw new MongoParseError('loadBalanced is only a valid option in the URI'); + } + const allOptions = new CaseInsensitiveMap(); const allKeys = new Set([ @@ -378,6 +404,11 @@ export function parseOptions( throw new MongoParseError('directConnection not supported with SRV URI'); } + const lbError = validateLoadBalancedOptions(hosts, mongoOptions); + if (lbError) { + throw lbError; + } + // Potential SRV Overrides mongoOptions.userSpecifiedAuthSource = objectOptions.has('authSource') || urlOptions.has('authSource'); @@ -393,6 +424,23 @@ export function parseOptions( return mongoOptions; } +function validateLoadBalancedOptions( + hosts: HostAddress[] | string[], + mongoOptions: MongoOptions +): MongoParseError | undefined { + if (mongoOptions.loadBalanced) { + if (hosts.length > 1) { + return new MongoParseError(LB_SINGLE_HOST_ERROR); + } + if (mongoOptions.replicaSet) { + return new MongoParseError(LB_REPLICA_SET_ERROR); + } + if (mongoOptions.directConnection) { + return new MongoParseError(LB_DIRECT_CONNECTION_ERROR); + } + } +} + function setOption( mongoOptions: any, key: string, @@ -670,6 +718,10 @@ export const OPTIONS = { default: 120000, type: 'uint' }, + loadBalanced: { + default: false, + type: 'boolean' + }, localThresholdMS: { default: 15, type: 'uint' diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index c58d136f2e..ab579e5194 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,8 +1,10 @@ import { Callback, maybePromise, MongoDBNamespace, ns } from '../utils'; import { Long, Document, BSONSerializeOptions, pluckBSONSerializeOptions } from '../bson'; -import { ClientSession } from '../sessions'; +import { 
ClientSession, maybeClearPinnedConnection } from '../sessions'; import { + AnyError, MongoDriverError, + MongoNetworkError, MongoInvalidArgumentError, MongoCursorExhaustedError, MongoTailableCursorError, @@ -50,6 +52,7 @@ export const CURSOR_FLAGS = [ ] as const; /** @public */ +// TODO: Remove this as the option is never used. (NODE-3489) export interface CursorCloseOptions { /** Bypass calling killCursors when closing the cursor. */ skipKillCursors?: boolean; @@ -217,6 +220,10 @@ export abstract class AbstractCursor< return this[kKilled]; } + get loadBalanced(): boolean { + return this[kTopology].loadBalanced; + } + /** Returns current buffered documents length */ bufferedCount(): number { return this[kDocuments].length; @@ -366,7 +373,7 @@ export abstract class AbstractCursor< }); } - close(): void; + close(): Promise; close(callback: Callback): void; close(options: CursorCloseOptions): Promise; close(options: CursorCloseOptions, callback: Callback): void; @@ -377,49 +384,7 @@ export abstract class AbstractCursor< const needsToEmitClosed = !this[kClosed]; this[kClosed] = true; - return maybePromise(callback, done => { - const cursorId = this[kId]; - const cursorNs = this[kNamespace]; - const server = this[kServer]; - const session = this[kSession]; - - if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) { - if (needsToEmitClosed) { - this[kId] = Long.ZERO; - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - this.emit(AbstractCursor.CLOSE); - } - - if (session && session.owner === this) { - return session.endSession(done); - } - - return done(); - } - - this[kKilled] = true; - server.killCursors( - cursorNs, - [cursorId], - { ...pluckBSONSerializeOptions(this[kOptions]), session }, - () => { - if (session && session.owner === this) { - return session.endSession(() => { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - this.emit(AbstractCursor.CLOSE); - done(); - 
}); - } - - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - this.emit(AbstractCursor.CLOSE); - done(); - } - ); - }); + return maybePromise(callback, done => cleanupCursor(this, { needsToEmitClosed }, done)); } /** @@ -723,7 +688,7 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback callback(err, nextDocument(cursor))); + return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor))); } next(cursor, blocking, callback); @@ -733,7 +698,7 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback callback(undefined, null)); + return cleanupCursor(cursor, undefined, () => callback(undefined, null)); } // otherwise need to call getMore @@ -750,7 +715,7 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback callback(err, nextDocument(cursor))); + return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor))); } if (cursor[kDocuments].length === 0 && blocking === false) { @@ -766,18 +731,69 @@ function cursorIsDead(cursor: AbstractCursor): boolean { return !!cursorId && cursorId.isZero(); } -function cleanupCursor(cursor: AbstractCursor, callback: Callback): void { - if (cursor[kDocuments].length === 0) { - cursor[kClosed] = true; - cursor.emit(AbstractCursor.CLOSE); +function cleanupCursor( + cursor: AbstractCursor, + options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined, + callback: Callback +): void { + const cursorId = cursor[kId]; + const cursorNs = cursor[kNamespace]; + const server = cursor[kServer]; + const session = cursor[kSession]; + const error = options?.error; + const needsToEmitClosed = options?.needsToEmitClosed ?? 
cursor[kDocuments].length === 0; + + if (error) { + if (cursor.loadBalanced && error instanceof MongoNetworkError) { + return completeCleanup(); + } } - const session = cursor[kSession]; - if (session && session.owner === cursor) { - session.endSession(callback); - } else { - callback(); + if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) { + if (needsToEmitClosed) { + cursor[kClosed] = true; + cursor[kId] = Long.ZERO; + cursor.emit(AbstractCursor.CLOSE); + } + + if (session) { + if (session.owner === cursor) { + return session.endSession({ error }, callback); + } + + if (!session.inTransaction()) { + maybeClearPinnedConnection(session, { error, force: true }); + } + } + + return callback(); } + + function completeCleanup() { + if (session) { + if (session.owner === cursor) { + return session.endSession({ error }, () => { + cursor.emit(AbstractCursor.CLOSE); + callback(); + }); + } + + if (!session.inTransaction()) { + maybeClearPinnedConnection(session, { error, force: true }); + } + } + + cursor.emit(AbstractCursor.CLOSE); + return callback(); + } + + cursor[kKilled] = true; + server.killCursors( + cursorNs, + [cursorId], + { ...pluckBSONSerializeOptions(cursor[kOptions]), session }, + () => completeCleanup() + ); } /** @internal */ @@ -835,8 +851,10 @@ function makeCursorStream(cursor: AbstractCursor extends CommandOperation { } } -defineAspects(AggregateOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.EXPLAINABLE]); +defineAspects(AggregateOperation, [ + Aspect.READ_OPERATION, + Aspect.RETRYABLE, + Aspect.EXPLAINABLE, + Aspect.CURSOR_CREATING +]); diff --git a/src/operations/estimated_document_count.ts b/src/operations/estimated_document_count.ts index b1fffcf2cd..90261cb308 100644 --- a/src/operations/estimated_document_count.ts +++ b/src/operations/estimated_document_count.ts @@ -68,4 +68,8 @@ export class EstimatedDocumentCountOperation extends CommandOperation { } } -defineAspects(EstimatedDocumentCountOperation, 
[Aspect.READ_OPERATION, Aspect.RETRYABLE]); +defineAspects(EstimatedDocumentCountOperation, [ + Aspect.READ_OPERATION, + Aspect.RETRYABLE, + Aspect.CURSOR_CREATING +]); diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 2d1b3d8959..48591a68b0 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -4,16 +4,17 @@ import { isRetryableError, MONGODB_ERROR_CODES, MongoDriverError, + MongoNetworkError, MongoCompatibilityError, MongoServerError } from '../error'; import { Aspect, AbstractOperation } from './operation'; import { maxWireVersion, maybePromise, Callback } from '../utils'; -import { ServerType } from '../sdam/common'; import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import type { Document } from '../bson'; +import { supportsRetryableWrites } from '../utils'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -140,6 +141,15 @@ function executeWithServerSelection( return; } + if ( + session && + session.isPinned && + session.transaction.isCommitted && + !operation.bypassPinningCheck + ) { + session.unpin(); + } + const serverSelectionOptions = { session }; function callbackWithRetry(err?: any, result?: any) { if (err == null) { @@ -172,16 +182,31 @@ function executeWithServerSelection( } // select a new server, and attempt to retry the operation - topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => { + topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => { if ( - err || + e || (operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) || (operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server)) ) { - callback(err); + callback(e); return; } + // If we have a cursor and the initial command fails with a 
network error, + // we can retry it on another connection. So we need to check it back in, clear the + // pool for the service id, and retry again. + if ( + err && + err instanceof MongoNetworkError && + server.loadBalanced && + session && + session.isPinned && + !session.inTransaction() && + operation.hasAspect(Aspect.CURSOR_CREATING) + ) { + session.unpin({ force: true, forceClear: true }); + } + operation.execute(server, session, callback); }); } @@ -243,11 +268,3 @@ function executeWithServerSelection( function shouldRetryWrite(err: any) { return err instanceof MongoError && err.hasErrorLabel('RetryableWriteError'); } - -function supportsRetryableWrites(server: Server) { - return ( - server.description.maxWireVersion >= 6 && - server.description.logicalSessionTimeoutMinutes && - server.description.type !== ServerType.Standalone - ); -} diff --git a/src/operations/find.ts b/src/operations/find.ts index 4f2b15b904..d3eed86a36 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -345,4 +345,9 @@ function makeLegacyFindCommand( return findCommand; } -defineAspects(FindOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.EXPLAINABLE]); +defineAspects(FindOperation, [ + Aspect.READ_OPERATION, + Aspect.RETRYABLE, + Aspect.EXPLAINABLE, + Aspect.CURSOR_CREATING +]); diff --git a/src/operations/find_one.ts b/src/operations/find_one.ts index c498fd82d2..8d28369060 100644 --- a/src/operations/find_one.ts +++ b/src/operations/find_one.ts @@ -40,4 +40,4 @@ export class FindOneOperation extends CommandOperation { } } -defineAspects(FindOneOperation, [Aspect.EXPLAINABLE]); +defineAspects(FindOneOperation, [Aspect.EXPLAINABLE, Aspect.CURSOR_CREATING]); diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index ed93aef564..6f67452550 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -513,7 +513,11 @@ export class IndexInformationOperation extends AbstractOperation { } } -defineAspects(ListIndexesOperation, 
[Aspect.READ_OPERATION, Aspect.RETRYABLE]); +defineAspects(ListIndexesOperation, [ + Aspect.READ_OPERATION, + Aspect.RETRYABLE, + Aspect.CURSOR_CREATING +]); defineAspects(CreateIndexesOperation, [Aspect.WRITE_OPERATION]); defineAspects(CreateIndexOperation, [Aspect.WRITE_OPERATION]); defineAspects(EnsureIndexOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index 8512adf683..c9d54e2669 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -152,4 +152,8 @@ export class ListCollectionsCursor< } } -defineAspects(ListCollectionsOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE]); +defineAspects(ListCollectionsOperation, [ + Aspect.READ_OPERATION, + Aspect.RETRYABLE, + Aspect.CURSOR_CREATING +]); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index c9259857e3..6135ab44e6 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -9,7 +9,8 @@ export const Aspect = { WRITE_OPERATION: Symbol('WRITE_OPERATION'), RETRYABLE: Symbol('RETRYABLE'), EXPLAINABLE: Symbol('EXPLAINABLE'), - SKIP_COLLATION: Symbol('SKIP_COLLATION') + SKIP_COLLATION: Symbol('SKIP_COLLATION'), + CURSOR_CREATING: Symbol('CURSOR_CREATING') } as const; /** @public */ @@ -27,6 +28,9 @@ export interface OperationOptions extends BSONSerializeOptions { /** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). 
*/ readPreference?: ReadPreferenceLike; + + /** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */ + bypassPinningCheck?: boolean; } /** @internal */ @@ -45,6 +49,7 @@ export abstract class AbstractOperation { readPreference: ReadPreference; server!: Server; fullResponse?: boolean; + bypassPinningCheck: boolean; // BSON serialization options bsonOptions?: BSONSerializeOptions; @@ -67,6 +72,7 @@ export abstract class AbstractOperation { } this.options = options; + this.bypassPinningCheck = !!options.bypassPinningCheck; } abstract execute(server: Server, session: ClientSession, callback: Callback): void; diff --git a/src/sdam/common.ts b/src/sdam/common.ts index d3ae9e620d..673573b258 100644 --- a/src/sdam/common.ts +++ b/src/sdam/common.ts @@ -17,7 +17,8 @@ export const TopologyType = Object.freeze({ ReplicaSetNoPrimary: 'ReplicaSetNoPrimary', ReplicaSetWithPrimary: 'ReplicaSetWithPrimary', Sharded: 'Sharded', - Unknown: 'Unknown' + Unknown: 'Unknown', + LoadBalanced: 'LoadBalanced' } as const); /** @public */ @@ -36,7 +37,8 @@ export const ServerType = Object.freeze({ RSArbiter: 'RSArbiter', RSOther: 'RSOther', RSGhost: 'RSGhost', - Unknown: 'Unknown' + Unknown: 'Unknown', + LoadBalancer: 'LoadBalancer' } as const); /** @public */ diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 71d8a266e0..3900483aeb 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -18,12 +18,12 @@ import { EventEmitterWithState } from '../utils'; import { - ServerType, STATE_CLOSED, STATE_CLOSING, STATE_CONNECTING, STATE_CONNECTED, - ClusterTime + ClusterTime, + TopologyType } from './common'; import { MongoError, @@ -56,6 +56,7 @@ import type { Document, Long } from '../bson'; import type { AutoEncrypter } from '../deps'; import type { ServerApi } from '../mongo_client'; import { TypedEventEmitter } from '../mongo_types'; +import { supportsRetryableWrites } from '../utils'; const stateTransition = makeStateMachine({ 
[STATE_CLOSED]: [STATE_CLOSED, STATE_CONNECTING], @@ -153,6 +154,9 @@ export class Server extends TypedEventEmitter { this.clusterTime = clusterTime; }); + // monitoring is disabled in load balancing mode + if (this.loadBalanced) return; + // create the monitor this[kMonitor] = new Monitor(this, this.s.options); @@ -194,6 +198,10 @@ export class Server extends TypedEventEmitter { } } + get loadBalanced(): boolean { + return this.s.topology.description.type === TopologyType.LoadBalanced; + } + /** * Initiate server connect */ @@ -203,7 +211,16 @@ export class Server extends TypedEventEmitter { } stateTransition(this, STATE_CONNECTING); - this[kMonitor].connect(); + + // If in load balancer mode we automatically set the server to + // a load balancer. It never transitions out of this state and + // has no monitor. + if (!this.loadBalanced) { + this[kMonitor].connect(); + } else { + stateTransition(this, STATE_CONNECTED); + this.emit(Server.CONNECT, this); + } } /** Destroy the server connection */ @@ -221,7 +238,10 @@ export class Server extends TypedEventEmitter { stateTransition(this, STATE_CLOSING); - this[kMonitor].close(); + if (!this.loadBalanced) { + this[kMonitor].close(); + } + this.s.pool.close(options, err => { stateTransition(this, STATE_CLOSED); this.emit('closed'); @@ -236,7 +256,9 @@ export class Server extends TypedEventEmitter { * this will be a no-op. */ requestCheck(): void { - this[kMonitor].requestCheck(); + if (!this.loadBalanced) { + this[kMonitor].requestCheck(); + } } /** @@ -284,19 +306,43 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + const session = finalOptions.session; + const conn = session?.pinnedConnection; + + // NOTE: This is a hack! We can't retrieve the connections used for executing an operation + // (and prevent them from being checked back in) at the point of operation execution. 
+ // This should be considered as part of the work for NODE-2882 + if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) { + this.s.pool.checkOut((err, checkedOut) => { + if (err || checkedOut == null) { + if (callback) return callback(err); + return; + } - conn.command( - ns, - cmd, - finalOptions, - makeOperationHandler(this, conn, cmd, finalOptions, cb) as Callback - ); - }, callback); + session.pin(checkedOut); + this.command(ns, cmd, finalOptions, callback as Callback); + }); + + return; + } + + this.s.pool.withConnection( + conn, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } + + conn.command( + ns, + cmd, + finalOptions, + makeOperationHandler(this, conn, cmd, finalOptions, cb) as Callback + ); + }, + callback + ); } /** @@ -309,14 +355,23 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + this.s.pool.withConnection( + undefined, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } - conn.query(ns, cmd, options, makeOperationHandler(this, conn, cmd, options, cb) as Callback); - }, callback); + conn.query( + ns, + cmd, + options, + makeOperationHandler(this, conn, cmd, options, cb) as Callback + ); + }, + callback + ); } /** @@ -334,19 +389,23 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + this.s.pool.withConnection( + options.session?.pinnedConnection, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } - conn.getMore( - ns, - cursorId, - options, - makeOperationHandler(this, conn, {}, options, cb) as Callback - ); - }, callback); + conn.getMore( + ns, + cursorId, + options, + makeOperationHandler(this, conn, 
{}, options, cb) as Callback + ); + }, + callback + ); } /** @@ -367,19 +426,23 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + this.s.pool.withConnection( + options.session?.pinnedConnection, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } - conn.killCursors( - ns, - cursorIds, - options, - makeOperationHandler(this, conn, {}, undefined, cb) as Callback - ); - }, callback); + conn.killCursors( + ns, + cursorIds, + options, + makeOperationHandler(this, conn, {}, undefined, cb) as Callback + ); + }, + callback + ); } } @@ -398,14 +461,6 @@ Object.defineProperty(Server.prototype, 'clusterTime', { } }); -function supportsRetryableWrites(server: Server) { - return ( - server.description.maxWireVersion >= 6 && - server.description.logicalSessionTimeoutMinutes && - server.description.type !== ServerType.Standalone - ); -} - function calculateRoundTripTime(oldRtt: number, duration: number): number { if (oldRtt === -1) { return duration; @@ -416,6 +471,11 @@ function calculateRoundTripTime(oldRtt: number, duration: number): number { } function markServerUnknown(server: Server, error?: MongoError) { + // Load balancer servers can never be marked unknown. 
+ if (server.loadBalanced) { + return; + } + if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) { server[kMonitor].reset(); } @@ -430,7 +490,28 @@ function markServerUnknown(server: Server, error?: MongoError) { ); } +function isPinnableCommand(cmd: Document, session?: ClientSession): boolean { + if (session) { + return ( + session.inTransaction() || + 'aggregate' in cmd || + 'find' in cmd || + 'getMore' in cmd || + 'listCollections' in cmd || + 'listIndexes' in cmd + ); + } + + return false; +} + function connectionIsStale(pool: ConnectionPool, connection: Connection) { + if (connection.serviceId) { + return ( + connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString()) + ); + } + return connection.generation !== pool.generation; } @@ -465,6 +546,11 @@ function makeOperationHandler( session.serverSession.isDirty = true; } + // inActiveTransaction check handles commit and abort. + if (inActiveTransaction(session, cmd) && !err.hasErrorLabel('TransientTransactionError')) { + err.addErrorLabel('TransientTransactionError'); + } + if ( (isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) && supportsRetryableWrites(server) && @@ -474,8 +560,13 @@ function makeOperationHandler( } if (!(err instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(err)) { - markServerUnknown(server, err); - server.s.pool.clear(); + // In load balanced mode we never mark the server as unknown and always + // clear for the specific service id. 
+ + server.s.pool.clear(connection.serviceId); + if (!server.loadBalanced) { + markServerUnknown(server, err); + } } } else { // if pre-4.4 server, then add error label if its a retryable write error @@ -491,14 +582,20 @@ function makeOperationHandler( if (isSDAMUnrecoverableError(err)) { if (shouldHandleStateChangeError(server, err)) { if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) { - server.s.pool.clear(); + server.s.pool.clear(connection.serviceId); } - markServerUnknown(server, err); - process.nextTick(() => server.requestCheck()); + if (!server.loadBalanced) { + markServerUnknown(server, err); + process.nextTick(() => server.requestCheck()); + } } } } + + if (session && session.isPinned && err.hasErrorLabel('TransientTransactionError')) { + session.unpin({ force: true }); + } } callback(err, result); diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 76f8ccff08..bd907b2c4b 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -7,14 +7,16 @@ import type { MongoError } from '../error'; const WRITABLE_SERVER_TYPES = new Set([ ServerType.RSPrimary, ServerType.Standalone, - ServerType.Mongos + ServerType.Mongos, + ServerType.LoadBalancer ]); const DATA_BEARING_SERVER_TYPES = new Set([ ServerType.RSPrimary, ServerType.RSSecondary, ServerType.Mongos, - ServerType.Standalone + ServerType.Standalone, + ServerType.LoadBalancer ]); /** @public */ @@ -36,6 +38,9 @@ export interface ServerDescriptionOptions { /** The topologyVersion */ topologyVersion?: TopologyVersion; + + /** If the client is in load balancing mode. */ + loadBalanced?: boolean; } /** @@ -90,7 +95,7 @@ export class ServerDescription { this._hostAddress = address; this.address = this._hostAddress.toString(); } - this.type = parseServerType(ismaster); + this.type = parseServerType(ismaster, options); this.hosts = ismaster?.hosts?.map((host: string) => host.toLowerCase()) ?? 
[]; this.passives = ismaster?.passives?.map((host: string) => host.toLowerCase()) ?? []; this.arbiters = ismaster?.arbiters?.map((host: string) => host.toLowerCase()) ?? []; @@ -206,7 +211,14 @@ export class ServerDescription { } // Parses an `ismaster` message and determines the server type -export function parseServerType(ismaster?: Document): ServerType { +export function parseServerType( + ismaster?: Document, + options?: ServerDescriptionOptions +): ServerType { + if (options?.loadBalanced) { + return ServerType.LoadBalancer; + } + if (!ismaster || !ismaster.ok) { return ServerType.Unknown; } diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 0b8883b489..390da9693d 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -207,6 +207,10 @@ function knownFilter(server: ServerDescription): boolean { return server.type !== ServerType.Unknown; } +function loadBalancerFilter(server: ServerDescription): boolean { + return server.type === ServerType.LoadBalancer; +} + /** * Returns a function which selects servers based on a provided read preference * @@ -232,6 +236,10 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se ); } + if (topologyDescription.type === TopologyType.LoadBalanced) { + return servers.filter(loadBalancerFilter); + } + if (topologyDescription.type === TopologyType.Unknown) { return []; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 602e206495..48ee8e4f5d 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions { srvPoller?: SrvPoller; /** Indicates that a client should directly connect to a node without attempting to discover its topology type */ directConnection: boolean; + loadBalanced: boolean; metadata: ClientMetadata; /** MongoDB server API version */ serverApi?: ServerApi; @@ -248,6 +249,7 @@ export class Topology extends TypedEventEmitter { 
retryWrites: DEFAULT_OPTIONS.get('retryWrites'), serverSelectionTimeoutMS: DEFAULT_OPTIONS.get('serverSelectionTimeoutMS'), directConnection: DEFAULT_OPTIONS.get('directConnection'), + loadBalanced: DEFAULT_OPTIONS.get('loadBalanced'), metadata: DEFAULT_OPTIONS.get('metadata'), monitorCommands: DEFAULT_OPTIONS.get('monitorCommands'), tls: DEFAULT_OPTIONS.get('tls'), @@ -326,7 +328,7 @@ export class Topology extends TypedEventEmitter { detectSrvRecords: ev => this.detectSrvRecords(ev) }; - if (options.srvHost) { + if (options.srvHost && !options.loadBalanced) { this.s.srvPoller = options.srvPoller ?? new SrvPoller({ @@ -380,6 +382,10 @@ export class Topology extends TypedEventEmitter { return this.s.description; } + get loadBalanced(): boolean { + return this.s.options.loadBalanced; + } + get capabilities(): ServerCapabilities { return new ServerCapabilities(this.lastIsMaster()); } @@ -412,7 +418,19 @@ export class Topology extends TypedEventEmitter { ); // connect all known servers, then attempt server selection to connect - connectServers(this, Array.from(this.s.description.servers.values())); + const serverDescriptions = Array.from(this.s.description.servers.values()); + connectServers(this, serverDescriptions); + + // In load balancer mode we need to fake a server description getting + // emitted from the monitor, since the monitor doesn't exist. + if (this.s.options.loadBalanced) { + for (const description of serverDescriptions) { + const newDescription = new ServerDescription(description.hostAddress, undefined, { + loadBalanced: this.s.options.loadBalanced + }); + this.serverUpdateHandler(newDescription); + } + } const readPreference = options.readPreference ?? 
ReadPreference.primary; this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { @@ -483,28 +501,30 @@ export class Topology extends TypedEventEmitter { this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); - for (const session of this.s.sessions) { - session.endSession(); - } - - this.s.sessionPool.endAllPooledSessions(() => { - eachAsync( - Array.from(this.s.servers.values()), - (server, cb) => destroyServer(server, this, options, cb), - err => { - this.s.servers.clear(); - - // emit an event for close - this.emit(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id)); - - stateTransition(this, STATE_CLOSED); - - if (typeof callback === 'function') { - callback(err); - } - } - ); - }); + eachAsync( + Array.from(this.s.sessions.values()), + (session, cb) => session.endSession(cb), + () => { + this.s.sessionPool.endAllPooledSessions(() => { + eachAsync( + Array.from(this.s.servers.values()), + (server, cb) => destroyServer(server, this, options, cb), + err => { + this.s.servers.clear(); + + // emit an event for close + this.emit(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id)); + + stateTransition(this, STATE_CLOSED); + + if (typeof callback === 'function') { + callback(err); + } + } + ); + }); + } + ); } /** @@ -611,7 +631,7 @@ export class Topology extends TypedEventEmitter { * @returns Whether sessions are supported on the current topology */ hasSessionSupport(): boolean { - return this.description.logicalSessionTimeoutMinutes != null; + return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null; } /** Start a logical session */ @@ -831,6 +851,10 @@ function topologyTypeFromOptions(options?: TopologyOptions) { return TopologyType.ReplicaSetNoPrimary; } + if (options?.loadBalanced) { + return TopologyType.LoadBalanced; + } + return TopologyType.Unknown; } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 
1dd5956780..1e2f0d5f66 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -85,7 +85,13 @@ export class TopologyDescription { // determine server compatibility for (const serverDescription of this.servers.values()) { - if (serverDescription.type === ServerType.Unknown) continue; + // Load balancer mode is always compatible. + if ( + serverDescription.type === ServerType.Unknown || + serverDescription.type === ServerType.LoadBalancer + ) { + continue; + } if (serverDescription.minWireVersion > MAX_SUPPORTED_WIRE_VERSION) { this.compatible = false; diff --git a/src/sessions.ts b/src/sessions.ts index bbee3db3e1..a7a4428d5d 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -2,7 +2,7 @@ import { PromiseProvider } from './promise_provider'; import { Binary, Long, Timestamp, Document } from './bson'; import { ReadPreference } from './read_preference'; import { isTransactionCommand, TxnState, Transaction, TransactionOptions } from './transactions'; -import { _advanceClusterTime, ClusterTime } from './sdam/common'; +import { _advanceClusterTime, ClusterTime, TopologyType } from './sdam/common'; import { isSharded } from './cmap/wire_protocol/shared'; import { MongoError, @@ -13,7 +13,8 @@ import { MongoWriteConcernError, MONGODB_ERROR_CODES, MongoDriverError, - MongoServerError + MongoServerError, + AnyError } from './error'; import { now, @@ -30,6 +31,8 @@ import { executeOperation } from './operations/execute_operation'; import { RunAdminCommandOperation } from './operations/run_command'; import type { AbstractCursor } from './cursor/abstract_cursor'; import type { CommandOptions } from './cmap/connection'; +import { Connection } from './cmap/connection'; +import { ConnectionPoolMetrics } from './cmap/metrics'; import type { WriteConcern } from './write_concern'; import { TypedEventEmitter } from './mongo_types'; import { ReadConcernLevel } from './read_concern'; @@ -81,6 +84,19 @@ const kServerSession = Symbol('serverSession'); 
const kSnapshotTime = Symbol('snapshotTime'); /** @internal */ const kSnapshotEnabled = Symbol('snapshotEnabled'); +/** @internal */ +const kPinnedConnection = Symbol('pinnedConnection'); + +/** @public */ +export interface EndSessionOptions { + /** + * An optional error which caused the call to end this session + * @internal + */ + error?: AnyError; + force?: boolean; + forceClear?: boolean; +} /** * A class representing a client session on the server @@ -109,6 +125,8 @@ export class ClientSession extends TypedEventEmitter { [kSnapshotTime]?: Timestamp; /** @internal */ [kSnapshotEnabled] = false; + /** @internal */ + [kPinnedConnection]?: Connection; /** * Create a client session. @@ -185,6 +203,41 @@ export class ClientSession extends TypedEventEmitter { return this[kSnapshotEnabled]; } + get loadBalanced(): boolean { + return this.topology.description.type === TopologyType.LoadBalanced; + } + + /** @internal */ + get pinnedConnection(): Connection | undefined { + return this[kPinnedConnection]; + } + + /** @internal */ + pin(conn: Connection): void { + if (this[kPinnedConnection]) { + throw TypeError('Cannot pin multiple connections to the same session'); + } + + this[kPinnedConnection] = conn; + conn.emit( + Connection.PINNED, + this.inTransaction() ? ConnectionPoolMetrics.TXN : ConnectionPoolMetrics.CURSOR + ); + } + + /** @internal */ + unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void { + if (this.loadBalanced) { + return maybeClearPinnedConnection(this, options); + } + + this.transaction.unpinServer(); + } + + get isPinned(): boolean { + return this.loadBalanced ? 
!!this[kPinnedConnection] : this.transaction.isPinned; + } + /** * Ends this session on the server * @@ -193,21 +246,24 @@ export class ClientSession extends TypedEventEmitter { */ endSession(): Promise; endSession(callback: Callback): void; - endSession(options: Record): Promise; - endSession(options: Record, callback: Callback): void; + endSession(options: EndSessionOptions): Promise; + endSession(options: EndSessionOptions, callback: Callback): void; endSession( - options?: Record | Callback, + options?: EndSessionOptions | Callback, callback?: Callback ): void | Promise { if (typeof options === 'function') (callback = options), (options = {}); - options = options ?? {}; + const finalOptions = { force: true, ...options }; return maybePromise(callback, done => { if (this.hasEnded) { + maybeClearPinnedConnection(this, finalOptions); return done(); } const completeEndSession = () => { + maybeClearPinnedConnection(this, finalOptions); + // release the server session back to the pool this.sessionPool.release(this.serverSession); this[kServerSession] = undefined; @@ -322,6 +378,10 @@ export class ClientSession extends TypedEventEmitter { throw new MongoDriverError('Transaction already in progress'); } + if (this.isPinned && this.transaction.isCommitted) { + this.unpin(); + } + const topologyMaxWireVersion = maxWireVersion(this.topology); if ( isSharded(this.topology) && @@ -429,6 +489,47 @@ function isUnknownTransactionCommitResult(err: MongoError) { ); } +export function maybeClearPinnedConnection( + session: ClientSession, + options?: EndSessionOptions +): void { + // unpin a connection if it has been pinned + const conn = session[kPinnedConnection]; + const error = options?.error; + + if ( + session.inTransaction() && + error && + error instanceof MongoError && + error.hasErrorLabel('TransientTransactionError') + ) { + return; + } + + // NOTE: the spec talks about what to do on a network error only, but the tests seem to + // to validate that we don't unpin on 
_all_ errors? + if (conn) { + const servers = Array.from(session.topology.s.servers.values()); + const loadBalancer = servers[0]; + + if (options?.error == null || options?.force) { + loadBalancer.s.pool.checkIn(conn); + conn.emit( + Connection.UNPINNED, + session.transaction.state !== TxnState.NO_TRANSACTION + ? ConnectionPoolMetrics.TXN + : ConnectionPoolMetrics.CURSOR + ); + + if (options?.forceClear) { + loadBalancer.s.pool.clear(conn.serviceId); + } + } + + session[kPinnedConnection] = undefined; + } +} + function isMaxTimeMSExpiredError(err: MongoError) { if (err == null || !(err instanceof MongoServerError)) { return false; @@ -613,6 +714,10 @@ function endTransaction(session: ClientSession, commandName: string, callback: C function commandHandler(e?: MongoError, r?: Document) { if (commandName !== 'commitTransaction') { session.transaction.transition(TxnState.TRANSACTION_ABORTED); + if (session.loadBalanced) { + maybeClearPinnedConnection(session, { force: false }); + } + // The spec indicates that we should ignore all errors on `abortTransaction` return callback(); } @@ -629,12 +734,13 @@ function endTransaction(session: ClientSession, commandName: string, callback: C e.addErrorLabel('UnknownTransactionCommitResult'); // per txns spec, must unpin session in this case - session.transaction.unpinServer(); + session.unpin({ error: e }); } } else if (e.hasErrorLabel('TransientTransactionError')) { - session.transaction.unpinServer(); + session.unpin({ error: e }); } } + callback(e, r); } @@ -648,14 +754,20 @@ function endTransaction(session: ClientSession, commandName: string, callback: C session.topology, new RunAdminCommandOperation(undefined, command, { session, - readPreference: ReadPreference.primary + readPreference: ReadPreference.primary, + bypassPinningCheck: true }), (err, reply) => { + if (command.abortTransaction) { + // always unpin on abort regardless of command outcome + session.unpin(); + } + if (err && isRetryableError(err as MongoError)) { // 
SPEC-1185: apply majority write concern when retrying commitTransaction if (command.commitTransaction) { // per txns spec, must unpin session in this case - session.transaction.unpinServer(); + session.unpin({ force: true }); command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { w: 'majority' @@ -666,7 +778,8 @@ function endTransaction(session: ClientSession, commandName: string, callback: C session.topology, new RunAdminCommandOperation(undefined, command, { session, - readPreference: ReadPreference.primary + readPreference: ReadPreference.primary, + bypassPinningCheck: true }), (_err, _reply) => commandHandler(_err as MongoError, _reply) ); @@ -765,7 +878,7 @@ export class ServerSessionPool { while (this.sessions.length) { const session = this.sessions.shift(); - if (session && !session.hasTimedOut(sessionTimeoutMinutes)) { + if (session && (this.topology.loadBalanced || !session.hasTimedOut(sessionTimeoutMinutes))) { return session; } } @@ -782,6 +895,11 @@ export class ServerSessionPool { */ release(session: ServerSession): void { const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes; + + if (this.topology.loadBalanced && !sessionTimeoutMinutes) { + this.sessions.unshift(session); + } + if (!sessionTimeoutMinutes) { return; } diff --git a/src/transactions.ts b/src/transactions.ts index 95c05da415..4802e6d3bd 100644 --- a/src/transactions.ts +++ b/src/transactions.ts @@ -45,6 +45,17 @@ const stateMachine: { [state in TxnState]: TxnState[] } = { ] }; +const ACTIVE_STATES: Set = new Set([ + TxnState.STARTING_TRANSACTION, + TxnState.TRANSACTION_IN_PROGRESS +]); + +const COMMITTED_STATES: Set = new Set([ + TxnState.TRANSACTION_COMMITTED, + TxnState.TRANSACTION_COMMITTED_EMPTY, + TxnState.TRANSACTION_ABORTED +]); + /** * Configuration options for a transaction. * @public @@ -77,7 +88,6 @@ export class Transaction { /** Create a transaction @internal */ constructor(options?: TransactionOptions) { options = options ?? 
{}; - this.state = TxnState.NO_TRANSACTION; this.options = {}; @@ -129,13 +139,12 @@ export class Transaction { * @returns Whether this session is presently in a transaction */ get isActive(): boolean { - const activeStates: TxnState[] = [ - TxnState.STARTING_TRANSACTION, - TxnState.TRANSACTION_IN_PROGRESS - ]; - return activeStates.includes(this.state); + return ACTIVE_STATES.has(this.state); } + get isCommitted(): boolean { + return COMMITTED_STATES.has(this.state); + } /** * Transition the transaction in the state machine * @internal diff --git a/src/utils.ts b/src/utils.ts index 1af2ac2f00..748b99add2 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -12,6 +12,7 @@ import { import { WriteConcern, WriteConcernOptions, W } from './write_concern'; import type { Server } from './sdam/server'; import type { Topology } from './sdam/topology'; +import { ServerType } from './sdam/common'; import type { Db } from './db'; import type { Collection } from './collection'; import type { OperationOptions, Hint } from './operations/operation'; @@ -25,6 +26,7 @@ import type { MongoClient } from './mongo_client'; import type { CommandOperationOptions, OperationParent } from './operations/command'; import { ReadPreference } from './read_preference'; import { URL } from 'url'; +import { MAX_SUPPORTED_WIRE_VERSION } from './cmap/wire_protocol/constants'; /** * MongoDB Driver style callback @@ -674,6 +676,13 @@ export function uuidV4(): Buffer { */ export function maxWireVersion(topologyOrServer?: Connection | Topology | Server): number { if (topologyOrServer) { + if (topologyOrServer.loadBalanced) { + // Since we do not have a monitor, we assume the load balanced server is always + // pointed at the latest mongodb version. There is a risk that for on-prem + // deployments that don't upgrade immediately that this could alert to the + // application that a feature is avaiable that is actually not. 
+ return MAX_SUPPORTED_WIRE_VERSION; + } if (topologyOrServer.ismaster) { return topologyOrServer.ismaster.maxWireVersion; } @@ -1403,3 +1412,17 @@ export function emitWarningOnce(message: string): void { export function enumToString(en: Record): string { return Object.values(en).join(', '); } + +/** + * Determine if a server supports retryable writes. + * + * @internal + */ +export function supportsRetryableWrites(server: Server): boolean { + return ( + !!server.loadBalanced || + (server.description.maxWireVersion >= 6 && + !!server.description.logicalSessionTimeoutMinutes && + server.description.type !== ServerType.Standalone) + ); +} diff --git a/test/functional/retryable_writes.test.js b/test/functional/retryable_writes.test.js index 1c79a06fcd..571bc75f32 100644 --- a/test/functional/retryable_writes.test.js +++ b/test/functional/retryable_writes.test.js @@ -99,8 +99,12 @@ function executeScenarioTest(test, ctx) { /expected false to be true/ ); if (hasResult) expect(err.result).to.matchMongoSpec(test.outcome.result); - if (errorLabelsContain) expect(err.errorLabels).to.have.members(errorLabelsContain); - if (errorLabelsOmit) expect(err.errorLabels).to.not.have.members(errorLabelsOmit); + if (errorLabelsContain) expect(err.errorLabels).to.include.members(errorLabelsContain); + if (errorLabelsOmit) { + errorLabelsOmit.forEach(label => { + expect(err.errorLabels).to.not.contain(label); + }); + } }); } else if (test.outcome.result) { const expected = test.outcome.result; diff --git a/test/functional/spec-runner/context.js b/test/functional/spec-runner/context.js index a696d5f4aa..04510637ca 100644 --- a/test/functional/spec-runner/context.js +++ b/test/functional/spec-runner/context.js @@ -124,11 +124,14 @@ class TestRunnerContext { targetedFailPoint(options) { const session = options.session; const failPoint = options.failPoint; - expect(session.transaction.isPinned).to.be.true; + expect(session.isPinned).to.be.true; return new Promise((resolve, reject) => { - 
const server = session.transaction.server; - server.command(ns('admin.$cmd'), failPoint, undefined, err => { + const serverOrConnection = session.loadBalanced + ? session.pinnedConnection + : session.transaction.server; + + serverOrConnection.command(ns('admin.$cmd'), failPoint, undefined, err => { if (err) return reject(err); this.appliedFailPoints.push(failPoint); diff --git a/test/functional/spec-runner/index.js b/test/functional/spec-runner/index.js index 41e929d91c..d3a9ff4507 100644 --- a/test/functional/spec-runner/index.js +++ b/test/functional/spec-runner/index.js @@ -195,7 +195,11 @@ function prepareDatabaseForSuite(suite, context) { .admin() .command({ killAllSessions: [] }) .catch(err => { - if (err.message.match(/no such (cmd|command)/) || err.code === 11601) { + if ( + err.message.match(/no such (cmd|command)/) || + err.message.match(/Failed to kill on some hosts/) || + err.code === 11601 + ) { return; } diff --git a/test/functional/transactions.test.js b/test/functional/transactions.test.js index 170047258f..36c9ed304e 100644 --- a/test/functional/transactions.test.js +++ b/test/functional/transactions.test.js @@ -70,14 +70,14 @@ class TransactionsRunnerContext extends TestRunnerContext { expect(options).to.have.property('session'); const session = options.session; - expect(session.transaction.isPinned).to.be.true; + expect(session.isPinned).to.be.true; } assertSessionUnpinned(options) { expect(options).to.have.property('session'); const session = options.session; - expect(session.transaction.isPinned).to.be.false; + expect(session.isPinned).to.be.false; } } diff --git a/test/functional/unified-spec-runner/entities.ts b/test/functional/unified-spec-runner/entities.ts index a1adff3e9f..76a6eaf1a7 100644 --- a/test/functional/unified-spec-runner/entities.ts +++ b/test/functional/unified-spec-runner/entities.ts @@ -32,7 +32,7 @@ import type { } from '../../../src/cmap/command_monitoring_events'; import { patchCollectionOptions, patchDbOptions } from 
'./unified-utils'; import { expect } from 'chai'; -import { TestConfiguration } from './runner'; +import { TestConfiguration, trace } from './runner'; interface UnifiedChangeStream extends ChangeStream { eventCollector: InstanceType; @@ -166,7 +166,7 @@ export class UnifiedMongoClient extends MongoClient { export class FailPointMap extends Map { async enableFailPoint( - addressOrClient: HostAddress | UnifiedMongoClient, + addressOrClient: string | HostAddress | UnifiedMongoClient, failPoint: Document ): Promise { let client: MongoClient; @@ -294,18 +294,28 @@ export class EntitiesMap extends Map { async cleanup(): Promise { await this.failPoints.disableFailPoints(); + + trace('closeCursors'); for (const [, cursor] of this.mapOf('cursor')) { await cursor.close(); } + + trace('closeStreams'); for (const [, stream] of this.mapOf('stream')) { await stream.close(); } + + trace('endSessions'); for (const [, session] of this.mapOf('session')) { await session.endSession({ force: true }); } + + trace('closeClient'); for (const [, client] of this.mapOf('client')) { await client.close(); } + + trace('clear'); this.clear(); } diff --git a/test/functional/unified-spec-runner/operations.ts b/test/functional/unified-spec-runner/operations.ts index 42baf39e6b..cad0e0cd62 100644 --- a/test/functional/unified-spec-runner/operations.ts +++ b/test/functional/unified-spec-runner/operations.ts @@ -128,12 +128,12 @@ operations.set('assertSessionNotDirty', async ({ entities, operation }) => { operations.set('assertSessionPinned', async ({ entities, operation }) => { const session = operation.arguments.session; - expect(session.transaction.isPinned).to.be.true; + expect(session.isPinned, 'session should be pinned').to.be.true; }); operations.set('assertSessionUnpinned', async ({ entities, operation }) => { const session = operation.arguments.session; - expect(session.transaction.isPinned).to.be.false; + expect(session.isPinned, 'session should be unpinned').to.be.false; }); 
operations.set('assertSessionTransactionState', async ({ entities, operation }) => { @@ -158,8 +158,9 @@ operations.set('assertNumberConnectionsCheckedOut', async ({ entities, operation const pool = server.s.pool; return count + pool.currentCheckedOutCount; }, 0); - // TODO: Durran: Fix in NODE-3011 - expect(checkedOutConnections || 0).to.equal(operation.arguments.connections); + + await Promise.resolve(); // wait one tick + expect(checkedOutConnections).to.equal(operation.arguments.connections); }); operations.set('bulkWrite', async ({ entities, operation }) => { @@ -336,11 +337,12 @@ operations.set('startTransaction', async ({ entities, operation }) => { operations.set('targetedFailPoint', async ({ entities, operation }) => { const session = operation.arguments.session; - expect(session.transaction.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true; - await entities.failPoints.enableFailPoint( - session.transaction._pinnedServer.s.description.hostAddress, - operation.arguments.failPoint - ); + expect(session.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true; + const address = session.transaction.isPinned + ? 
session.transaction._pinnedServer.s.description.hostAddress + : session.pinnedConnection.address; + + await entities.failPoints.enableFailPoint(address, operation.arguments.failPoint); }); operations.set('delete', async ({ entities, operation }) => { diff --git a/test/functional/unified-spec-runner/runner.ts b/test/functional/unified-spec-runner/runner.ts index 6f55f49cd5..525e8209a1 100644 --- a/test/functional/unified-spec-runner/runner.ts +++ b/test/functional/unified-spec-runner/runner.ts @@ -7,6 +7,7 @@ import { ns } from '../../../src/utils'; import { executeOperationAndCheck } from './operations'; import { matchesEvents } from './match'; import { satisfies as semverSatisfies } from 'semver'; +import { MongoClient } from '../../../src/mongo_client'; export type TestConfiguration = InstanceType< typeof import('../../tools/runner/config')['TestConfiguration'] @@ -15,6 +16,25 @@ interface MongoDBMochaTestContext extends Mocha.Context { configuration: TestConfiguration; } +export function trace(message: string): void { + if (process.env.UTR_TRACE) { + console.error(` > ${message}`); + } +} + +async function terminateOpenTransactions(client: MongoClient) { + // TODO(NODE-3491): on sharded clusters this has to be run on each mongos + try { + await client.db().admin().command({ killAllSessions: [] }); + } catch (err) { + if (err.code === 11601 || err.code === 13 || err.code === 59) { + return; + } + + throw err; + } +} + export async function runUnifiedTest( ctx: MongoDBMochaTestContext, unifiedSuite: uni.UnifiedSuite, @@ -51,8 +71,12 @@ export async function runUnifiedTest( let entities; try { + trace('\n starting test:'); await utilClient.connect(); + // terminate all sessions before each test suite + await terminateOpenTransactions(utilClient); + // Must fetch parameters before checking runOnRequirements ctx.configuration.parameters = await utilClient.db().admin().command({ getParameter: '*' }); @@ -63,6 +87,7 @@ export async function runUnifiedTest( 
...(test.runOnRequirements ?? []) ]; + trace('satisfiesRequirements'); for (const requirement of allRequirements) { const met = await topologySatisfies(ctx.configuration, requirement, utilClient); if (!met) { @@ -75,15 +100,19 @@ export async function runUnifiedTest( // documents are specified, the test runner MUST create the collection with a "majority" write concern. // The test runner MUST use the internal MongoClient for these operations. if (unifiedSuite.initialData) { + trace('initialData'); for (const collData of unifiedSuite.initialData) { const db = utilClient.db(collData.databaseName); const collection = db.collection(collData.collectionName, { writeConcern: { w: 'majority' } }); + + trace('listCollections'); const collectionList = await db .listCollections({ name: collData.collectionName }) .toArray(); if (collectionList.length !== 0) { + trace('drop'); expect(await collection.drop()).to.be.true; } } @@ -95,16 +124,19 @@ export async function runUnifiedTest( }); if (!collData.documents?.length) { + trace('createCollection'); await db.createCollection(collData.collectionName, { writeConcern: { w: 'majority' } }); continue; } + trace('insertMany'); await collection.insertMany(collData.documents); } } + trace('createEntities'); entities = await EntitiesMap.createEntities(ctx.configuration, unifiedSuite.createEntities); // Workaround for SERVER-39704: @@ -125,7 +157,14 @@ export async function runUnifiedTest( } for (const operation of test.operations) { - await executeOperationAndCheck(operation, entities, utilClient); + trace(operation.name); + try { + await executeOperationAndCheck(operation, entities, utilClient); + } catch (e) { + // clean up all sessions on failed test, and rethrow + await terminateOpenTransactions(utilClient); + throw e; + } } const clientCommandEvents = new Map(); diff --git a/test/functional/unified-spec-runner/schema.ts b/test/functional/unified-spec-runner/schema.ts index 659f0b6c32..e399c37764 100644 --- 
a/test/functional/unified-spec-runner/schema.ts +++ b/test/functional/unified-spec-runner/schema.ts @@ -13,6 +13,7 @@ export interface OperationDescription { expectError?: ExpectedError; expectResult?: unknown; saveResultAsEntity?: string; + ignoreResultAndError?: boolean; } export interface UnifiedSuite { description: string; diff --git a/test/functional/unified-spec-runner/unified-utils.ts b/test/functional/unified-spec-runner/unified-utils.ts index 9f9ad171db..7b6e1dbead 100644 --- a/test/functional/unified-spec-runner/unified-utils.ts +++ b/test/functional/unified-spec-runner/unified-utils.ts @@ -35,10 +35,7 @@ export async function topologySatisfies( LoadBalanced: 'load-balanced' }[config.topologyType]; - if ( - r.topologies.includes('sharded-replicaset') && - (topologyType === 'sharded' || topologyType === 'load-balanced') - ) { + if (r.topologies.includes('sharded-replicaset') && topologyType === 'sharded') { const shards = await utilClient.db('config').collection('shards').find({}).toArray(); ok &&= shards.length > 0 && shards.every(shard => shard.host.split(',').length > 1); } else { diff --git a/test/manual/load-balancer.test.js b/test/manual/load-balancer.test.js new file mode 100644 index 0000000000..96a638172e --- /dev/null +++ b/test/manual/load-balancer.test.js @@ -0,0 +1,32 @@ +'use strict'; +const path = require('path'); +const { loadSpecTests } = require('../spec/index'); +const { runUnifiedSuite } = require('../functional/unified-spec-runner/runner'); + +const SKIP = [ + // Verified they use the same connection but the Node implementation executes + // a getMore before the killCursors even though the stream is immediately + // closed. + 'change streams pin to a connection', + 'errors during the initial connection hello are ignore', + + // NOTE: The following three tests are skipped pending a decision made on DRIVERS-1847, since + // pinning the connection on any getMore error is very awkward in node and likely results + // in sub-optimal pinning. 
+ 'pinned connections are not returned after an network error during getMore', + 'pinned connections are not returned to the pool after a non-network error on getMore', + 'stale errors are ignored' +]; + +require('../functional/retryable_reads.test'); +require('../functional/retryable_writes.test'); +require('../functional/uri_options_spec.test'); +require('../functional/change_stream_spec.test'); +require('../functional/versioned-api.test'); +require('../unit/core/mongodb_srv.test'); +require('../unit/sdam/server_selection/spec.test'); + +describe('Load Balancer Unified Tests', function () { + this.timeout(10000); + runUnifiedSuite(loadSpecTests(path.join('load-balancers')), SKIP); +}); diff --git a/test/tools/runner/config.js b/test/tools/runner/config.js index 1b02cfa929..91b3a5ee33 100644 --- a/test/tools/runner/config.js +++ b/test/tools/runner/config.js @@ -38,13 +38,13 @@ class TestConfiguration { const url = new ConnectionString(uri); const { hosts } = url; const hostAddresses = hosts.map(HostAddress.fromString); - this.topologyType = context.topologyType; this.version = context.version; this.clientSideEncryption = context.clientSideEncryption; this.serverApi = context.serverApi; this.parameters = undefined; this.singleMongosLoadBalancerUri = context.singleMongosLoadBalancerUri; this.multiMongosLoadBalancerUri = context.multiMongosLoadBalancerUri; + this.topologyType = this.isLoadBalanced ? 
TopologyType.LoadBalanced : context.topologyType; this.options = { hosts, hostAddresses, @@ -62,14 +62,14 @@ class TestConfiguration { } } - writeConcern() { - return { writeConcern: { w: 1 } }; - } - get isLoadBalanced() { return !!this.singleMongosLoadBalancerUri && !!this.multiMongosLoadBalancerUri; } + writeConcern() { + return { writeConcern: { w: 1 } }; + } + get host() { return this.options.host; } @@ -235,10 +235,20 @@ class TestConfiguration { let actualHostsString; if (options.useMultipleMongoses) { - expect(this.options.hostAddresses).to.have.length.greaterThan(1); - actualHostsString = this.options.hostAddresses.map(ha => ha.toString()).join(','); + if (this.isLoadBalanced) { + const multiUri = new ConnectionString(this.multiMongosLoadBalancerUri); + actualHostsString = multiUri.hosts[0].toString(); + } else { + expect(this.options.hostAddresses).to.have.length.greaterThan(1); + actualHostsString = this.options.hostAddresses.map(ha => ha.toString()).join(','); + } } else { - actualHostsString = this.options.hostAddresses[0].toString(); + if (this.isLoadBalanced) { + const singleUri = new ConnectionString(this.singleMongosLoadBalancerUri); + actualHostsString = singleUri.hosts[0].toString(); + } else { + actualHostsString = this.options.hostAddresses[0].toString(); + } } const connectionString = url.toString().replace(FILLER_HOST, actualHostsString); diff --git a/test/tools/runner/filters/mongodb_topology_filter.js b/test/tools/runner/filters/mongodb_topology_filter.js index cc72404d6e..dafbed52d8 100755 --- a/test/tools/runner/filters/mongodb_topology_filter.js +++ b/test/tools/runner/filters/mongodb_topology_filter.js @@ -49,6 +49,8 @@ function topologyTypeToString(topologyType) { return 'replicaset'; } else if (topologyType === TopologyType.Sharded) { return 'sharded'; + } else if (topologyType === TopologyType.LoadBalanced) { + return 'load-balanced'; } return 'single'; diff --git a/test/unit/cmap/connection.test.js 
b/test/unit/cmap/connection.test.js index ad88b347ea..414af516b5 100644 --- a/test/unit/cmap/connection.test.js +++ b/test/unit/cmap/connection.test.js @@ -2,8 +2,9 @@ const mock = require('../../tools/mock'); const { connect } = require('../../../src/cmap/connect'); -const { Connection } = require('../../../src/cmap/connection'); +const { Connection, hasSessionSupport } = require('../../../src/cmap/connection'); const { expect } = require('chai'); +const { Socket } = require('net'); const { ns } = require('../../../src/utils'); const { getSymbolFrom } = require('../../tools/utils'); @@ -106,4 +107,49 @@ describe('Connection - unit/cmap', function () { done(); }); }); + + describe('.hasSessionSupport', function () { + let connection; + const stream = new Socket(); + + context('when logicalSessionTimeoutMinutes is present', function () { + beforeEach(function () { + connection = new Connection(stream, { + hostAddress: server.hostAddress(), + logicalSessionTimeoutMinutes: 5 + }); + }); + + it('returns true', function () { + expect(hasSessionSupport(connection)).to.be.true; + }); + }); + + context('when logicalSessionTimeoutMinutes is not present', function () { + context('when in load balancing mode', function () { + beforeEach(function () { + connection = new Connection(stream, { + hostAddress: server.hostAddress(), + loadBalanced: true + }); + }); + + it('returns true', function () { + expect(hasSessionSupport(connection)).to.be.true; + }); + }); + + context('when not in load balancing mode', function () { + beforeEach(function () { + connection = new Connection(stream, { + hostAddress: server.hostAddress() + }); + }); + + it('returns false', function () { + expect(hasSessionSupport(connection)).to.be.false; + }); + }); + }); + }); }); diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index ebd11017f7..1910aba932 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -73,6 +73,7 
@@ describe('Connection Pool', function () { }); pool.withConnection( + undefined, (err, conn, cb) => { expect(err).to.not.exist; cb(); @@ -193,11 +194,15 @@ describe('Connection Pool', function () { pool.close(done); }; - pool.withConnection((err, conn, cb) => { - expect(err).to.exist; - expect(err).to.match(/closed/); - cb(err); - }, callback); + pool.withConnection( + undefined, + (err, conn, cb) => { + expect(err).to.exist; + expect(err).to.match(/closed/); + cb(err); + }, + callback + ); }); it('should return an error to the original callback', function (done) { @@ -216,10 +221,14 @@ describe('Connection Pool', function () { pool.close(done); }; - pool.withConnection((err, conn, cb) => { - expect(err).to.not.exist; - cb(new Error('my great error')); - }, callback); + pool.withConnection( + undefined, + (err, conn, cb) => { + expect(err).to.not.exist; + cb(new Error('my great error')); + }, + callback + ); }); it('should still manage a connection if no callback is provided', function (done) { @@ -243,7 +252,7 @@ describe('Connection Pool', function () { pool.close(done); }); - pool.withConnection((err, conn, cb) => { + pool.withConnection(undefined, (err, conn, cb) => { expect(err).to.not.exist; cb(); }); diff --git a/test/unit/cmap/metrics.test.js b/test/unit/cmap/metrics.test.js new file mode 100644 index 0000000000..50e2fc3d4e --- /dev/null +++ b/test/unit/cmap/metrics.test.js @@ -0,0 +1,134 @@ +'use strict'; + +const { expect } = require('chai'); +const { ConnectionPoolMetrics } = require('../../../src/cmap/metrics'); + +describe('ConnectionPoolMetrics', function () { + describe('#constructor', function () { + const metrics = new ConnectionPoolMetrics(); + + it('defaults txnConnections to zero', function () { + expect(metrics).property('txnConnections').to.equal(0); + }); + + it('defaults cursorConnections to zero', function () { + expect(metrics).property('cursorConnections').to.equal(0); + }); + + it('defaults otherConnections to zero', function () { + 
expect(metrics).property('otherConnections').to.equal(0); + }); + }); + + describe('#info', function () { + const metrics = new ConnectionPoolMetrics(); + + it('returns the metrics information', function () { + expect(metrics.info(5)).to.equal( + 'Timed out while checking out a connection from connection pool: ' + + 'maxPoolSize: 5, ' + + 'connections in use by cursors: 0, ' + + 'connections in use by transactions: 0, ' + + 'connections in use by other operations: 0' + ); + }); + }); + + describe('#markPinned', function () { + const metrics = new ConnectionPoolMetrics(); + + context('when the type is TXN', function () { + before(function () { + metrics.reset(); + metrics.markPinned(ConnectionPoolMetrics.TXN); + }); + + it('increments the txnConnections count', function () { + expect(metrics).to.deep.equal({ + txnConnections: 1, + cursorConnections: 0, + otherConnections: 0 + }); + }); + }); + + context('when the type is CURSOR', function () { + before(function () { + metrics.reset(); + metrics.markPinned(ConnectionPoolMetrics.CURSOR); + }); + + it('increments the cursorConnections count', function () { + expect(metrics).to.deep.equal({ + txnConnections: 0, + cursorConnections: 1, + otherConnections: 0 + }); + }); + }); + + context('when the type is OTHER', function () { + before(function () { + metrics.reset(); + metrics.markPinned(ConnectionPoolMetrics.OTHER); + }); + + it('increments the otherConnections count', function () { + expect(metrics).to.deep.equal({ + txnConnections: 0, + cursorConnections: 0, + otherConnections: 1 + }); + }); + }); + }); + + describe('#markUnpinned', function () { + const metrics = new ConnectionPoolMetrics(); + + context('when the type is TXN', function () { + before(function () { + metrics.reset(); + metrics.markUnpinned(ConnectionPoolMetrics.TXN); + }); + + it('decrements the txnConnections count', function () { + expect(metrics).to.deep.equal({ + txnConnections: -1, + cursorConnections: 0, + otherConnections: 0 + }); + }); + }); + 
+ context('when the type is CURSOR', function () { + before(function () { + metrics.reset(); + metrics.markUnpinned(ConnectionPoolMetrics.CURSOR); + }); + + it('decrements the cursorConnections count', function () { + expect(metrics).to.deep.equal({ + txnConnections: 0, + cursorConnections: -1, + otherConnections: 0 + }); + }); + }); + + context('when the type is OTHER', function () { + before(function () { + metrics.reset(); + metrics.markUnpinned(ConnectionPoolMetrics.OTHER); + }); + + it('decrements the otherConnections count', function () { + expect(metrics).to.deep.equal({ + txnConnections: 0, + cursorConnections: 0, + otherConnections: -1 + }); + }); + }); + }); +}); diff --git a/test/unit/cmap/stream_description.test.js b/test/unit/cmap/stream_description.test.js new file mode 100644 index 0000000000..e84d16d7ae --- /dev/null +++ b/test/unit/cmap/stream_description.test.js @@ -0,0 +1,67 @@ +'use strict'; + +const { StreamDescription } = require('../../../src/cmap/stream_description'); +const { expect } = require('chai'); + +describe('StreamDescription - unit/cmap', function () { + describe('.new', function () { + context('when options are provided', function () { + context('when logicalSessionTimeoutMinutes is in the options', function () { + const options = { logicalSessionTimeoutMinutes: 5 }; + const description = new StreamDescription('a:27017', options); + + it('sets the property', function () { + expect(description.logicalSessionTimeoutMinutes).to.eq(5); + }); + }); + + context('when logicalSessionTimeoutMinutes is not in the options', function () { + const description = new StreamDescription('a:27017', {}); + + it('sets logicalSessionTimeoutMinutes to undefined', function () { + expect(description).to.have.property('logicalSessionTimeoutMinutes', undefined); + }); + }); + + context('when loadBalanced is in the options', function () { + context('when the value is true', function () { + const options = { loadBalanced: true }; + const description = new 
StreamDescription('a:27017', options); + + it('sets the property to true', function () { + expect(description.loadBalanced).to.be.true; + }); + }); + + context('when the value is false', function () { + const options = { loadBalanced: false }; + const description = new StreamDescription('a:27017', options); + + it('sets the property to false', function () { + expect(description.loadBalanced).to.be.false; + }); + }); + }); + + context('when loadBalanced is not in the options', function () { + const description = new StreamDescription('a:27017', {}); + + it('sets loadBalanced to false', function () { + expect(description.loadBalanced).to.be.false; + }); + }); + }); + + context('when options are not provided', function () { + const description = new StreamDescription('a:27017'); + + it('defaults logicalSessionTimeoutMinutes to undefined', function () { + expect(description).to.have.property('logicalSessionTimeoutMinutes', undefined); + }); + + it('defaults loadBalanced to false', function () { + expect(description.loadBalanced).to.be.false; + }); + }); + }); +}); diff --git a/test/unit/core/mongodb_srv.test.js b/test/unit/core/mongodb_srv.test.js index dd747534e3..76a1be9836 100644 --- a/test/unit/core/mongodb_srv.test.js +++ b/test/unit/core/mongodb_srv.test.js @@ -24,9 +24,7 @@ describe('mongodb+srv', function () { test[1].comment = test[0]; } - // TODO: Remove with NODE-3011 - const maybeIt = test[1].comment.includes('loadBalanced') ? 
it.skip : it; - maybeIt(test[1].comment, { + it(test[1].comment, { metadata: { requires: { topology: ['single'] } }, test: function (done) { try { @@ -55,6 +53,9 @@ describe('mongodb+srv', function () { expect(options).to.have.property('credentials'); expect(options.credentials.source).to.equal(testOptions.authSource); } + if (testOptions && testOptions.loadBalanced) { + expect(options).to.have.property('loadBalanced', testOptions.loadBalanced); + } if ( test[1].parsed_options && test[1].parsed_options.user && diff --git a/test/unit/core/response_test.js.test.js b/test/unit/core/response_test.js.test.js index 36177099b9..b27fdf3ccb 100644 --- a/test/unit/core/response_test.js.test.js +++ b/test/unit/core/response_test.js.test.js @@ -45,6 +45,8 @@ describe('Response', function () { }); } else if (doc.getMore) { request.reply(errdoc); + } else if (doc.killCursors) { + request.reply({ ok: 1 }); } }); diff --git a/test/unit/mongo_client_options.test.js b/test/unit/mongo_client_options.test.js index a14f466b58..c1f07e4544 100644 --- a/test/unit/mongo_client_options.test.js +++ b/test/unit/mongo_client_options.test.js @@ -534,4 +534,48 @@ describe('MongoOptions', function () { } }); }); + + context('when loadBalanced=true is in the URI', function () { + it('sets the option', function () { + const options = parseOptions('mongodb://a/?loadBalanced=true'); + expect(options.loadBalanced).to.be.true; + }); + + it('errors with multiple hosts', function () { + const parse = () => { + parseOptions('mongodb://a,b/?loadBalanced=true'); + }; + expect(parse).to.throw(/single host/); + }); + + it('errors with a replicaSet option', function () { + const parse = () => { + parseOptions('mongodb://a/?loadBalanced=true&replicaSet=test'); + }; + expect(parse).to.throw(/replicaSet/); + }); + + it('errors with a directConnection option', function () { + const parse = () => { + parseOptions('mongodb://a/?loadBalanced=true&directConnection=true'); + }; + 
expect(parse).to.throw(/directConnection/); + }); + }); + + context('when loadBalanced is in the options object', function () { + it('errors when the option is true', function () { + const parse = () => { + parseOptions('mongodb://a/', { loadBalanced: true }); + }; + expect(parse).to.throw(/URI/); + }); + + it('errors when the option is false', function () { + const parse = () => { + parseOptions('mongodb://a/', { loadBalanced: false }); + }; + expect(parse).to.throw(/URI/); + }); + }); }); diff --git a/test/unit/sdam/server_selection/spec.test.js b/test/unit/sdam/server_selection/spec.test.js index 5fd692ddd8..617c631f60 100644 --- a/test/unit/sdam/server_selection/spec.test.js +++ b/test/unit/sdam/server_selection/spec.test.js @@ -3,7 +3,7 @@ const path = require('path'); const fs = require('fs'); const { Topology } = require('../../../../src/sdam/topology'); const { Server } = require('../../../../src/sdam/server'); -const { ServerType } = require('../../../../src/sdam/common'); +const { ServerType, TopologyType } = require('../../../../src/sdam/common'); const { ServerDescription } = require('../../../../src/sdam/server_description'); const { ReadPreference } = require('../../../../src/read_preference'); const { MongoServerSelectionError } = require('../../../../src/error'); @@ -68,9 +68,7 @@ describe('Server Selection (spec)', function () { describe(subType, function () { specTests[topologyType][subType].forEach(test => { // NOTE: node does not support PossiblePrimary - // TODO: Re-enable LoadBalanced in NODE-3011 - const maybeIt = - test.name.match(/Possible/) || topologyType === 'LoadBalanced' ? it.skip : it; + const maybeIt = test.name.match(/Possible/) ? 
it.skip : it; maybeIt(test.name, function (done) { executeServerSelectionTest(test, { checkLatencyWindow: false }, done); @@ -81,9 +79,7 @@ describe('Server Selection (spec)', function () { describe(subType + ' (within latency window)', function () { specTests[topologyType][subType].forEach(test => { // NOTE: node does not support PossiblePrimary - // TODO: Re-enable LoadBalanced in NODE-3011 - const maybeIt = - test.name.match(/Possible/) || topologyType === 'LoadBalanced' ? it.skip : it; + const maybeIt = test.name.match(/Possible/) ? it.skip : it; maybeIt(test.name, function (done) { executeServerSelectionTest(test, { checkLatencyWindow: true }, done); @@ -150,10 +146,21 @@ function serverDescriptionFromDefinition(definition, hosts) { hosts = hosts || []; const serverType = definition.type; + if (serverType === ServerType.Unknown) { return new ServerDescription(definition.address); } + // There's no monitor in load balanced mode so no fake hello + // is needed. + if (serverType === ServerType.LoadBalancer) { + const description = new ServerDescription(definition.address, undefined, { + loadBalanced: true + }); + delete description.lastUpdateTime; + return description; + } + const fakeIsMaster = { ok: 1, hosts }; if (serverType !== ServerType.Standalone && serverType !== ServerType.Mongos) { fakeIsMaster.setName = 'rs'; @@ -218,7 +225,8 @@ function executeServerSelectionTest(testDefinition, options, testDone) { const topologyOptions = { heartbeatFrequencyMS: testDefinition.heartbeatFrequencyMS, - monitorFunction: () => {} + monitorFunction: () => {}, + loadBalanced: topologyDescription.type === TopologyType.LoadBalanced }; const topology = new Topology(seedData.seedlist, topologyOptions); diff --git a/test/unit/sdam/spec.test.js b/test/unit/sdam/spec.test.js index 7bf2d1b462..d0096d719d 100644 --- a/test/unit/sdam/spec.test.js +++ b/test/unit/sdam/spec.test.js @@ -2,6 +2,7 @@ const fs = require('fs'); const path = require('path'); const { Topology } = 
require('../../../src/sdam/topology'); +const { TopologyType } = require('../../../src/sdam/common'); const { Server } = require('../../../src/sdam/server'); const { ServerDescription } = require('../../../src/sdam/server_description'); const sdamEvents = require('../../../src/sdam/events'); @@ -60,13 +61,10 @@ describe('Server Discovery and Monitoring (spec)', function () { }); // DRIVERS-1249 should add directConnection and then update spec, remove skip - // TODO: NODE-3011 remove LB test skips const shouldSkip = desc => { const descriptions = [ 'Monitoring a standalone connection', - 'Monitoring a standalone connection - suppress update events for equal server descriptions', - 'Load balancer can be discovered and only has the address property set', - 'Monitoring a load balancer' + 'Monitoring a standalone connection - suppress update events for equal server descriptions' ]; return descriptions.includes(desc); }; @@ -249,7 +247,6 @@ function executeSDAMTest(testData, testDone) { phase.responses.forEach(response => topology.serverUpdateHandler(new ServerDescription(response[0], response[1])) ); - phaseDone(); } else if (phase.applicationErrors) { eachAsyncSeries( @@ -272,6 +269,8 @@ function executeSDAMTest(testData, testDone) { phaseDone(); } ); + } else { + phaseDone(); } }, err => { @@ -283,7 +282,7 @@ function executeSDAMTest(testData, testDone) { } function withConnectionStubImpl(appError) { - return function (fn, callback) { + return function (conn, fn, callback) { const connectionPool = this; // we are stubbing `withConnection` on the `ConnectionPool` class const fakeConnection = { generation: @@ -353,6 +352,15 @@ function assertOutcomeExpectations(topology, events, outcome) { return; } + // Load balancer mode has no monitor ismaster response and + // only expects address and compatible to be set in the + // server description. 
+    if (description.type === TopologyType.LoadBalanced) {
+      if (key !== 'address' && key !== 'compatible') {
+        return;
+      }
+    }
+
     if (key === 'events') {
       const expectedEvents = convertOutcomeEvents(outcomeValue);
       expect(events).to.have.length(expectedEvents.length);