-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(CSOT) - feature branch #4095
base: main
Are you sure you want to change the base?
Changes from 32 commits
ab21b5b
2338dff
9615138
0eb1986
69f1087
ff2ec44
a3e552a
a9c96ac
21b2fcd
892800d
6f62c9f
b878c6c
c5fae5c
124f2d7
cd6a27e
b63b63f
9f4e884
6fc1198
c18b7f0
9c40f94
a958ad8
a75d9df
875ea67
3839330
c738691
c4ae2fb
bf5a37a
e2f9125
472fa9e
43e69bd
fb96314
ad55e9c
6c7adf1
f586736
021a94d
a61a9d0
30e564c
02509a1
7c73c58
b92162b
4625af4
7d7f005
ecdd66d
9307a80
78ccbd8
c4d26c5
3590e5e
f7cc3a5
a1c7601
2ed34ac
7111909
86a7eeb
2a816e3
62bb3ca
cdce963
5c91311
72c22b9
68f1eec
e88191a
daf0d5a
1d9ac3e
e468b54
fd6c751
bab1667
4d126eb
92564d8
09a8b17
f2f4d63
c4e6ab0
cd6991f
d44b479
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import { | |
} from '../sdam/server_selection'; | ||
import type { Topology } from '../sdam/topology'; | ||
import type { ClientSession } from '../sessions'; | ||
import { Timeout } from '../timeout'; | ||
import { squashError, supportsRetryableWrites } from '../utils'; | ||
import { AbstractOperation, Aspect } from './operation'; | ||
|
||
|
@@ -152,9 +153,21 @@ export async function executeOperation< | |
selector = readPreference; | ||
} | ||
|
||
const timeout = operation.timeout; | ||
timeout?.throwIfExpired(); | ||
// TODO: construct serverSelection timeout here, pass it into topology.selectServer and store on | ||
// operation for use in operation.execute when we have to perform connection checkout | ||
operation.serverSelectionTimeout = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thoughts on consolidating this logic into a function or something? We have similar logic in the topology |
||
operation.timeout != null | ||
? Timeout.expires( | ||
Timeout.min(operation.timeout.remainingTime, topology.s.serverSelectionTimeoutMS) | ||
) | ||
: undefined; | ||
|
||
const server = await topology.selectServer(selector, { | ||
session, | ||
operationName: operation.commandName | ||
operationName: operation.commandName, | ||
timeout: operation.serverSelectionTimeout | ||
}); | ||
|
||
if (session == null) { | ||
|
@@ -265,6 +278,7 @@ async function retryOperation< | |
// select a new server, and attempt to retry the operation | ||
const server = await topology.selectServer(selector, { | ||
session, | ||
timeout: operation.serverSelectionTimeout, | ||
operationName: operation.commandName, | ||
previousServer | ||
}); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '.. | |
import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; | ||
import type { Server } from '../sdam/server'; | ||
import type { ClientSession } from '../sessions'; | ||
import { Timeout } from '../timeout'; | ||
import type { MongoDBNamespace } from '../utils'; | ||
|
||
export const Aspect = { | ||
|
@@ -61,6 +62,12 @@ export abstract class AbstractOperation<TResult = any> { | |
|
||
options: OperationOptions; | ||
|
||
/** @internal */ | ||
timeout?: Timeout; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused about the purpose of the Given that, it feels odd to create a promise that we won't use. And it seems odd to use our generic Timeout class to store CSOT-specific context ( Thoughts on a generic (this relates to my other comment about consolidating timeout logic for server selection - we'll need it eventually for change streams too so it makes sense not to implement it three times). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea behind adding the I'd agree that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to close the loop here: We do use the "csot" timeout when it is the lower bound of time remaining before the operation is meant to expire. Server selection and connection checkout need to never take more than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The context would be similar to the timeout factory, not the operation context. We decided not to implement an operation context because we can pass CSOT-related data on the options objects in the driver. I'm proposing a CSOT context to encapsulate CSOT logic, which we would then pass through the driver on the options objects. I do not consider this thread resolved. I think the current implementation is more complicated than necessary because we only sometimes re-use the timeout.
Instead, I propose we either:
Regardless of which is chosen, I think the resultant code is simpler because server selection and connection checkout 1) do not worry about whether or not they need to use a cached timeout or create a new one 2) they can always clear the timeout. This works especially nicely with the TimeoutFactory or a TimeoutContext, because we can encapsulate all timeout related logic into a single place that's easily unit testable. I'm partial to the factory approach: class TimeoutFactory {
private timeoutMS: number | null;
private started = now();
getTimeoutForServerSelection(): Timeout {
// returns a timeout, handling CSOT vs Legacy timeout logic
}
}
class Topology {
selectServer(options: { ..., timeoutFactory: TimeoutFactory }) {
...
const timeout = timeoutFactory.getTimeoutForServerSelection();
try {
....
} finally {
timeout.clear();
}
}
} Note that with an approach like this, whether or not we reuse a timeout can easily be encapsulated into the TimeoutFactory by instantiating a timeout when the factory is constructed and returning the cached timeout where needed. But a context class could suffice too: class TimeoutContext {
private timeoutMS: number | null;
private started = now();
getTimeoutForServerSelection(): number {}
}
class Topology {
selectServer(options: { ..., timeoutContext: TimeoutContext }) {
...
const timeout = Timeout.expires(timeoutFactory.getTimeoutForServerSelection());
try {
....
} finally {
timeout.clear();
}
}
} An approach like this consolidates CSOT logic and can be reused outside of the main code path (i.e., topology connect). I don't think this work needs to block this PR. But I do want to make sure we discuss this, and I'd like to consider one of these approaches in a future ticket. |
||
|
||
/** @internal */ | ||
serverSelectionTimeout?: Timeout; | ||
W-A-James marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
nbbeeken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
[kSession]: ClientSession | undefined; | ||
|
||
constructor(options: OperationOptions = {}) { | ||
|
@@ -76,6 +83,10 @@ export abstract class AbstractOperation<TResult = any> { | |
this.options = options; | ||
this.bypassPinningCheck = !!options.bypassPinningCheck; | ||
this.trySecondaryWrite = false; | ||
|
||
if (options.timeoutMS != null) { | ||
this.timeout = Timeout.expires(options.timeoutMS); | ||
} | ||
} | ||
|
||
/** Must match the first key of the command object sent to the server. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ import { | |
MongoInvalidArgumentError, | ||
MongoNetworkError, | ||
MongoNetworkTimeoutError, | ||
MongoOperationTimeoutError, | ||
MongoRuntimeError, | ||
MongoServerClosedError, | ||
type MongoServerError, | ||
|
@@ -40,6 +41,7 @@ import type { ServerApi } from '../mongo_client'; | |
import { TypedEventEmitter } from '../mongo_types'; | ||
import type { GetMoreOptions } from '../operations/get_more'; | ||
import type { ClientSession } from '../sessions'; | ||
import { TimeoutError } from '../timeout'; | ||
import { isTransactionCommand } from '../transactions'; | ||
import { | ||
type EventEmitterWithState, | ||
|
@@ -310,7 +312,7 @@ export class Server extends TypedEventEmitter<ServerEvents> { | |
this.incrementOperationCount(); | ||
if (conn == null) { | ||
try { | ||
conn = await this.pool.checkOut(); | ||
conn = await this.pool.checkOut({ timeout: options.serverSelectionTimeout }); | ||
aditi-khare-mongoDB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (this.loadBalanced && isPinnableCommand(cmd, session)) { | ||
session?.pin(conn); | ||
} | ||
|
@@ -323,8 +325,23 @@ export class Server extends TypedEventEmitter<ServerEvents> { | |
|
||
try { | ||
try { | ||
return await conn.command(ns, cmd, finalOptions, responseType); | ||
if (finalOptions.operationTimeout) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer to keep this PR moving, we can always come back and clean it up. But I envisioned that when CSOT was disabled, we would still create a timeout, it would just be infinite. We'd always clean the timeout up afterwards, so we aren't leaking promises. Something like: const timeout = options.timeout ?? new Timeout(Infinity);
try {
Promise.race([_, timeout]);
} finally {
timeout.clear();
} (In a future where we can use resource management, this becomes nicer too because we won't need to manually clear the timeout: class Timeout {
[Symbol.dispose]() {
this.clear();
}
}
using timeout = options.timeout ?? new Timeout(Infinity);
await Promise.race([_, timeout]); ) |
||
finalOptions.operationTimeout.throwIfExpired(); | ||
return await Promise.race([ | ||
finalOptions.operationTimeout, | ||
conn.command(ns, cmd, finalOptions) | ||
]); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't recall any CSOT requirement that states we should time out the entirety of command execution at the connection layer. Instead, each step of connection-layer command execution has its own rules (your PR description mentions that this PR does not implement this logic. Is this only added to facilitate testing? Or is there a CSOT requirement I'm missing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch here. Meant to remove this change, but missed it. |
||
} else { | ||
return await conn.command(ns, cmd, finalOptions, responseType); | ||
} | ||
} catch (commandError) { | ||
if (TimeoutError.is(commandError)) | ||
throw this.decorateCommandError( | ||
conn, | ||
cmd, | ||
finalOptions, | ||
new MongoOperationTimeoutError('Timed out during command execution') | ||
); | ||
throw this.decorateCommandError(conn, cmd, finalOptions, commandError); | ||
} | ||
} catch (operationError) { | ||
|
@@ -333,6 +350,7 @@ export class Server extends TypedEventEmitter<ServerEvents> { | |
operationError.code === MONGODB_ERROR_CODES.Reauthenticate | ||
) { | ||
await this.pool.reauthenticate(conn); | ||
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer | ||
try { | ||
baileympearson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return await conn.command(ns, cmd, finalOptions, responseType); | ||
} catch (commandError) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import { | |
type MongoDriverError, | ||
MongoError, | ||
MongoErrorLabel, | ||
MongoOperationTimeoutError, | ||
MongoRuntimeError, | ||
MongoServerSelectionError, | ||
MongoTopologyClosedError | ||
|
@@ -457,8 +458,19 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
} | ||
} | ||
|
||
const timeoutMS = this.client.options.timeoutMS; | ||
const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this correct if we're auto-connecting the client? |
||
const serverSelectionTimeoutMS = this.s.serverSelectionTimeoutMS; | ||
const serverSelectionTimeout = | ||
timeoutMS != null | ||
? Timeout.expires(Timeout.min(timeoutMS, serverSelectionTimeoutMS)) | ||
: undefined; | ||
const readPreference = options.readPreference ?? ReadPreference.primary; | ||
const selectServerOptions = { operationName: 'ping', ...options }; | ||
const selectServerOptions = { | ||
operationName: 'ping', | ||
timeout: serverSelectionTimeout, | ||
...options | ||
}; | ||
try { | ||
const server = await this.selectServer( | ||
readPreferenceServerSelector(readPreference), | ||
|
@@ -467,7 +479,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
|
||
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; | ||
if (!skipPingOnConnect && server && this.s.credentials) { | ||
await server.command(ns('admin.$cmd'), { ping: 1 }, {}); | ||
await server.command( | ||
ns('admin.$cmd'), | ||
{ ping: 1 }, | ||
{ operationTimeout: timeout, serverSelectionTimeout } | ||
aditi-khare-mongoDB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
stateTransition(this, STATE_CONNECTED); | ||
this.emit(Topology.OPEN, this); | ||
this.emit(Topology.CONNECT, this); | ||
|
@@ -582,7 +598,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
} | ||
|
||
const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>(); | ||
const timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0); | ||
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0; | ||
const timeout = options.timeout ?? Timeout.expires(serverSelectionTimeoutMS); | ||
|
||
const waitQueueMember: ServerSelectionRequest = { | ||
serverSelector, | ||
topologyDescription: this.description, | ||
|
@@ -601,12 +619,13 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
processWaitQueue(this); | ||
|
||
try { | ||
waitQueueMember.timeout.throwIfExpired(); | ||
return await Promise.race([serverPromise, waitQueueMember.timeout]); | ||
} catch (error) { | ||
if (TimeoutError.is(error)) { | ||
// Timeout | ||
waitQueueMember[kCancelled] = true; | ||
timeout.clear(); | ||
waitQueueMember.timeout.clear(); | ||
const timeoutError = new MongoServerSelectionError( | ||
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`, | ||
this.description | ||
|
@@ -628,6 +647,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
); | ||
} | ||
|
||
if (options.timeout) { | ||
throw new MongoOperationTimeoutError('Timed out during server selection', { | ||
baileympearson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cause: timeoutError | ||
}); | ||
} | ||
throw timeoutError; | ||
} | ||
// Other server selection error | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE FOR REVIEWERS: Needed to do this to ensure that unit tests using ping work