Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CSOT) - feature branch #4095

Open
wants to merge 68 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
ab21b5b
Install timeout throughout operation layer
W-A-James Apr 11, 2024
2338dff
update with timeout
W-A-James Apr 24, 2024
9615138
start prose test impl
W-A-James Apr 24, 2024
0eb1986
add timeout to find.execute
W-A-James Apr 26, 2024
69f1087
start implementing prose tests
W-A-James Apr 26, 2024
ff2ec44
don't construct Timeout when not needed
W-A-James Apr 26, 2024
a3e552a
ensure that timeoutMS is passed down correctly
W-A-James Apr 26, 2024
a9c96ac
start working on unit tests
W-A-James Apr 26, 2024
21b2fcd
continue prose test implementation
W-A-James Apr 26, 2024
892800d
revert spec test changes
W-A-James Apr 26, 2024
6f62c9f
revert spec test changes
W-A-James Apr 26, 2024
b878c6c
revert spec test changes
W-A-James Apr 26, 2024
c5fae5c
support timeout on run_command
W-A-James May 1, 2024
124f2d7
continue prose test implementation
W-A-James May 1, 2024
cd6a27e
prose test changes
W-A-James May 1, 2024
b63b63f
WIP - server selection changes
W-A-James May 1, 2024
9f4e884
revert unneeded connection changes
W-A-James May 3, 2024
6fc1198
add serverSelectionTimeout to run_command
W-A-James May 3, 2024
c18b7f0
Merge branch 'main' into NODE-6090
W-A-James May 3, 2024
9c40f94
use correct timeout
W-A-James May 3, 2024
a958ad8
Merge branch 'NODE-6090' of github.com:mongodb/node-mongodb-native in…
W-A-James May 3, 2024
a75d9df
reorder operations
W-A-James May 3, 2024
875ea67
formatting
W-A-James May 3, 2024
3839330
skip some CSOT tests that cannot be made to pass here
W-A-James May 3, 2024
c738691
Improve timeout messages
W-A-James May 3, 2024
c4ae2fb
silence eslint test issues
W-A-James May 3, 2024
bf5a37a
bump timeout values
W-A-James May 3, 2024
e2f9125
misc changes
W-A-James May 3, 2024
472fa9e
rename timeout
W-A-James May 3, 2024
43e69bd
make getter internal
W-A-James May 3, 2024
fb96314
rename timeout
W-A-James May 3, 2024
ad55e9c
remove unneeded change for this PR
W-A-James May 3, 2024
6c7adf1
clear server selection timeout after checkout and remove command exec…
W-A-James May 6, 2024
f586736
move Timeout.min to independent helper function
W-A-James May 6, 2024
021a94d
move Timeout.min to independent helper function
W-A-James May 6, 2024
a61a9d0
Merge branch 'main' into NODE-6090
W-A-James May 7, 2024
30e564c
update timeout propagation
W-A-James May 7, 2024
02509a1
clean up
W-A-James May 7, 2024
7c73c58
cleanup
W-A-James May 7, 2024
b92162b
test cleanup
W-A-James May 7, 2024
4625af4
clean up
W-A-James May 7, 2024
7d7f005
simplify calculation
W-A-James May 7, 2024
ecdd66d
cleanup
W-A-James May 7, 2024
9307a80
clarify branching timeout behaviour
W-A-James May 7, 2024
78ccbd8
Merge branch 'main' into NODE-6090
aditi-khare-mongoDB May 8, 2024
c4d26c5
operationTimeout -> timeout
W-A-James May 8, 2024
3590e5e
Merge branch 'NODE-6090' of github.com:mongodb/node-mongodb-native in…
W-A-James May 8, 2024
f7cc3a5
ensure timeouts are properly cleared
W-A-James May 8, 2024
a1c7601
don't race if given infinite timeout
W-A-James May 8, 2024
2ed34ac
default clearTimeout to false
W-A-James May 9, 2024
7111909
remove clearTimeout variable
W-A-James May 9, 2024
86a7eeb
conditionally clear timeout on early return
W-A-James May 9, 2024
2a816e3
fix unit tests
W-A-James May 9, 2024
62bb3ca
bump test timeout value
W-A-James May 9, 2024
cdce963
replace test with sinon fake timer test
W-A-James May 10, 2024
5c91311
Update src/sdam/topology.ts
W-A-James May 10, 2024
72c22b9
clean up logic
W-A-James May 10, 2024
68f1eec
Update test to assert on current behaviour
W-A-James May 10, 2024
e88191a
Merge branch 'main' into NODE-6090
W-A-James May 23, 2024
daf0d5a
fix autoconnect
W-A-James May 23, 2024
1d9ac3e
add test
W-A-James May 23, 2024
e468b54
remove .only
W-A-James May 23, 2024
fd6c751
fix test
W-A-James May 24, 2024
bab1667
Merge branch 'main' into NODE-6090
W-A-James May 24, 2024
4d126eb
ensure test only runs when failcommand is available
W-A-James May 24, 2024
92564d8
do not run on 4.2
W-A-James May 28, 2024
09a8b17
fix csot test?
W-A-James May 28, 2024
f2f4d63
Merge branch 'main' into NODE-6090
W-A-James May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class Admin {
new RunAdminCommandOperation(command, {
...resolveBSONOptions(options),
session: options?.session,
readPreference: options?.readPreference
readPreference: options?.readPreference,
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
Comment on lines +81 to +82
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE FOR REVIEWERS: Needed to do this to ensure that unit tests using ping work

})
);
}
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -92,6 +93,9 @@ export interface CommandOptions extends BSONSerializeOptions {
writeConcern?: WriteConcern;

directConnection?: boolean;

/** @internal */
operationTimeout?: Timeout;
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

/** @public */
Expand Down
28 changes: 25 additions & 3 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import {
MongoInvalidArgumentError,
MongoMissingCredentialsError,
MongoNetworkError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -354,17 +355,33 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(): Promise<Connection> {
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

const timeout = Timeout.expires(waitQueueTimeoutMS);
let timeout: Timeout;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (
csotMin(options.timeout.duration, serverSelectionTimeoutMS) === serverSelectionTimeoutMS
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
) {
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
} else {
timeout = options.timeout;
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}

timeout.throwIfExpired();
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

const waitQueueMember: WaitQueueMember = {
resolve,
Expand Down Expand Up @@ -393,6 +410,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
}
throw timeoutError;
}
throw error;
Expand Down
5 changes: 5 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ export class Collection<TSchema extends Document = Document> {
this.s.collectionHint = normalizeHintField(v);
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options.timeoutMS;
}

/**
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
Expand Down
6 changes: 6 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ export class Db {
return this.s.namespace.toString();
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options?.timeoutMS;
}

/**
* Create a new collection on a server with the specified options. Use this to create capped collections.
* More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/
Expand Down Expand Up @@ -272,6 +277,7 @@ export class Db {
this.client,
new RunCommandOperation(this, command, {
...resolveBSONOptions(options),
timeoutMS: options?.timeoutMS,
session: options?.session,
readPreference: options?.readPreference
})
Expand Down
9 changes: 9 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError {
}
}

/**
* @internal
*/
export class MongoOperationTimeoutError extends MongoRuntimeError {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
override get name(): string {
return 'MongoOperationTimeoutError';
}
}

/**
* An error thrown when the user attempts to add options to a cursor that has already been
* initialized
Expand Down
2 changes: 2 additions & 0 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface OperationParent {
writeConcern?: WriteConcern;
readPreference?: ReadPreference;
bsonOptions?: BSONSerializeOptions;
timeoutMS?: number;
}

/** @internal */
Expand Down Expand Up @@ -117,6 +118,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
operationTimeout: this.operationTimeout,
readPreference: this.readPreference,
session
};
Expand Down
7 changes: 6 additions & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,13 @@ export async function executeOperation<
selector = readPreference;
}

const timeout = operation.operationTimeout;
timeout?.throwIfExpired();

const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
timeout
});

if (session == null) {
Expand Down Expand Up @@ -265,6 +269,7 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
timeout: operation.operationTimeout,
operationName: operation.commandName,
previousServer
});
Expand Down
8 changes: 8 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -61,6 +62,9 @@ export abstract class AbstractOperation<TResult = any> {

options: OperationOptions;

/** @internal */
operationTimeout?: Timeout;

nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
[kSession]: ClientSession | undefined;

constructor(options: OperationOptions = {}) {
Expand All @@ -76,6 +80,10 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

if (options.timeoutMS != null) {
this.operationTimeout = Timeout.expires(options.timeoutMS);
}
}

/** Must match the first key of the command object sent to the server.
Expand Down
6 changes: 5 additions & 1 deletion src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export type RunCommandOptions = {
session?: ClientSession;
/** The read preference */
readPreference?: ReadPreferenceLike;
/** @internal */
timeoutMS?: number;
} & BSONSerializeOptions;

/** @internal */
Expand All @@ -31,6 +33,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
operationTimeout: this.operationTimeout,
session
});
return res;
Expand Down Expand Up @@ -58,7 +61,8 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
session,
operationTimeout: this.operationTimeout
});
return res;
}
Expand Down
7 changes: 6 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,11 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.incrementOperationCount();
if (conn == null) {
try {
conn = await this.pool.checkOut();
if (options.operationTimeout) {
conn = await this.pool.checkOut({ timeout: options.operationTimeout });
} else {
conn = await this.pool.checkOut();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (options.operationTimeout) {
conn = await this.pool.checkOut({ timeout: options.operationTimeout });
} else {
conn = await this.pool.checkOut();
}
conn = await this.pool.checkOut({ timeout: options.operationTimeout });

TS supports just calling this because the timeout is optional

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel Warren's current code is easier to read (and easier for someone editing the code later to not accidentally make the code not CSOT spec-compliant) , but if we do end up going with this suggestion can we leave a clarifying comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised because breaking this up into two calls to checkOut based on a condition that does matter is more to read without more meaningful context given. Whether or not timeout exists, there is no change to how checkOut is, practically, invoked because the typescript reports that field as optional.

I would actually take this further:

conn = await this.pool.checkOut(options);

Why do we need to make a new object here? passing through options should be fine right? Less branching paths the less there is to debug

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on what may accidentally break the spec without a test warning us?

if (this.loadBalanced && isPinnableCommand(cmd, session)) {
session?.pin(conn);
}
Expand All @@ -333,6 +337,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
await this.pool.reauthenticate(conn);
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer
try {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
return await conn.command(ns, cmd, finalOptions, responseType);
} catch (commandError) {
Expand Down
32 changes: 28 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type MongoDriverError,
MongoError,
MongoErrorLabel,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerSelectionError,
MongoTopologyClosedError
Expand Down Expand Up @@ -457,8 +458,14 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

const timeoutMS = this.client.options.timeoutMS;
const operationTimeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined;
const readPreference = options.readPreference ?? ReadPreference.primary;
const selectServerOptions = { operationName: 'ping', ...options };
const selectServerOptions = {
operationName: 'ping',
timeout: operationTimeout,
...options
};
try {
const server = await this.selectServer(
readPreferenceServerSelector(readPreference),
Expand All @@ -467,7 +474,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
if (!skipPingOnConnect && server && this.s.credentials) {
await server.command(ns('admin.$cmd'), { ping: 1 }, {});
await server.command(ns('admin.$cmd'), { ping: 1 }, { operationTimeout });
stateTransition(this, STATE_CONNECTED);
this.emit(Topology.OPEN, this);
this.emit(Topology.CONNECT, this);
Expand Down Expand Up @@ -556,6 +563,18 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
new ServerSelectionStartedEvent(selector, this.description, options.operationName)
);
}
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0;
let timeout: Timeout;
if (options.timeout) {
// CSOT Enabled
timeout =
Math.min(options.timeout.remainingTime, serverSelectionTimeoutMS) ===
serverSelectionTimeoutMS
? Timeout.expires(serverSelectionTimeoutMS)
: options.timeout;
} else {
timeout = Timeout.expires(serverSelectionTimeoutMS);
}

const isSharded = this.description.type === TopologyType.Sharded;
const session = options.session;
Expand All @@ -582,7 +601,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>();
const timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);

const waitQueueMember: ServerSelectionRequest = {
serverSelector,
topologyDescription: this.description,
Expand All @@ -601,12 +620,12 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
processWaitQueue(this);

try {
waitQueueMember.timeout.throwIfExpired();
return await Promise.race([serverPromise, waitQueueMember.timeout]);
} catch (error) {
if (TimeoutError.is(error)) {
// Timeout
waitQueueMember[kCancelled] = true;
timeout.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
Expand All @@ -628,6 +647,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
);
}

if (options.timeout) {
throw new MongoOperationTimeoutError('Timed out during server selection', {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
cause: timeoutError
});
}
throw timeoutError;
}
// Other server selection error
Expand Down
14 changes: 14 additions & 0 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ export class Timeout extends Promise<never> {
public duration: number;
public timedOut = false;

get remainingTime(): number {
if (this.timedOut) return 0;
if (this.duration === 0) return Infinity;
return this.start + this.duration - Math.trunc(performance.now());
}

get timeElapsed(): number {
return Math.trunc(performance.now()) - this.start;
}

/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number, unref = false) {
let reject!: Reject;
Expand Down Expand Up @@ -78,6 +88,10 @@ export class Timeout extends Promise<never> {
this.id = undefined;
}

throwIfExpired(): void {
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
if (this.timedOut) throw new TimeoutError('Timed out');
}

public static expires(durationMS: number, unref?: boolean): Timeout {
return new Timeout(undefined, durationMS, unref);
}
Expand Down
10 changes: 10 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,10 @@ export function resolveOptions<T extends CommandOperationOptions>(
result.readPreference = readPreference;
}

const timeoutMS = options?.timeoutMS;

result.timeoutMS = timeoutMS ?? parent?.timeoutMS;

return result;
}

Expand Down Expand Up @@ -1330,6 +1334,12 @@ export async function fileIsAccessible(fileName: string, mode?: number) {
}
}

export function csotMin(duration1: number, duration2: number): number {
if (duration1 === 0) return duration2;
if (duration2 === 0) return duration1;
return Math.min(duration1, duration2);
}

export function noop() {
return;
}