Skip to content

Commit

Permalink
fix: unpin on first execution of cursor creating command
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Aug 2, 2021
1 parent 13211ee commit 302eef1
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -762,7 +762,7 @@ function cleanupCursor(
}

if (!session.inTransaction()) {
maybeClearPinnedConnection(session, { error });
maybeClearPinnedConnection(session, { error, force: true });
}
}

Expand All @@ -779,7 +779,7 @@ function cleanupCursor(
}

if (!session.inTransaction()) {
maybeClearPinnedConnection(session, { error });
maybeClearPinnedConnection(session, { error, force: true });
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/operations/aggregate.ts
Expand Up @@ -79,6 +79,10 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
}
}

get isCursorCreating(): boolean {
return true;
}

get canRetryRead(): boolean {
return !this.hasWriteStage;
}
Expand Down
4 changes: 4 additions & 0 deletions src/operations/estimated_document_count.ts
Expand Up @@ -28,6 +28,10 @@ export class EstimatedDocumentCountOperation extends CommandOperation<number> {
this.collectionName = collection.collectionName;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<number>): void {
if (maxWireVersion(server) < 12) {
return this.executeLegacy(server, session, callback);
Expand Down
22 changes: 19 additions & 3 deletions src/operations/execute_operation.ts
Expand Up @@ -4,6 +4,7 @@ import {
isRetryableError,
MONGODB_ERROR_CODES,
MongoDriverError,
MongoNetworkError,
MongoCompatibilityError,
MongoServerError
} from '../error';
Expand Down Expand Up @@ -181,16 +182,31 @@ function executeWithServerSelection(
}

// select a new server, and attempt to retry the operation
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
if (
err ||
e ||
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
(operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server))
) {
callback(err);
callback(e);
return;
}

// If we have a cursor and the initial command fails with a network error,
// we can retry it on another connection. So we need to check it back in, clear the
// pool for the service id, and retry again.
if (
err &&
err instanceof MongoNetworkError &&
server.loadBalanced &&
session &&
session.isPinned &&
!session.inTransaction() &&
operation.isCursorCreating
) {
session.unpin({ force: true, forceClear: true });
}

operation.execute(server, session, callback);
});
}
Expand Down
4 changes: 4 additions & 0 deletions src/operations/find.ts
Expand Up @@ -101,6 +101,10 @@ export class FindOperation extends CommandOperation<Document> {
this.filter = filter != null && filter._bsontype === 'ObjectID' ? { _id: filter } : filter;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
this.server = server;

Expand Down
4 changes: 4 additions & 0 deletions src/operations/find_one.ts
Expand Up @@ -21,6 +21,10 @@ export class FindOneOperation extends CommandOperation<Document> {
this.query = query;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
const coll = this.collection;
const query = this.query;
Expand Down
4 changes: 4 additions & 0 deletions src/operations/indexes.ts
Expand Up @@ -383,6 +383,10 @@ export class ListIndexesOperation extends CommandOperation<Document> {
this.collectionNamespace = collection.s.namespace;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
const serverWireVersion = maxWireVersion(server);
if (serverWireVersion < LIST_INDEXES_WIRE_VERSION) {
Expand Down
4 changes: 4 additions & 0 deletions src/operations/list_collections.ts
Expand Up @@ -40,6 +40,10 @@ export class ListCollectionsOperation extends CommandOperation<string[]> {
}
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<string[]>): void {
if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) {
let filter = this.filter;
Expand Down
4 changes: 4 additions & 0 deletions src/operations/operation.ts
Expand Up @@ -89,6 +89,10 @@ export abstract class AbstractOperation<TResult = any> {
return this[kSession];
}

get isCursorCreating(): boolean {
return false;
}

get canRetryRead(): boolean {
return true;
}
Expand Down
31 changes: 22 additions & 9 deletions src/sessions.ts
Expand Up @@ -95,6 +95,7 @@ export interface EndSessionOptions {
*/
error?: AnyError;
force?: boolean;
forceClear?: boolean;
}

/**
Expand Down Expand Up @@ -225,7 +226,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}

/** @internal */
unpin(options?: { force?: boolean; error?: AnyError }): void {
unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void {
if (this.loadBalanced) {
return maybeClearPinnedConnection(this, options);
}
Expand Down Expand Up @@ -479,16 +480,28 @@ export function maybeClearPinnedConnection(

// NOTE: the spec talks about what to do on a network error only, but the tests seem to
// to validate that we don't unpin on _all_ errors?
if (conn && (options?.error == null || options?.force)) {
if (conn) {
const servers = Array.from(session.topology.s.servers.values());
const loadBalancer = servers[0];
loadBalancer.s.pool.checkIn(conn);
conn.emit(
Connection.UNPINNED,
session.transaction.state !== TxnState.NO_TRANSACTION
? ConnectionPoolMetrics.TXN
: ConnectionPoolMetrics.CURSOR
);

if (options?.error == null || options?.force) {
loadBalancer.s.pool.checkIn(conn);
conn.emit(
Connection.UNPINNED,
session.transaction.state !== TxnState.NO_TRANSACTION
? ConnectionPoolMetrics.TXN
: ConnectionPoolMetrics.CURSOR
);

if (options?.forceClear) {
loadBalancer.s.pool.clear(conn.serviceId);
// NOTE: This is not truly necessary, but in the test suite it is. Clearing
// the connection pool will force the connection to get destroyed on the next check
// out, but in the test suite we may not check out a connection again after
// the assertions are finished.
conn.destroy();
}
}

session[kPinnedConnection] = undefined;
}
Expand Down

0 comments on commit 302eef1

Please sign in to comment.