Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce timeout abstraction POC #4077

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import { maybeAddIdToDocuments } from '../utils';
import {
applyRetryableWrites,
Expand Down Expand Up @@ -874,6 +875,8 @@ export interface BulkWriteOptions extends CommandOperationOptions {
forceServerObjectId?: boolean;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;
/** @internal */
timeout?: Timeout | null;
}

const executeCommandsAsync = promisify(executeCommands);
Expand Down
41 changes: 34 additions & 7 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { Timeout } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -58,6 +59,7 @@ import {
type WriteProtocolMessageType
} from './commands';
import type { Stream } from './connect';
import { type ConnectionPool } from './connection_pool';
import type { ClientMetadata } from './handshake/client_metadata';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { type CompressorName, decompressResponse } from './wire_protocol/compression';
Expand Down Expand Up @@ -88,6 +90,8 @@ export interface CommandOptions extends BSONSerializeOptions {
writeConcern?: WriteConcern;

directConnection?: boolean;

timeout?: Timeout | null;
}

/** @public */
Expand Down Expand Up @@ -180,6 +184,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
* Once connection is established, command logging can log events (if enabled)
*/
public established: boolean;
public pool?: ConnectionPool;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
public closed = false;

Expand Down Expand Up @@ -276,6 +281,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
);
}

private get minRoundTripTime(): number {
return this.pool?.server.description.minRoundTripTime ?? 0;
}

public markAvailable(): void {
this.lastUseTime = now();
}
Expand Down Expand Up @@ -340,6 +349,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

let clusterTime = this.clusterTime;

if (Timeout.is(options.timeout) && options.timeout.duration > 0) {
cmd.maxTimeMS = options.timeout.getMaxTimeMS(this.minRoundTripTime);
}

if (this.serverApi) {
const { version, strict, deprecationErrors } = this.serverApi;
cmd.apiVersion = version;
Expand Down Expand Up @@ -429,7 +442,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel
zlibCompressionLevel: this.description.zlibCompressionLevel,
timeout: options.timeout
});

if (options.noResponse) {
Expand All @@ -439,7 +453,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

this.throwIfAborted();

for await (const response of this.readMany()) {
for await (const response of this.readMany({ timeout: options.timeout })) {
this.socket.setTimeout(0);
const bson = response.parse();

Expand Down Expand Up @@ -632,7 +646,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*/
private async writeCommand(
command: WriteProtocolMessageType,
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
options: {
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
timeout?: Timeout | null;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
Expand All @@ -644,8 +662,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

if (this.socket.write(buffer)) return;
return await once(this.socket, 'drain');
if (this.socket.write(buffer)) {
return;
}
const drain = once(this.socket, 'drain');

if (options.timeout) {
await Promise.race([drain, options.timeout]);
}
await drain;
}

/**
Expand All @@ -657,9 +682,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
private async *readMany(options: {
timeout?: Timeout | null;
}): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
this.dataEvents = onData(this.messageStream, options);
for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand Down
3 changes: 3 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
[kMetrics]: ConnectionPoolMetrics;
[kProcessingWaitQueue]: boolean;

server: Server;

/**
* Emitted when the connection pool is created.
* @event
Expand Down Expand Up @@ -247,6 +249,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this[kPoolState] = PoolState.paused;
this[kServer] = server;
this.server = server;
this[kConnections] = new List();
this[kPending] = 0;
this[kCheckedOut] = new Set();
Expand Down
5 changes: 4 additions & 1 deletion src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { type EventEmitter } from 'events';

import { type Timeout } from '../../timeout';
import { List, promiseWithResolvers } from '../../utils';

/**
Expand All @@ -18,7 +19,7 @@ type PendingPromises = Omit<
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
* It will reject upon an error event.
*/
export function onData(emitter: EventEmitter) {
export function onData(emitter: EventEmitter, options?: { timeout?: Timeout | null }) {
// Setup pending events and pending promise lists
/**
* When the caller has not yet called .next(), we store the
Expand Down Expand Up @@ -86,6 +87,8 @@ export function onData(emitter: EventEmitter) {
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);
// eslint-disable-next-line github/no-then
options?.timeout?.then(() => null, errorHandler);

return iterator;

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type { Timeout } from './timeout';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
7 changes: 6 additions & 1 deletion src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
): Promise<BulkWriteResult> {
const coll = this.collection;
const operations = this.operations;
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
const options = {
...this.options,
...this.bsonOptions,
readPreference: this.readPreference,
timeout: this.timeout
};

// Create the bulk operation
const bulk: BulkOperationBase =
Expand Down
3 changes: 2 additions & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
...this.options,
...this.bsonOptions,
readPreference: this.readPreference,
session
session,
timeout: this.timeout
};

const serverWireVersion = maxWireVersion(server);
Expand Down
9 changes: 8 additions & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ export async function executeOperation<
}

try {
if (operation.timeout) {
return await Promise.race([operation.execute(server, session), operation.timeout]);
}
return await operation.execute(server, session);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
Expand Down Expand Up @@ -260,7 +263,8 @@ async function retryOperation<
const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer
previousServer,
timeout: operation.timeout
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
Expand All @@ -270,6 +274,9 @@ async function retryOperation<
}

try {
if (operation.timeout) {
return await Promise.race([operation.execute(server, session), operation.timeout]);
}
return await operation.execute(server, session);
} catch (retryError) {
if (
Expand Down
3 changes: 2 additions & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ export class FindOperation extends CommandOperation<Document> {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
session,
timeout: this.timeout
});
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -63,7 +64,10 @@ export abstract class AbstractOperation<TResult = any> {

[kSession]: ClientSession | undefined;

timeout?: Timeout | null;

constructor(options: OperationOptions = {}) {
this.timeout = options.timeoutMS != null ? Timeout.expires(options.timeoutMS) : null;
this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
? ReadPreference.primary
: ReadPreference.fromOptions(options) ?? ReadPreference.primary;
Expand Down
5 changes: 4 additions & 1 deletion src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export type RunCommandOptions = {
session?: ClientSession;
/** The read preference */
readPreference?: ReadPreferenceLike;
/** @internal */
timeoutMS?: number;
} & BSONSerializeOptions;

/** @internal */
Expand All @@ -31,7 +33,8 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
session,
timeout: this.timeout
});
return res;
}
Expand Down
5 changes: 4 additions & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong
import { TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import type { Transaction } from '../transactions';
import {
type Callback,
Expand Down Expand Up @@ -178,6 +179,8 @@ export interface SelectServerOptions {
session?: ClientSession;
operationName: string;
previousServer?: ServerDescription;
/** @internal*/
timeout?: Timeout | null;
}

/** @public */
Expand Down Expand Up @@ -623,7 +626,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
this[kWaitQueue].push(waitQueueMember);
processWaitQueue(this);

return await serverPromise;
return await (options.timeout ? Promise.race([options.timeout, serverPromise]) : serverPromise);
}
/**
* Update the internal TopologyDescription with a ServerDescription
Expand Down