Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4649): use SDAM handling for errors from min pool size population #3424

Merged
merged 10 commits into from Sep 28, 2022
19 changes: 18 additions & 1 deletion src/cmap/connection_pool.ts
Expand Up @@ -19,6 +19,7 @@ import {
import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error';
import { Logger } from '../logger';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Callback, eachAsync, makeCounter } from '../utils';
import { connect } from './connect';
import { Connection, ConnectionEvents, ConnectionOptions } from './connection';
Expand All @@ -38,6 +39,8 @@ import {
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
const kServer = Symbol('server');
/** @internal */
const kLogger = Symbol('logger');
/** @internal */
Expand Down Expand Up @@ -126,6 +129,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
/** @internal */
[kPoolState]: typeof PoolState[keyof typeof PoolState];
/** @internal */
[kServer]: Server;
/** @internal */
[kLogger]: Logger;
/** @internal */
[kConnections]: Denque<Connection>;
Expand Down Expand Up @@ -212,7 +217,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
static readonly CONNECTION_CHECKED_IN = CONNECTION_CHECKED_IN;

/** @internal */
constructor(options: ConnectionPoolOptions) {
constructor(server: Server, options: ConnectionPoolOptions) {
super();

this.options = Object.freeze({
Expand All @@ -234,6 +239,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

this[kPoolState] = PoolState.paused;
this[kServer] = server;
this[kLogger] = new Logger('ConnectionPool');
this[kConnections] = new Denque();
this[kPending] = 0;
Expand Down Expand Up @@ -304,6 +310,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return this[kServiceGenerations];
}

get serverError() {
return this[kServer].description.error;
}

/**
* Get the metrics information for the pool when a wait queue timeout occurs.
*/
Expand Down Expand Up @@ -587,6 +597,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (err || !connection) {
this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
this[kPending]--;
this.emit(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, { id: connectOptions.id } as Connection, 'error')
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
);
callback(err ?? new MongoRuntimeError('Connection creation failed without error'));
return;
}
Expand Down Expand Up @@ -651,6 +665,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// connection permits because that potentially delays the availability of
// the connection to a checkout request
this.createConnection((err, connection) => {
if (err) {
this[kServer].handleError(err);
}
if (!err && connection) {
this[kConnections].push(connection);
process.nextTick(() => this.processWaitQueue());
Expand Down
6 changes: 3 additions & 3 deletions src/cmap/errors.ts
Expand Up @@ -28,9 +28,9 @@ export class PoolClearedError extends MongoNetworkError {
address: string;

constructor(pool: ConnectionPool) {
// TODO(NODE-3135): pass in original pool-clearing error and use in message
// "failed with: <original error which cleared the pool>"
super(`Connection pool for ${pool.address} was cleared because another operation failed`);
super(
`Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`
);
this.address = pool.address;
}

Expand Down
74 changes: 44 additions & 30 deletions src/sdam/server.ts
Expand Up @@ -20,6 +20,7 @@ import {
} from '../constants';
import type { AutoEncrypter } from '../deps';
import {
AnyError,
isNetworkErrorBeforeHandshake,
isNodeShuttingDownError,
isSDAMUnrecoverableError,
Expand Down Expand Up @@ -151,7 +152,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
logger: new Logger('Server'),
state: STATE_CLOSED,
topology,
pool: new ConnectionPool(poolOptions),
pool: new ConnectionPool(this, poolOptions),
operationCount: 0
};

Expand Down Expand Up @@ -376,6 +377,46 @@ export class Server extends TypedEventEmitter<ServerEvents> {
callback
);
}

/**
* Handle SDAM error
* @internal
*/
handleError(error: AnyError, connection?: Connection) {
if (!(error instanceof MongoError)) {
return;
}
if (error instanceof MongoNetworkError) {
if (!(error instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(error)) {
// In load balanced mode we never mark the server as unknown and always
// clear for the specific service id.

if (!this.loadBalanced) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.s.pool.clear(connection.serviceId);
}
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.s.pool.clear(connection.serviceId);
}

if (!this.loadBalanced) {
if (shouldClearPool) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(this, error);
process.nextTick(() => this.requestCheck());
}
}
}
}
}
}

function calculateRoundTripTime(oldRtt: number, duration: number): number {
Expand Down Expand Up @@ -490,18 +531,6 @@ function makeOperationHandler(
) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

if (!(error instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(error)) {
// In load balanced mode we never mark the server as unknown and always
// clear for the specific service id.

if (!server.loadBalanced) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(server, error);
} else {
server.s.pool.clear(connection.serviceId);
}
}
} else {
if (
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
Expand All @@ -510,23 +539,6 @@ function makeOperationHandler(
) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(server, error)) {
const shouldClearPool = maxWireVersion(server) <= 7 || isNodeShuttingDownError(error);
if (server.loadBalanced && shouldClearPool) {
server.s.pool.clear(connection.serviceId);
}

if (!server.loadBalanced) {
if (shouldClearPool) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(server, error);
process.nextTick(() => server.requestCheck());
}
}
}
}

if (
Expand All @@ -537,6 +549,8 @@ function makeOperationHandler(
session.unpin({ force: true });
}

server.handleError(error, connection);

return callback(error);
};
}
Expand Up @@ -11,35 +11,25 @@ const LB_SKIP_TESTS: SkipDescription[] = [
'pool clear halts background minPoolSize establishments',
'clearing a paused pool emits no events',
'after clear, cannot check out connections until pool ready',
'readying a ready pool emits no events'
'readying a ready pool emits no events',
'error during minPoolSize population clears pool'
].map(description => ({
description,
skipIfCondition: 'loadBalanced',
skipReason: 'cannot run against a load balanced environment'
}));

const POOL_PAUSED_SKIP_TESTS: SkipDescription[] = [
'error during minPoolSize population clears pool'
].map(description => ({
description,
skipIfCondition: 'always',
skipReason: 'TODO(NODE-3135): make connection pool SDAM aware'
}));

describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat(
[
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
],
POOL_PAUSED_SKIP_TESTS
)
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
])
});
});
52 changes: 35 additions & 17 deletions test/tools/cmap_spec_runner.ts
Expand Up @@ -3,7 +3,7 @@ import { EventEmitter } from 'events';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

import { Connection, HostAddress, MongoClient } from '../../src';
import { Connection, HostAddress, MongoClient, Server } from '../../src';
import { ConnectionPool, ConnectionPoolOptions } from '../../src/cmap/connection_pool';
import { CMAP_EVENTS } from '../../src/constants';
import { makeClientMetadata, shuffle } from '../../src/utils';
Expand Down Expand Up @@ -253,11 +253,13 @@ export class ThreadContext {
threads: Map<any, Thread> = new Map();
connections: Map<string, Connection> = new Map();
orphans: Set<Connection> = new Set();
poolEvents = [];
poolEvents: any[] = [];
poolEventsEventEmitter = new EventEmitter();

#poolOptions: Partial<ConnectionPoolOptions>;
#hostAddress: HostAddress;
#server: Server;
#originalServerPool: ConnectionPool;
#supportedOperations: ReturnType<typeof getTestOpDefinitions>;
#injectPoolStats = false;

Expand All @@ -267,12 +269,14 @@ export class ThreadContext {
* @param poolOptions - Allows the test to pass in extra options to the pool not specified by the spec test definition, such as the environment-dependent "loadBalanced"
*/
constructor(
server: Server,
hostAddress: HostAddress,
poolOptions: Partial<ConnectionPoolOptions> = {},
contextOptions: { injectPoolStats: boolean }
) {
this.#poolOptions = poolOptions;
this.#hostAddress = hostAddress;
this.#server = server;
this.#supportedOperations = getTestOpDefinitions(this);
this.#injectPoolStats = contextOptions.injectPoolStats;
}
Expand All @@ -292,11 +296,13 @@ export class ThreadContext {
}

createPool(options) {
this.pool = new ConnectionPool({
this.pool = new ConnectionPool(this.#server, {
...this.#poolOptions,
...options,
hostAddress: this.#hostAddress
});
this.#originalServerPool = this.#server.s.pool;
this.#server.s.pool = this.pool;
ALL_POOL_EVENTS.forEach(eventName => {
this.pool.on(eventName, event => {
if (this.#injectPoolStats) {
Expand All @@ -312,6 +318,7 @@ export class ThreadContext {
}

closePool() {
this.#server.s.pool = this.#originalServerPool;
return new Promise(resolve => {
ALL_POOL_EVENTS.forEach(ev => this.pool.removeAllListeners(ev));
this.pool.close(resolve);
Expand Down Expand Up @@ -438,7 +445,10 @@ export function runCmapTestSuite(
) {
for (const test of tests) {
describe(test.name, function () {
let hostAddress: HostAddress, threadContext: ThreadContext, client: MongoClient;
let hostAddress: HostAddress,
server: Server,
threadContext: ThreadContext,
client: MongoClient;

beforeEach(async function () {
let utilClient: MongoClient;
Expand Down Expand Up @@ -479,25 +489,33 @@ export function runCmapTestSuite(
}

try {
const serverMap = utilClient.topology.s.description.servers;
const hosts = shuffle(serverMap.keys());
const serverDescriptionMap = utilClient.topology?.s.description.servers;
const hosts = shuffle(serverDescriptionMap.keys());
const selectedHostUri = hosts[0];
hostAddress = serverMap.get(selectedHostUri).hostAddress;
hostAddress = serverDescriptionMap.get(selectedHostUri).hostAddress;

client = this.configuration.newClient(
`mongodb://${hostAddress}/${
this.configuration.isLoadBalanced ? '?loadBalanced=true' : '?directConnection=true'
}`
);
await client.connect();
if (test.failPoint) {
await client.db('admin').command(test.failPoint);
}

const serverMap = client.topology?.s.servers;
server = serverMap?.get(selectedHostUri);
if (!server) {
throw new Error('Failed to retrieve server for test');
}

threadContext = new ThreadContext(
server,
hostAddress,
this.configuration.isLoadBalanced ? { loadBalanced: true } : {},
{ injectPoolStats: !!options?.injectPoolStats }
);

if (test.failPoint) {
client = this.configuration.newClient(
`mongodb://${hostAddress}/${
this.configuration.isLoadBalanced ? '?loadBalanced=true' : '?directConnection=true'
}`
);
await client.connect();
await client.db('admin').command(test.failPoint);
}
} finally {
await utilClient.close();
}
Expand Down