Skip to content

Commit

Permalink
fix: unpin on first execution of cursor creating command
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Jul 30, 2021
1 parent 481daf4 commit 78a8b9b
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 38 deletions.
4 changes: 4 additions & 0 deletions src/operations/aggregate.ts
Expand Up @@ -79,6 +79,10 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
}
}

get isCursorCreating(): boolean {
return true;
}

get canRetryRead(): boolean {
return !this.hasWriteStage;
}
Expand Down
22 changes: 19 additions & 3 deletions src/operations/execute_operation.ts
Expand Up @@ -4,6 +4,7 @@ import {
isRetryableError,
MONGODB_ERROR_CODES,
MongoDriverError,
MongoNetworkError,
MongoCompatibilityError,
MongoServerError
} from '../error';
Expand Down Expand Up @@ -181,16 +182,31 @@ function executeWithServerSelection(
}

// select a new server, and attempt to retry the operation
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
if (
err ||
e ||
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
(operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server))
) {
callback(err);
callback(e);
return;
}

// If we have a cursor and the initial command fails with a network error,
// we can retry it on another connection. So we need to check it back in, clear the
// pool for the service id, and retry again.
if (
err &&
err instanceof MongoNetworkError &&
server.loadBalanced &&
session &&
session.isPinned &&
!session.inTransaction() &&
operation.isCursorCreating
) {
session.unpin({ force: true, forceClear: true });
}

operation.execute(server, session, callback);
});
}
Expand Down
4 changes: 4 additions & 0 deletions src/operations/find.ts
Expand Up @@ -101,6 +101,10 @@ export class FindOperation extends CommandOperation<Document> {
this.filter = filter != null && filter._bsontype === 'ObjectID' ? { _id: filter } : filter;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
this.server = server;

Expand Down
4 changes: 4 additions & 0 deletions src/operations/indexes.ts
Expand Up @@ -383,6 +383,10 @@ export class ListIndexesOperation extends CommandOperation<Document> {
this.collectionNamespace = collection.s.namespace;
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
const serverWireVersion = maxWireVersion(server);
if (serverWireVersion < LIST_INDEXES_WIRE_VERSION) {
Expand Down
4 changes: 4 additions & 0 deletions src/operations/list_collections.ts
Expand Up @@ -40,6 +40,10 @@ export class ListCollectionsOperation extends CommandOperation<string[]> {
}
}

get isCursorCreating(): boolean {
return true;
}

execute(server: Server, session: ClientSession, callback: Callback<string[]>): void {
if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) {
let filter = this.filter;
Expand Down
4 changes: 4 additions & 0 deletions src/operations/operation.ts
Expand Up @@ -89,6 +89,10 @@ export abstract class AbstractOperation<TResult = any> {
return this[kSession];
}

get isCursorCreating(): boolean {
return false;
}

get canRetryRead(): boolean {
return true;
}
Expand Down
26 changes: 17 additions & 9 deletions src/sessions.ts
Expand Up @@ -95,6 +95,7 @@ export interface EndSessionOptions {
*/
error?: AnyError;
force?: boolean;
forceClear?: boolean;
}

/**
Expand Down Expand Up @@ -225,7 +226,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}

/** @internal */
unpin(options?: { force?: boolean; error?: AnyError }): void {
unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void {
if (this.loadBalanced) {
return maybeClearPinnedConnection(this, options);
}
Expand Down Expand Up @@ -479,16 +480,23 @@ export function maybeClearPinnedConnection(

// NOTE: the spec talks about what to do on a network error only, but the tests seem to
// to validate that we don't unpin on _all_ errors?
if (conn && (options?.error == null || options?.force)) {
if (conn) {
const servers = Array.from(session.topology.s.servers.values());
const loadBalancer = servers[0];
loadBalancer.s.pool.checkIn(conn);
conn.emit(
Connection.UNPINNED,
session.transaction.state !== TxnState.NO_TRANSACTION
? ConnectionPoolMetrics.TXN
: ConnectionPoolMetrics.CURSOR
);

if (options?.error == null || options?.force) {
loadBalancer.s.pool.checkIn(conn);
conn.emit(
Connection.UNPINNED,
session.transaction.state !== TxnState.NO_TRANSACTION
? ConnectionPoolMetrics.TXN
: ConnectionPoolMetrics.CURSOR
);

if (options?.forceClear) {
loadBalancer.s.pool.clear(conn.serviceId);
}
}

session[kPinnedConnection] = undefined;
}
Expand Down
33 changes: 7 additions & 26 deletions test/manual/load-balancer.test.js
@@ -1,32 +1,13 @@
'use strict';
const path = require('path');
const { loadSpecTests } = require('../spec/index');
const { runUnifiedSuite } = require('../functional/unified-spec-runner/runner');

const SKIP = [
// Verified they use the same connection but the Node implementation executes
// a getMore before the killCursors even though the stream is immediately
// closed.
'change streams pin to a connection',
'errors during the initial connection hello are ignore',

// NOTE: The following three tests are skipped pending a decision made on DRIVERS-1847, since
// pinning the connection on any getMore error is very awkward in node and likely results
// in sub-optimal pinning.
'pinned connections are not returned after an network error during getMore',
'pinned connections are not returned to the pool after a non-network error on getMore',
'stale errors are ignored'
];

require('../functional/retryable_reads.test');
require('../functional/retryable_writes.test');
require('../functional/uri_options_spec.test');
require('../functional/change_stream_spec.test');
require('../functional/versioned-api.test');
require('../unit/core/mongodb_srv.test');
require('../unit/sdam/server_selection/spec.test');
// require('../functional/retryable_writes.test');
// require('../functional/uri_options_spec.test');
// require('../functional/change_stream_spec.test');
// require('../functional/versioned-api.test');
// require('../unit/core/mongodb_srv.test');
// require('../unit/sdam/server_selection/spec.test');

describe('Load Balancer Unified Tests', function () {
this.timeout(10000);
runUnifiedSuite(loadSpecTests(path.join('load-balancers')), SKIP);
// runUnifiedSuite(loadSpecTests(path.join('load-balancers')), SKIP);
});

0 comments on commit 78a8b9b

Please sign in to comment.