From 212cc486a9b4426fba53e07eac59b96631a6523a Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 30 Jul 2021 23:55:13 +0200 Subject: [PATCH] fix: unpin on first execution of cursor creating command --- src/operations/aggregate.ts | 4 ++++ src/operations/execute_operation.ts | 22 +++++++++++++++++++--- src/operations/find.ts | 4 ++++ src/operations/indexes.ts | 4 ++++ src/operations/list_collections.ts | 4 ++++ src/operations/operation.ts | 4 ++++ src/sessions.ts | 26 +++++++++++++++++--------- 7 files changed, 56 insertions(+), 12 deletions(-) diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index 7d09495eebd..ceb7907b3ea 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -79,6 +79,10 @@ export class AggregateOperation extends CommandOperation { } } + get isCursorCreating(): boolean { + return true; + } + get canRetryRead(): boolean { return !this.hasWriteStage; } diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 45fb0fed192..c348023910e 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -4,6 +4,7 @@ import { isRetryableError, MONGODB_ERROR_CODES, MongoDriverError, + MongoNetworkError, MongoCompatibilityError, MongoServerError } from '../error'; @@ -181,16 +182,31 @@ function executeWithServerSelection( } // select a new server, and attempt to retry the operation - topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => { + topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => { if ( - err || + e || (operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) || (operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server)) ) { - callback(err); + callback(e); return; } + // If we have a cursor and the initial command fails with a network error, + // we can retry it on another connection. So we need to check it back in, clear the + // pool for the service id, and retry again. + if ( + err && + err instanceof MongoNetworkError && + server.loadBalanced && + session && + session.isPinned && + !session.inTransaction() && + operation.isCursorCreating + ) { + session.unpin({ force: true, forceClear: true }); + } + operation.execute(server, session, callback); }); } diff --git a/src/operations/find.ts b/src/operations/find.ts index aa7d8a1230f..198ac547c27 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -101,6 +101,10 @@ export class FindOperation extends CommandOperation { this.filter = filter != null && filter._bsontype === 'ObjectID' ? { _id: filter } : filter; } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { this.server = server; diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index ed93aef564a..330f19be48d 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -169,6 +169,10 @@ export class IndexesOperation extends AbstractOperation { this.collection = collection; } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { const coll = this.collection; const options = this.options; diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index 8512adf683a..573725d94ae 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -40,6 +40,10 @@ export class ListCollectionsOperation extends CommandOperation { } } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) { let filter = this.filter; diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 0783ca87627..948a7ad0369 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -89,6 +89,10 @@ export abstract class AbstractOperation { return this[kSession]; } + get isCursorCreating(): boolean { + return false; + } + get canRetryRead(): boolean { return true; } diff --git a/src/sessions.ts b/src/sessions.ts index a6808511517..4d18b18dfbf 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -95,6 +95,7 @@ export interface EndSessionOptions { */ error?: AnyError; force?: boolean; + forceClear?: boolean; } /** @@ -225,7 +226,7 @@ export class ClientSession extends TypedEventEmitter { } /** @internal */ - unpin(options?: { force?: boolean; error?: AnyError }): void { + unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void { if (this.loadBalanced) { return maybeClearPinnedConnection(this, options); } @@ -479,16 +480,23 @@ export function maybeClearPinnedConnection( // NOTE: the spec talks about what to do on a network error only, but the tests seem to // to validate that we don't unpin on _all_ errors? - if (conn && (options?.error == null || options?.force)) { + if (conn) { const servers = Array.from(session.topology.s.servers.values()); const loadBalancer = servers[0]; - loadBalancer.s.pool.checkIn(conn); - conn.emit( - Connection.UNPINNED, - session.transaction.state !== TxnState.NO_TRANSACTION - ? ConnectionPoolMetrics.TXN - : ConnectionPoolMetrics.CURSOR - ); + + if (options?.error == null || options?.force) { + loadBalancer.s.pool.checkIn(conn); + conn.emit( + Connection.UNPINNED, + session.transaction.state !== TxnState.NO_TRANSACTION + ? ConnectionPoolMetrics.TXN + : ConnectionPoolMetrics.CURSOR + ); + + if (options?.forceClear) { + loadBalancer.s.pool.clear(conn.serviceId); + } + } session[kPinnedConnection] = undefined; }