Skip to content

Commit

Permalink
feat: kill in-flight operations when monitor fails
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Oct 28, 2022
1 parent 90181b1 commit e092c95
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 20 deletions.
81 changes: 73 additions & 8 deletions src/cmap/connection_pool.ts
Expand Up @@ -41,7 +41,12 @@ import {
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './connection_pool_events';
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import {
PoolClearedError,
PoolClearedOnNetworkError,
PoolClosedError,
WaitQueueTimeoutError
} from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
Expand Down Expand Up @@ -391,10 +396,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kConnections].unshift(connection);
}

this[kCheckedOut].delete(connection);
const wasConnectionDeleted = this[kCheckedOut].delete(connection);
this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection));

if (willDestroy) {
if (wasConnectionDeleted && willDestroy) {
const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
this.destroyConnection(connection, reason);
}
Expand All @@ -408,8 +413,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
* previous generation will eventually be pruned during subsequent checkouts.
*/
clear(options: { serviceId?: ObjectId } = {}): void {
clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void {
const { serviceId } = options;
const interruptInUseConnections = options.interruptInUseConnections ?? false;
if (this.closed) {
return;
}
Expand All @@ -433,18 +439,72 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}

const oldGeneration = this[kGeneration];

// handle non load-balanced case
this[kGeneration] += 1;
const alreadyPaused = this[kPoolState] === PoolState.paused;
this[kPoolState] = PoolState.paused;

this.clearMinPoolSizeTimer();
if (!alreadyPaused) {
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
);
}

process.nextTick(() =>
this.pruneConnections({ minGeneration: oldGeneration, interruptInUseConnections })
);

this.processWaitQueue();
}

/**
 * Terminates outdated connections tracked by this pool, failing each of them with a
 * PoolClearedOnNetworkError.
 *
 * A connection counts as outdated when `connection.generation <= minGeneration`.
 *
 * - Outdated connections held in `this[kConnections]` (checked in) are always pruned.
 * - When `interruptInUseConnections` is `true`, outdated connections in `this[kCheckedOut]`
 *   are additionally removed from that set and failed with the same error type.
 *
 * A CONNECTION_CLOSED event with reason `'stale'` is emitted for every connection handled here.
 */
private pruneConnections({
  interruptInUseConnections,
  minGeneration
}: {
  interruptInUseConnections: boolean;
  minGeneration: number;
}) {
  // Checked-in connections: returning `true` from the callback marks the entry for removal
  // (presumably how `prune` decides what to drop - confirm against the list implementation).
  this[kConnections].prune(idleConnection => {
    const isOutdated = idleConnection.generation <= minGeneration;
    if (!isOutdated) {
      return false;
    }

    idleConnection.onError(new PoolClearedOnNetworkError(this));
    this.emit(
      ConnectionPool.CONNECTION_CLOSED,
      new ConnectionClosedEvent(this, idleConnection, 'stale')
    );
    return true;
  });

  if (!interruptInUseConnections) {
    return;
  }

  // Checked-out connections: iterate the live set, dropping each outdated entry before
  // failing it so the set no longer references the connection when `onError` runs.
  for (const inUseConnection of this[kCheckedOut]) {
    const isOutdated = inUseConnection.generation <= minGeneration;
    if (!isOutdated) {
      continue;
    }

    this[kCheckedOut].delete(inUseConnection);
    inUseConnection.onError(new PoolClearedOnNetworkError(this));
    this.emit(
      ConnectionPool.CONNECTION_CLOSED,
      new ConnectionClosedEvent(this, inUseConnection, 'stale')
    );
  }

  // TODO(NODE-xxxx): track pending connections and cancel
  // this[kCancellationToken].emit('cancel');
}

/** Close the pool */
close(callback: Callback<void>): void;
close(options: CloseOptions, callback: Callback<void>): void;
Expand Down Expand Up @@ -573,7 +633,12 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);
}

private connectionIsPerished(connection: Connection) {
/**
* Destroys a connection if the connection is perished.
*
* @returns `true` if the connection was destroyed, `false` otherwise.
*/
private destroyConnectionIfPerished(connection: Connection) {
const isStale = this.connectionIsStale(connection);
const isIdle = this.connectionIsIdle(connection);
if (!isStale && !isIdle && !connection.closed) {
Expand Down Expand Up @@ -659,7 +724,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}

this[kConnections].prune(connection => this.connectionIsPerished(connection));
this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection));

if (
this.totalConnectionCount < minPoolSize &&
Expand Down Expand Up @@ -735,7 +800,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
break;
}

if (!this.connectionIsPerished(connection)) {
if (!this.destroyConnectionIfPerished(connection)) {
this[kCheckedOut].add(connection);
this.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
Expand Down
4 changes: 3 additions & 1 deletion src/cmap/errors.ts
@@ -1,4 +1,4 @@
import { MongoDriverError, MongoNetworkError } from '../error';
import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error';
import type { ConnectionPool } from './connection_pool';

/**
Expand Down Expand Up @@ -49,6 +49,8 @@ export class PoolClearedOnNetworkError extends MongoNetworkError {
/**
 * Builds the error from the pool whose connection was interrupted.
 *
 * @param pool - the connection pool; its `address` is used in the message and stored on the error
 */
constructor(pool: ConnectionPool) {
super(`Connection to ${pool.address} interrupted due to server monitor timeout`);
this.address = pool.address;

// Tag the error with the RetryableWriteError label at construction time.
this.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

override get name(): string {
Expand Down
1 change: 1 addition & 0 deletions src/error.ts
Expand Up @@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({
ResumableChangeStreamError: 'ResumableChangeStreamError',
HandshakeError: 'HandshakeError',
ResetPool: 'ResetPool',
InterruptInUseConnections: 'InterruptInUseConnections',
NoWritesPerformed: 'NoWritesPerformed'
} as const);

Expand Down
5 changes: 4 additions & 1 deletion src/sdam/monitor.ts
Expand Up @@ -4,7 +4,7 @@ import { Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, ConnectionOptions } from '../cmap/connection';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel } from '../error';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback } from '../utils';
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
Expand Down Expand Up @@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

const error = !(err instanceof MongoError) ? new MongoError(err) : err;
error.addErrorLabel(MongoErrorLabel.ResetPool);
if (error instanceof MongoNetworkTimeoutError) {
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
}

monitor.emit('resetServer', error);
callback(err);
Expand Down
7 changes: 6 additions & 1 deletion src/sdam/topology.ts
Expand Up @@ -6,6 +6,7 @@ import { deserialize, serialize } from '../bson';
import type { MongoCredentials } from '../cmap/auth/mongo_credentials';
import type { ConnectionEvents, DestroyOptions } from '../cmap/connection';
import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool';
import { PoolClearedOnNetworkError } from '../cmap/errors';
import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string';
import {
CLOSE,
Expand Down Expand Up @@ -839,7 +840,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
incomingServerDescription.error instanceof MongoError &&
incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool)
) {
server.s.pool.clear();
const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel(
MongoErrorLabel.InterruptInUseConnections
);

server.s.pool.clear({ interruptInUseConnections });
} else if (incomingServerDescription.error == null) {
const newTopologyType = topology.s.description.type;
const shouldMarkPoolReady =
Expand Down
Expand Up @@ -19,17 +19,28 @@ const LB_SKIP_TESTS: SkipDescription[] = [
skipReason: 'cannot run against a load balanced environment'
}));

/**
 * Spec tests about `interruptInUseConnections` that are skipped unconditionally
 * (`skipIfCondition: 'always'`); each entry's `skipReason` records why.
 */
const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [
{
description: 'clear with interruptInUseConnections = true closes pending connections',
skipIfCondition: 'always',
skipReason: 'TODO(NODE-xxxx): track and kill pending connections'
}
];

describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
])
testsToSkip: LB_SKIP_TESTS.concat(
[
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
],
INTERRUPT_IN_USE_SKIPPED_TESTS
)
});
});
2 changes: 1 addition & 1 deletion test/tools/cmap_spec_runner.ts
Expand Up @@ -197,7 +197,7 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({

return threadContext.pool.checkIn(connection);
},
clear: function (interruptInUseConnections: boolean) {
clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) {
return threadContext.pool.clear({ interruptInUseConnections });
},
close: async function () {
Expand Down

0 comments on commit e092c95

Please sign in to comment.