-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(NODE-6090): Implement CSOT logic for server selection and connection checkout #4095
base: main
Are you sure you want to change the base?
Changes from 50 commits
ab21b5b
2338dff
9615138
0eb1986
69f1087
ff2ec44
a3e552a
a9c96ac
21b2fcd
892800d
6f62c9f
b878c6c
c5fae5c
124f2d7
cd6a27e
b63b63f
9f4e884
6fc1198
c18b7f0
9c40f94
a958ad8
a75d9df
875ea67
3839330
c738691
c4ae2fb
bf5a37a
e2f9125
472fa9e
43e69bd
fb96314
ad55e9c
6c7adf1
f586736
021a94d
a61a9d0
30e564c
02509a1
7c73c58
b92162b
4625af4
7d7f005
ecdd66d
9307a80
78ccbd8
c4d26c5
3590e5e
f7cc3a5
a1c7601
2ed34ac
7111909
86a7eeb
2a816e3
62bb3ca
cdce963
5c91311
72c22b9
68f1eec
e88191a
daf0d5a
1d9ac3e
e468b54
fd6c751
bab1667
4d126eb
92564d8
09a8b17
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,13 +21,14 @@ import { | |
MongoInvalidArgumentError, | ||
MongoMissingCredentialsError, | ||
MongoNetworkError, | ||
MongoOperationTimeoutError, | ||
MongoRuntimeError, | ||
MongoServerError | ||
} from '../error'; | ||
import { CancellationToken, TypedEventEmitter } from '../mongo_types'; | ||
import type { Server } from '../sdam/server'; | ||
import { Timeout, TimeoutError } from '../timeout'; | ||
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; | ||
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils'; | ||
import { connect } from './connect'; | ||
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; | ||
import { | ||
|
@@ -102,7 +103,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g | |
export interface WaitQueueMember { | ||
resolve: (conn: Connection) => void; | ||
reject: (err: AnyError) => void; | ||
timeout: Timeout; | ||
timeout: Timeout | null; | ||
clearTimeout: boolean; | ||
[kCancelled]?: boolean; | ||
} | ||
|
||
|
@@ -354,35 +356,54 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> { | |
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or | ||
* explicitly destroyed by the new owner. | ||
*/ | ||
async checkOut(): Promise<Connection> { | ||
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> { | ||
this.emitAndLog( | ||
ConnectionPool.CONNECTION_CHECK_OUT_STARTED, | ||
new ConnectionCheckOutStartedEvent(this) | ||
); | ||
|
||
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; | ||
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS; | ||
|
||
const { promise, resolve, reject } = promiseWithResolvers<Connection>(); | ||
|
||
const timeout = Timeout.expires(waitQueueTimeoutMS); | ||
let timeout: Timeout | null = null; | ||
let clearTimeout = false; | ||
nbbeeken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (options?.timeout) { | ||
// CSOT enabled | ||
// Determine if we're using the timeout passed in or a new timeout | ||
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { | ||
if ( | ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) === serverSelectionTimeoutMS | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is still and equals check, sorry I think we discussed it but didn't leave a comment, if duration is the same then we'll create a new timeout when we can use the existing one. |
||
) { | ||
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed); | ||
clearTimeout = true; | ||
} else { | ||
timeout = options.timeout; | ||
} | ||
} | ||
} else { | ||
timeout = Timeout.expires(waitQueueTimeoutMS); | ||
clearTimeout = true; | ||
} | ||
|
||
const waitQueueMember: WaitQueueMember = { | ||
resolve, | ||
reject, | ||
timeout | ||
timeout, | ||
clearTimeout | ||
}; | ||
|
||
this[kWaitQueue].push(waitQueueMember); | ||
process.nextTick(() => this.processWaitQueue()); | ||
|
||
try { | ||
return await Promise.race([promise, waitQueueMember.timeout]); | ||
timeout?.throwIfExpired(); | ||
return await (timeout ? Promise.race([promise, timeout]) : promise); | ||
} catch (error) { | ||
if (TimeoutError.is(error)) { | ||
waitQueueMember[kCancelled] = true; | ||
|
||
waitQueueMember.timeout.clear(); | ||
|
||
this.emitAndLog( | ||
ConnectionPool.CONNECTION_CHECK_OUT_FAILED, | ||
new ConnectionCheckOutFailedEvent(this, 'timeout') | ||
|
@@ -393,9 +414,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> { | |
: 'Timed out while checking out a connection from connection pool', | ||
this.address | ||
); | ||
if (options?.timeout) { | ||
throw new MongoOperationTimeoutError('Timed out during connection checkout', { | ||
cause: timeoutError | ||
}); | ||
} | ||
throw timeoutError; | ||
} | ||
throw error; | ||
} finally { | ||
if (clearTimeout) timeout?.clear(); | ||
} | ||
} | ||
|
||
|
@@ -760,7 +788,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> { | |
ConnectionPool.CONNECTION_CHECK_OUT_FAILED, | ||
new ConnectionCheckOutFailedEvent(this, reason, error) | ||
); | ||
waitQueueMember.timeout.clear(); | ||
if (waitQueueMember.clearTimeout) waitQueueMember.timeout?.clear(); | ||
this[kWaitQueue].shift(); | ||
waitQueueMember.reject(error); | ||
continue; | ||
|
@@ -781,7 +809,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> { | |
ConnectionPool.CONNECTION_CHECKED_OUT, | ||
new ConnectionCheckedOutEvent(this, connection) | ||
); | ||
waitQueueMember.timeout.clear(); | ||
if (waitQueueMember.clearTimeout) waitQueueMember.timeout?.clear(); | ||
|
||
this[kWaitQueue].shift(); | ||
waitQueueMember.resolve(connection); | ||
|
@@ -820,7 +848,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> { | |
waitQueueMember.resolve(connection); | ||
} | ||
|
||
waitQueueMember.timeout.clear(); | ||
if (waitQueueMember.clearTimeout) waitQueueMember.timeout?.clear(); | ||
} | ||
process.nextTick(() => this.processWaitQueue()); | ||
}); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '.. | |
import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; | ||
import type { Server } from '../sdam/server'; | ||
import type { ClientSession } from '../sessions'; | ||
import { Timeout } from '../timeout'; | ||
import type { MongoDBNamespace } from '../utils'; | ||
|
||
export const Aspect = { | ||
|
@@ -61,6 +62,9 @@ export abstract class AbstractOperation<TResult = any> { | |
|
||
options: OperationOptions; | ||
|
||
/** @internal */ | ||
timeout?: Timeout; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused about the purpose of the Given that, it feels odd to create a promise that we won't use. And it seems odd to use our generic Timeout class to store CSOT-specific context ( Thoughts on a generic (this relates to my other comment about consolidating timeout logic for server selection - we'll need it eventually for change streams too so it makes sense not to implement it three times). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea behind adding the I'd agree that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to close the loop here: We do use the "csot" timeout when it is the lower bound of time remaining before the operation is meant to expire. Server selection and connection checkout need to never take more than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The context would be similar to the timeout factory, not the operation context. We decided not to implement an operation context because we can pass CSOT-related data on the options objects in the driver. I'm proposing a CSOT context to encapsulate CSOT logic, which we would then pass through the driver on the options objects. I do not consider this thread resolved. I think the current implementation is more complicated than necessary because we only sometimes re-use the timeout.
Instead, I propose we either:
Regardless of which is chosen, I think the resultant code is simpler because server selection and connection checkout 1) do not worry about whether or not they need to use a cached timeout or create a new one 2) they can always clear the timeout. This works especially nicely with the TimeoutFactory or a TimeoutContext, because we can encapsulate all timeout related logic into a single place that's easily unit testable. I'm partial to the factory approach: class TimeoutFactory {
private timeoutMS: number | null;
private started = now();
getTimeoutForServerSelection(): Timeout {
// returns a timeout, handling CSOT vs Legacy timeout logic
}
}
class Topology {
selectServer(options: { ..., timeoutFactory: TimeoutFactory }) {
...
const timeout = timeoutFactory.getTimeoutForServerSelection();
try {
....
} finally {
timeout.clear();
}
}
} Note that with an approach like this, whether or not we reuse a timeout can easily be encapsulated into the TimeoutFactory by instantiating a timeout when the factory is constructed and returning the cached timeout where needed. But a context class could suffice too: class TimeoutContext {
private timeoutMS: number | null;
private started = now();
getTimeoutForServerSelection(): number {}
}
class Topology {
selectServer(options: { ..., timeoutContext: TimeoutContext }) {
...
const timeout = Timeout.expires(timeoutFactory.getTimeoutForServerSelection());
try {
....
} finally {
timeout.clear();
}
}
} An approach like this consolidates CSOT logic and can be reused outside of the main code path (i.e., topology connect). I don't think this work needs to block this PR. But I do want to make sure we discuss this, and I'd like to consider one of these approaches in a future ticket. |
||
|
||
nbbeeken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
[kSession]: ClientSession | undefined; | ||
|
||
constructor(options: OperationOptions = {}) { | ||
|
@@ -76,6 +80,10 @@ export abstract class AbstractOperation<TResult = any> { | |
this.options = options; | ||
this.bypassPinningCheck = !!options.bypassPinningCheck; | ||
this.trySecondaryWrite = false; | ||
|
||
if (options.timeoutMS != null) { | ||
this.timeout = Timeout.expires(options.timeoutMS); | ||
} | ||
} | ||
|
||
/** Must match the first key of the command object sent to the server. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE FOR REVIEWERS: Needed to do this to ensure that unit tests using ping work