Skip to content

Commit

Permalink
fix: unpin on first execution of cursor creating command
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Jul 30, 2021
1 parent 481daf4 commit 212cc48
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/operations/aggregate.ts
Expand Up @@ -79,6 +79,10 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
}
}

get isCursorCreating(): boolean {
return true;
}

get canRetryRead(): boolean {
return !this.hasWriteStage;
}
Expand Down
22 changes: 19 additions & 3 deletions src/operations/execute_operation.ts
Expand Up @@ -4,6 +4,7 @@ import {
isRetryableError,
MONGODB_ERROR_CODES,
MongoDriverError,
MongoNetworkError,
MongoCompatibilityError,
MongoServerError
} from '../error';
Expand Down Expand Up @@ -181,16 +182,31 @@ function executeWithServerSelection(
}

// select a new server, and attempt to retry the operation
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
if (
err ||
e ||
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
(operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server))
) {
callback(err);
callback(e);
return;
}

// If we have a cursor and the initial command fails with a network error,
// we can retry it on another connection. So we need to check it back in, clear the
// pool for the service id, and retry again.
if (
err &&
err instanceof MongoNetworkError &&
server.loadBalanced &&
session &&
session.isPinned &&
!session.inTransaction() &&
operation.isCursorCreating
) {
session.unpin({ force: true, forceClear: true });
}

operation.execute(server, session, callback);
});
}
Expand Down
4 changes: 4 additions & 0 deletions src/operations/find.ts
Expand Up @@ -101,6 +101,10 @@ export class FindOperation extends CommandOperation<Document> {
this.filter = filter != null && filter._bsontype === 'ObjectID' ? { _id: filter } : filter;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
this.server = server;

Expand Down
4 changes: 4 additions & 0 deletions src/operations/indexes.ts
Expand Up @@ -169,6 +169,10 @@ export class IndexesOperation extends AbstractOperation<Document> {
this.collection = collection;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
const coll = this.collection;
const options = this.options;
Expand Down
4 changes: 4 additions & 0 deletions src/operations/list_collections.ts
Expand Up @@ -40,6 +40,10 @@ export class ListCollectionsOperation extends CommandOperation<string[]> {
}
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<string[]>): void {
if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) {
let filter = this.filter;
Expand Down
4 changes: 4 additions & 0 deletions src/operations/operation.ts
Expand Up @@ -89,6 +89,10 @@ export abstract class AbstractOperation<TResult = any> {
return this[kSession];
}

get isCursorCreating(): boolean {
return false;
}

get canRetryRead(): boolean {
return true;
}
Expand Down
26 changes: 17 additions & 9 deletions src/sessions.ts
Expand Up @@ -95,6 +95,7 @@ export interface EndSessionOptions {
*/
error?: AnyError;
force?: boolean;
forceClear?: boolean;
}

/**
Expand Down Expand Up @@ -225,7 +226,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}

/** @internal */
unpin(options?: { force?: boolean; error?: AnyError }): void {
unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void {
if (this.loadBalanced) {
return maybeClearPinnedConnection(this, options);
}
Expand Down Expand Up @@ -479,16 +480,23 @@ export function maybeClearPinnedConnection(

// NOTE: the spec talks about what to do on a network error only, but the tests seem to
// to validate that we don't unpin on _all_ errors?
if (conn && (options?.error == null || options?.force)) {
if (conn) {
const servers = Array.from(session.topology.s.servers.values());
const loadBalancer = servers[0];
loadBalancer.s.pool.checkIn(conn);
conn.emit(
Connection.UNPINNED,
session.transaction.state !== TxnState.NO_TRANSACTION
? ConnectionPoolMetrics.TXN
: ConnectionPoolMetrics.CURSOR
);

if (options?.error == null || options?.force) {
loadBalancer.s.pool.checkIn(conn);
conn.emit(
Connection.UNPINNED,
session.transaction.state !== TxnState.NO_TRANSACTION
? ConnectionPoolMetrics.TXN
: ConnectionPoolMetrics.CURSOR
);

if (options?.forceClear) {
loadBalancer.s.pool.clear(conn.serviceId);
}
}

session[kPinnedConnection] = undefined;
}
Expand Down

0 comments on commit 212cc48

Please sign in to comment.