Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4385): add cmap pool pausing functionality #3321

Merged
merged 55 commits into from Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
49dfc40
feat: add connection pool ready event
dariakp Jul 6, 2022
46810f2
feat: add basic implementation of PoolClearedError
dariakp Jul 7, 2022
143d008
feat: clear pauses pool
dariakp Jul 8, 2022
b53e2c0
fix: calling ready on ready pool
dariakp Jul 8, 2022
e97757e
feat: sdam marks pool ready on successful check
dariakp Jul 8, 2022
7fd9537
fix: emit checkout failed event if pool paused
dariakp Jul 11, 2022
72977c1
feat: ensureMinPoolSize prunes perished connections
dariakp Jul 11, 2022
f0735a7
test: add cmap runner support for pool paused tests
dariakp Jul 11, 2022
fb210de
test: unskip the passing pool paused tests
dariakp Jul 11, 2022
7fe8ce3
fix: pool cleared error name
dariakp Jul 13, 2022
ae629f0
fix: update placeholder message for PoolClearedError
dariakp Jul 13, 2022
53ee54e
refactor: update constants
dariakp Jul 20, 2022
ccc6057
test: skip failing cmap test
dariakp Aug 24, 2022
e9405ce
refactor: misc cleanup
dariakp Jul 20, 2022
941e8f0
test: add sdam cmap prose test
dariakp Jul 20, 2022
6d1648b
test: update stat cmap tests
dariakp Jul 20, 2022
0570932
test: update cmap unit tests to call ready before running
dariakp Jul 21, 2022
809eee5
test: change order of operations to reduce flakiness
dariakp Jul 22, 2022
629afa0
test: properly skip lb incompatible tests
dariakp Jul 22, 2022
97d93ed
test: implement unified support for poolReadyEvent and unskip sdam test
dariakp Aug 24, 2022
a089bf6
test: update skips
dariakp Aug 24, 2022
6e08633
fix: handle closed state for clear and minpoolsize
dariakp Aug 25, 2022
154a60f
test: add back skips for failing auth tests
dariakp Aug 26, 2022
ad5437b
fix: tighten up conditions for ensureMinPoolSize
dariakp Aug 26, 2022
5aa4805
refactor: minor pool cleanup
dariakp Aug 26, 2022
d8466d0
fix: make sure pending queue members get the correct checkout error
dariakp Aug 26, 2022
763d7db
refactor: handle clearing waitqueue via processWaitQueue
dariakp Aug 26, 2022
6e30e6f
refactor: clean up close logic
dariakp Aug 26, 2022
ae006a7
lint: remove todo
dariakp Aug 26, 2022
aa98693
refactor: more pool cleanup
dariakp Aug 26, 2022
080a19a
fix: make retryability async and handle PoolClearedError
dariakp Aug 26, 2022
4ffa108
refactor: check pool state and modify pending count in createConnection
dariakp Aug 29, 2022
3765084
refactor: synchronize pool clearing and mark unknown
dariakp Aug 29, 2022
ded355f
fix: accidental early return
dariakp Aug 29, 2022
6f4c0e9
test: skip afterEach if test is skipped
dariakp Aug 29, 2022
146dafb
refactor: remove redundant pool clear calls
dariakp Aug 29, 2022
398136d
lint
dariakp Aug 29, 2022
525c29f
Revert "refactor: check pool state and modify pending count in create…
dariakp Aug 29, 2022
88ef8a5
refactor: decrement pending connections only in createConnection
dariakp Aug 29, 2022
0206e84
refactor: keep waitqueue members in queue until connection ready
dariakp Aug 31, 2022
4e0ada2
fix: maybe fix sdam to only clear pool when appropriate
dariakp Aug 31, 2022
d914234
fix: prevent parallel ensureminpoolsize timers
dariakp Aug 31, 2022
223cf11
lint: unremove eslint disable rule
dariakp Aug 31, 2022
ea9e742
fix: missed a spot
dariakp Aug 31, 2022
92a7c54
fix: add extra guard for error type
dariakp Aug 31, 2022
ffcdbb4
Revert "refactor: keep waitqueue members in queue until connection re…
dariakp Aug 31, 2022
a09ad23
fix: do not leave hanging callbacks in createConnection
dariakp Sep 1, 2022
6d5edd9
fix: remove duplicate resetServer
dariakp Sep 1, 2022
24a1860
test: skip flaky tests
dariakp Sep 1, 2022
e91e0bb
lint: remove stray comment
dariakp Sep 2, 2022
c33d731
fix: type cleanup
dariakp Sep 2, 2022
61aa0b3
refactor: extra guard for pool ready
dariakp Sep 2, 2022
a69c796
feat: preserve original error on cause property in MongoError
dariakp Sep 2, 2022
10c384b
lint: test type fix
dariakp Sep 2, 2022
fe381dc
ts: ignore node version dependent error
dariakp Sep 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
131 changes: 82 additions & 49 deletions src/cmap/connection_pool.ts
Expand Up @@ -13,6 +13,7 @@ import {
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_CLOSED,
CONNECTION_POOL_CREATED,
CONNECTION_POOL_READY,
CONNECTION_READY
} from '../constants';
import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error';
Expand All @@ -31,9 +32,10 @@ import {
ConnectionPoolClearedEvent,
ConnectionPoolClosedEvent,
ConnectionPoolCreatedEvent,
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './connection_pool_events';
import { PoolClosedError, WaitQueueTimeoutError } from './errors';
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
Expand Down Expand Up @@ -103,6 +105,7 @@ export interface CloseOptions {
/** @public */
export type ConnectionPoolEvents = {
connectionPoolCreated(event: ConnectionPoolCreatedEvent): void;
connectionPoolReady(event: ConnectionPoolReadyEvent): void;
connectionPoolClosed(event: ConnectionPoolClosedEvent): void;
connectionPoolCleared(event: ConnectionPoolClearedEvent): void;
connectionCreated(event: ConnectionCreatedEvent): void;
Expand Down Expand Up @@ -167,6 +170,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* @event
*/
static readonly CONNECTION_POOL_CLEARED = CONNECTION_POOL_CLEARED;
/**
* Emitted each time the connection pool is marked ready
* @event
*/
static readonly CONNECTION_POOL_READY = CONNECTION_POOL_READY;
/**
* Emitted when a connection is created.
* @event
Expand Down Expand Up @@ -242,7 +250,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

process.nextTick(() => {
this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
this.ensureMinPoolSize();
});
}

Expand Down Expand Up @@ -308,7 +315,13 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* Set the pool state to "ready"
*/
ready(): void {
if (this[kPoolState] !== PoolState.paused) {
return;
}
this[kPoolState] = PoolState.ready;
this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
clearTimeout(this[kMinPoolSizeTimer]);
this.ensureMinPoolSize();
}

/**
Expand All @@ -322,15 +335,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionCheckOutStartedEvent(this)
);

if (this.closed) {
this.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'poolClosed')
);
callback(new PoolClosedError(this));
return;
}

const waitQueueMember: WaitQueueMember = { callback };
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
if (waitQueueTimeoutMS) {
Expand Down Expand Up @@ -390,26 +394,40 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* previous generation will eventually be pruned during subsequent checkouts.
*/
clear(serviceId?: ObjectId): void {
if (this.closed) {
return;
}

// handle load balanced case
if (this.loadBalanced && serviceId) {
const sid = serviceId.toHexString();
const generation = this.serviceGenerations.get(sid);
// Only need to worry if the generation exists, since it should
// always be there but typescript needs the check.
if (generation == null) {
// TODO(NODE-3483)
throw new MongoRuntimeError('Service generations are required in load balancer mode.');
} else {
// Increment the generation for the service id.
this.serviceGenerations.set(sid, generation + 1);
}
} else {
this[kGeneration] += 1;
this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, serviceId)
);
return;
}

this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, serviceId)
);
// handle non load-balanced case
this[kGeneration] += 1;
const alreadyPaused = this[kPoolState] === PoolState.paused;
this[kPoolState] = PoolState.paused;

this.clearMinPoolSizeTimer();
this.processWaitQueue();

if (!alreadyPaused) {
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
}
}

/** Close the pool */
Expand All @@ -430,33 +448,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// immediately cancel any in-flight connections
this[kCancellationToken].emit('cancel');

// drain the wait queue
while (this.waitQueueSize) {
const waitQueueMember = this[kWaitQueue].pop();
if (waitQueueMember) {
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
if (!waitQueueMember[kCancelled]) {
// TODO(NODE-3483): Replace with MongoConnectionPoolClosedError
waitQueueMember.callback(new MongoRuntimeError('Connection pool closed'));
}
}
}

// clear the min pool size timer
const minPoolSizeTimer = this[kMinPoolSizeTimer];
if (minPoolSizeTimer) {
clearTimeout(minPoolSizeTimer);
}

// end the connection counter
if (typeof this[kConnectionCounter].return === 'function') {
this[kConnectionCounter].return(undefined);
}

// mark the pool as closed immediately
this[kPoolState] = PoolState.closed;
this.clearMinPoolSizeTimer();
this.processWaitQueue();

eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
Expand Down Expand Up @@ -526,12 +526,19 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
});
}

/** Clear the min pool size timer */
private clearMinPoolSizeTimer(): void {
const minPoolSizeTimer = this[kMinPoolSizeTimer];
if (minPoolSizeTimer) {
clearTimeout(minPoolSizeTimer);
}
}

private destroyConnection(connection: Connection, reason: string) {
this.emit(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, connection, reason)
);

// destroy the connection
process.nextTick(() => connection.destroy());
}
Expand Down Expand Up @@ -580,14 +587,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connect(connectOptions, (err, connection) => {
if (err || !connection) {
this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
callback(err);
this[kPending]--;
callback(err ?? new MongoRuntimeError('Connection creation failed without error'));
return;
}

// The pool might have closed since we started trying to create a connection
if (this.closed) {
if (this[kPoolState] !== PoolState.ready) {
this[kPending]--;
connection.destroy({ force: true });
callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this));
return;
}

Expand Down Expand Up @@ -616,17 +625,25 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connection.markAvailable();
this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));

this[kPending]--;
callback(undefined, connection);
return;
});
}

private ensureMinPoolSize() {
const minPoolSize = this.options.minPoolSize;
if (this.closed || minPoolSize === 0) {
if (this[kPoolState] !== PoolState.ready || minPoolSize === 0) {
return;
}

for (let i = 0; i < this[kConnections].length; i++) {
const connection = this[kConnections].peekAt(i);
if (connection && this.connectionIsPerished(connection)) {
this[kConnections].removeOne(i);
}
}

if (
this.totalConnectionCount < minPoolSize &&
this.pendingConnectionCount < this.options.maxConnecting
Expand All @@ -635,23 +652,25 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// connection permits because that potentially delays the availability of
// the connection to a checkout request
this.createConnection((err, connection) => {
this[kPending]--;
if (!err && connection) {
this[kConnections].push(connection);
process.nextTick(() => this.processWaitQueue());
}
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
if (this[kPoolState] === PoolState.ready) {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
}
});
} else {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100);
}
}

private processWaitQueue() {
if (this.closed || this[kProcessingWaitQueue]) {
if (this[kProcessingWaitQueue]) {
return;
}

this[kProcessingWaitQueue] = true;

while (this.waitQueueSize) {
Expand All @@ -666,6 +685,21 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
continue;
}

if (this[kPoolState] !== PoolState.ready) {
const reason = this.closed ? 'poolClosed' : 'connectionError';
const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this);
this.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason)
);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
this[kWaitQueue].shift();
waitQueueMember.callback(error);
continue;
}

if (!this.availableConnectionCount) {
break;
}
Expand Down Expand Up @@ -701,7 +735,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
continue;
}
this.createConnection((err, connection) => {
this[kPending]--;
if (waitQueueMember[kCancelled]) {
if (!err && connection) {
this[kConnections].push(connection);
Expand All @@ -710,7 +743,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (err) {
this.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, err)
new ConnectionCheckOutFailedEvent(this, 'connectionError')
);
} else if (connection) {
this[kCheckedOut]++;
Expand Down
12 changes: 12 additions & 0 deletions src/cmap/connection_pool_events.ts
Expand Up @@ -37,6 +37,18 @@ export class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent {
}
}

/**
* An event published when a connection pool is ready
* @public
* @category Event
*/
export class ConnectionPoolReadyEvent extends ConnectionPoolMonitoringEvent {
/** @internal */
constructor(pool: ConnectionPool) {
super(pool);
}
}

/**
* An event published when a connection pool is closed
* @public
Expand Down
23 changes: 22 additions & 1 deletion src/cmap/errors.ts
@@ -1,4 +1,4 @@
import { MongoDriverError } from '../error';
import { MongoDriverError, MongoNetworkError } from '../error';
import type { ConnectionPool } from './connection_pool';

/**
Expand All @@ -19,6 +19,27 @@ export class PoolClosedError extends MongoDriverError {
}
}

/**
* An error indicating a connection pool is currently paused
* @category Error
*/
export class PoolClearedError extends MongoNetworkError {
// TODO(NODE-3144): needs to extend RetryableError or be marked retryable in some other way per spec
/** The address of the connection pool */
address: string;

constructor(pool: ConnectionPool) {
// TODO(NODE-3135): pass in original pool-clearing error and use in message
// "failed with: <original error which cleared the pool>"
super(`Connection pool for ${pool.address} was cleared because another operation failed`);
this.address = pool.address;
}

override get name(): string {
return 'MongoPoolClearedError';
}
}

/**
* An error thrown when a request to check out a connection times out
* @category Error
Expand Down
6 changes: 4 additions & 2 deletions src/constants.ts
Expand Up @@ -26,6 +26,7 @@ export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as cons
export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const;
export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
export const CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
export const CONNECTION_POOL_READY = 'connectionPoolReady' as const;
export const CONNECTION_CREATED = 'connectionCreated' as const;
export const CONNECTION_READY = 'connectionReady' as const;
export const CONNECTION_CLOSED = 'connectionClosed' as const;
Expand Down Expand Up @@ -57,15 +58,16 @@ export const HEARTBEAT_EVENTS = Object.freeze([
/** @public */
export const CMAP_EVENTS = Object.freeze([
CONNECTION_POOL_CREATED,
CONNECTION_POOL_READY,
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_CLOSED,
CONNECTION_CREATED,
CONNECTION_READY,
CONNECTION_CLOSED,
CONNECTION_CHECK_OUT_STARTED,
CONNECTION_CHECK_OUT_FAILED,
CONNECTION_CHECKED_OUT,
CONNECTION_CHECKED_IN,
CONNECTION_POOL_CLEARED
CONNECTION_CHECKED_IN
] as const);

/** @public */
Expand Down