Skip to content

Commit

Permalink
refactor(NODE-5679): introduce timeout abstraction and use for server…
Browse files Browse the repository at this point in the history
… selection and connection check out (#4078)

Co-authored-by: Neal Beeken <neal.beeken@mongodb.com>
  • Loading branch information
W-A-James and nbbeeken committed Apr 18, 2024
1 parent af18c53 commit 9c3ade5
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 158 deletions.
60 changes: 31 additions & 29 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import {
type Callback,
List,
makeCounter,
promiseWithResolvers,
TimeoutController
} from '../utils';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -107,7 +102,7 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
export interface WaitQueueMember {
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeoutController: TimeoutController;
timeout: Timeout;
[kCancelled]?: boolean;
}

Expand Down Expand Up @@ -368,33 +363,40 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

const timeout = Timeout.expires(waitQueueTimeoutMS);

const waitQueueMember: WaitQueueMember = {
resolve,
reject,
timeoutController: new TimeoutController(waitQueueTimeoutMS)
timeout
};
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.reject(
new WaitQueueTimeoutError(
this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

try {
return await Promise.race([promise, waitQueueMember.timeout]);
} catch (error) {
if (TimeoutError.is(error)) {
waitQueueMember[kCancelled] = true;

waitQueueMember.timeout.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
const timeoutError = new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
: 'Timed out while checking out a connection from connection pool',
this.address
)
);
});

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

return await promise;
);
throw timeoutError;
}
throw error;
}
}

/**
Expand Down Expand Up @@ -758,7 +760,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();
this[kWaitQueue].shift();
waitQueueMember.reject(error);
continue;
Expand All @@ -779,7 +781,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();

this[kWaitQueue].shift();
waitQueueMember.resolve(connection);
Expand Down Expand Up @@ -818,7 +820,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember.resolve(connection);
}

waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type { Timeout } from './timeout';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand All @@ -555,7 +556,6 @@ export type {
HostAddress,
List,
MongoDBCollectionNamespace,
MongoDBNamespace,
TimeoutController
MongoDBNamespace
} from './utils';
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';
77 changes: 43 additions & 34 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong
import { TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Timeout, TimeoutError } from '../timeout';
import type { Transaction } from '../transactions';
import {
type Callback,
Expand All @@ -43,8 +44,7 @@ import {
now,
ns,
promiseWithResolvers,
shuffle,
TimeoutController
shuffle
} from '../utils';
import {
_advanceClusterTime,
Expand Down Expand Up @@ -107,7 +107,7 @@ export interface ServerSelectionRequest {
resolve: (server: Server) => void;
reject: (error: MongoError) => void;
[kCancelled]?: boolean;
timeoutController: TimeoutController;
timeout: Timeout;
operationName: string;
waitingLogged: boolean;
previousServer?: ServerDescription;
Expand Down Expand Up @@ -178,6 +178,8 @@ export interface SelectServerOptions {
session?: ClientSession;
operationName: string;
previousServer?: ServerDescription;
/** @internal*/
timeout?: Timeout;
}

/** @public */
Expand Down Expand Up @@ -580,50 +582,57 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>();
const timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
const waitQueueMember: ServerSelectionRequest = {
serverSelector,
topologyDescription: this.description,
mongoLogger: this.client.mongoLogger,
transaction,
resolve,
reject,
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
timeout,
startTime: now(),
operationName: options.operationName,
waitingLogged: false,
previousServer: options.previousServer
};

waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
);
if (
this.client.mongoLogger?.willLog(
MongoLoggableComponent.SERVER_SELECTION,
SeverityLevel.DEBUG
)
) {
this.client.mongoLogger?.debug(
MongoLoggableComponent.SERVER_SELECTION,
new ServerSelectionFailedEvent(
selector,
this.description,
timeoutError,
options.operationName
)
);
}
waitQueueMember.reject(timeoutError);
});

this[kWaitQueue].push(waitQueueMember);
processWaitQueue(this);

return await serverPromise;
try {
return await Promise.race([serverPromise, waitQueueMember.timeout]);
} catch (error) {
if (TimeoutError.is(error)) {
// Timeout
waitQueueMember[kCancelled] = true;
timeout.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
);
if (
this.client.mongoLogger?.willLog(
MongoLoggableComponent.SERVER_SELECTION,
SeverityLevel.DEBUG
)
) {
this.client.mongoLogger?.debug(
MongoLoggableComponent.SERVER_SELECTION,
new ServerSelectionFailedEvent(
selector,
this.description,
timeoutError,
options.operationName
)
);
}

throw timeoutError;
}
// Other server selection error
throw error;
}
}
/**
* Update the internal TopologyDescription with a ServerDescription
Expand Down Expand Up @@ -880,7 +889,7 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, drainError: MongoDr
continue;
}

waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();

if (!waitQueueMember[kCancelled]) {
if (
Expand Down Expand Up @@ -935,7 +944,7 @@ function processWaitQueue(topology: Topology) {
)
: serverDescriptions;
} catch (selectorError) {
waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();
if (
topology.client.mongoLogger?.willLog(
MongoLoggableComponent.SERVER_SELECTION,
Expand Down Expand Up @@ -1023,7 +1032,7 @@ function processWaitQueue(topology: Topology) {
transaction.pinServer(selectedServer);
}

waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();

if (
topology.client.mongoLogger?.willLog(
Expand Down
100 changes: 100 additions & 0 deletions src/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { clearTimeout, setTimeout } from 'timers';

import { MongoInvalidArgumentError } from './error';
import { noop } from './utils';

/** @internal */
export class TimeoutError extends Error {
override get name(): 'TimeoutError' {
return 'TimeoutError';
}

constructor(message: string, options?: { cause?: Error }) {
super(message, options);
}

static is(error: unknown): error is TimeoutError {
return (
error != null && typeof error === 'object' && 'name' in error && error.name === 'TimeoutError'
);
}
}

type Executor = ConstructorParameters<typeof Promise<never>>[0];
type Reject = Parameters<ConstructorParameters<typeof Promise<never>>[0]>[1];
/**
* @internal
* This class is an abstraction over timeouts
* The Timeout class can only be in the pending or rejected states. It is guaranteed not to resolve
* if interacted with exclusively through its public API
* */
export class Timeout extends Promise<never> {
get [Symbol.toStringTag](): 'MongoDBTimeout' {
return 'MongoDBTimeout';
}

private timeoutError: TimeoutError;
private id?: NodeJS.Timeout;

public readonly start: number;
public ended: number | null = null;
public duration: number;
public timedOut = false;

/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number) {
let reject!: Reject;

if (duration < 0) {
throw new MongoInvalidArgumentError('Cannot create a Timeout with a negative duration');
}

super((_, promiseReject) => {
reject = promiseReject;

executor(noop, promiseReject);
});

// NOTE: Construct timeout error at point of Timeout instantiation to preserve stack traces
this.timeoutError = new TimeoutError(`Expired after ${duration}ms`);

this.duration = duration;
this.start = Math.trunc(performance.now());

if (this.duration > 0) {
this.id = setTimeout(() => {
this.ended = Math.trunc(performance.now());
this.timedOut = true;
reject(this.timeoutError);
}, this.duration);
// Ensure we do not keep the Node.js event loop running
if (typeof this.id.unref === 'function') {
this.id.unref();
}
}
}

/**
* Clears the underlying timeout. This method is idempotent
*/
clear(): void {
clearTimeout(this.id);
this.id = undefined;
}

public static expires(durationMS: number): Timeout {
return new Timeout(undefined, durationMS);
}

static is(timeout: unknown): timeout is Timeout {
return (
typeof timeout === 'object' &&
timeout != null &&
Symbol.toStringTag in timeout &&
timeout[Symbol.toStringTag] === 'MongoDBTimeout' &&
'then' in timeout &&
// eslint-disable-next-line github/no-then
typeof timeout.then === 'function'
);
}
}

0 comments on commit 9c3ade5

Please sign in to comment.