Skip to content

Commit

Permalink
refactor(NODE-4637): clean up async interval (#3411)
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Sep 16, 2022
1 parent 64b3ee9 commit 9883993
Show file tree
Hide file tree
Showing 7 changed files with 704 additions and 689 deletions.
3 changes: 2 additions & 1 deletion src/index.ts
Expand Up @@ -431,6 +431,8 @@ export type { ClusterTime, TimerQueue } from './sdam/common';
export type {
Monitor,
MonitorEvents,
MonitorInterval,
MonitorIntervalOptions,
MonitorOptions,
MonitorPrivate,
RTTPinger,
Expand Down Expand Up @@ -475,7 +477,6 @@ export type {
ClientMetadataOptions,
EventEmitterWithState,
HostAddress,
InterruptibleAsyncInterval,
MongoDBNamespace
} from './utils';
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';
152 changes: 136 additions & 16 deletions src/sdam/monitor.ts
Expand Up @@ -6,15 +6,8 @@ import { Connection, ConnectionOptions } from '../cmap/connection';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback, InterruptibleAsyncInterval } from '../utils';
import {
calculateDurationInMs,
EventEmitterWithState,
makeInterruptibleAsyncInterval,
makeStateMachine,
now,
ns
} from '../utils';
import type { Callback } from '../utils';
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common';
import {
ServerHeartbeatFailedEvent,
Expand Down Expand Up @@ -87,7 +80,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kConnection]?: Connection;
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: InterruptibleAsyncInterval;
[kMonitorId]?: MonitorInterval;
[kRTTPinger]?: RTTPinger;

get connection(): Connection | undefined {
Expand Down Expand Up @@ -150,9 +143,9 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
// start
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS,
this[kMonitorId] = new MonitorInterval(monitorServer(this), {
heartbeatFrequencyMS: heartbeatFrequencyMS,
minHeartbeatFrequencyMS: minHeartbeatFrequencyMS,
immediate: true
});
}
Expand Down Expand Up @@ -180,9 +173,9 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
// restart monitoring
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS
this[kMonitorId] = new MonitorInterval(monitorServer(this), {
heartbeatFrequencyMS: heartbeatFrequencyMS,
minHeartbeatFrequencyMS: minHeartbeatFrequencyMS
});
}

Expand Down Expand Up @@ -466,3 +459,130 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
measureAndReschedule();
});
}

/**
* @internal
*/
export interface MonitorIntervalOptions {
/** The interval to execute a method on */
heartbeatFrequencyMS: number;
/** A minimum interval that must elapse before the method is called */
minHeartbeatFrequencyMS: number;
/** Whether the method should be called immediately when the interval is started */
immediate: boolean;

/**
* Only used for testing unreliable timer environments
* @internal
*/
clock: () => number;
}

/**
* @internal
*/
export class MonitorInterval {
fn: (callback: Callback) => void;
timerId: NodeJS.Timeout | undefined;
lastCallTime: number;
isExpeditedCheckScheduled = false;
stopped = false;

heartbeatFrequencyMS: number;
minHeartbeatFrequencyMS: number;
clock: () => number;

constructor(fn: (callback: Callback) => void, options: Partial<MonitorIntervalOptions> = {}) {
this.fn = fn;
this.lastCallTime = 0;

this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;
this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;
this.clock = typeof options.clock === 'function' ? options.clock : now;

if (options.immediate) {
this._executeAndReschedule();
} else {
this.lastCallTime = this.clock();
this._reschedule(undefined);
}
}

wake() {
const currentTime = this.clock();
const nextScheduledCallTime = this.lastCallTime + this.heartbeatFrequencyMS;
const timeUntilNextCall = nextScheduledCallTime - currentTime;

// For the streaming protocol: there is nothing obviously stopping this
// interval from being woken up again while we are waiting "infinitely"
// for `fn` to be called again`. Since the function effectively
// never completes, the `timeUntilNextCall` will continue to grow
// negatively unbounded, so it will never trigger a reschedule here.

// This is possible in virtualized environments like AWS Lambda where our
// clock is unreliable. In these cases the timer is "running" but never
// actually completes, so we want to execute immediately and then attempt
// to reschedule.
if (timeUntilNextCall < 0) {
this._executeAndReschedule();
return;
}

// debounce multiple calls to wake within the `minInterval`
if (this.isExpeditedCheckScheduled) {
return;
}

// reschedule a call as soon as possible, ensuring the call never happens
// faster than the `minInterval`
if (timeUntilNextCall > this.minHeartbeatFrequencyMS) {
this._reschedule(this.minHeartbeatFrequencyMS);
this.isExpeditedCheckScheduled = true;
}
}

stop() {
this.stopped = true;
if (this.timerId) {
clearTimeout(this.timerId);
this.timerId = undefined;
}

this.lastCallTime = 0;
this.isExpeditedCheckScheduled = false;
}

toString() {
return JSON.stringify(this);
}

toJSON() {
return {
timerId: this.timerId != null ? 'set' : 'cleared',
lastCallTime: this.lastCallTime,
isExpeditedCheckScheduled: this.isExpeditedCheckScheduled,
stopped: this.stopped,
heartbeatFrequencyMS: this.heartbeatFrequencyMS,
minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS
};
}

private _reschedule(ms?: number) {
if (this.stopped) return;
if (this.timerId) {
clearTimeout(this.timerId);
}

this.timerId = setTimeout(this._executeAndReschedule, ms || this.heartbeatFrequencyMS);
}

private _executeAndReschedule = () => {
this.isExpeditedCheckScheduled = false;
this.lastCallTime = this.clock();

this.fn(err => {
if (err) throw err;
this._reschedule(this.heartbeatFrequencyMS);
});
};
}
11 changes: 8 additions & 3 deletions src/sdam/topology.ts
Expand Up @@ -386,7 +386,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Initiate server connect */
connect(options?: ConnectOptions, callback?: Callback): void {
connect(callback: Callback): void;
connect(options: ConnectOptions, callback: Callback): void;
connect(options?: ConnectOptions | Callback, callback?: Callback): void {
if (typeof options === 'function') (callback = options), (options = {});
options = options ?? {};
if (this.s.state === STATE_CONNECTED) {
Expand Down Expand Up @@ -468,7 +470,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Close this topology */
close(options?: CloseOptions, callback?: Callback): void {
close(callback: Callback): void;
close(options: CloseOptions): void;
close(options: CloseOptions, callback: Callback): void;
close(options?: CloseOptions | Callback, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = {};
Expand All @@ -484,7 +489,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

const destroyedServers = Array.from(this.s.servers.values(), server => {
return promisify(destroyServer)(server, this, options);
return promisify(destroyServer)(server, this, options as CloseOptions);
});

Promise.all(destroyedServers)
Expand Down
119 changes: 0 additions & 119 deletions src/utils.ts
@@ -1,7 +1,6 @@
import * as crypto from 'crypto';
import type { SrvRecord } from 'dns';
import * as os from 'os';
import { clearTimeout, setTimeout } from 'timers';
import { URL } from 'url';

import { Document, ObjectId, resolveBSONOptions } from './bson';
Expand Down Expand Up @@ -774,124 +773,6 @@ export function calculateDurationInMs(started: number): number {
return elapsed < 0 ? 0 : elapsed;
}

export interface InterruptibleAsyncIntervalOptions {
/** The interval to execute a method on */
interval: number;
/** A minimum interval that must elapse before the method is called */
minInterval: number;
/** Whether the method should be called immediately when the interval is started */
immediate: boolean;

/**
* Only used for testing unreliable timer environments
* @internal
*/
clock: () => number;
}

/** @internal */
export interface InterruptibleAsyncInterval {
wake(): void;
stop(): void;
}

/**
* Creates an interval timer which is able to be woken up sooner than
* the interval. The timer will also debounce multiple calls to wake
* ensuring that the function is only ever called once within a minimum
* interval window.
* @internal
*
* @param fn - An async function to run on an interval, must accept a `callback` as its only parameter
*/
export function makeInterruptibleAsyncInterval(
fn: (callback: Callback) => void,
options?: Partial<InterruptibleAsyncIntervalOptions>
): InterruptibleAsyncInterval {
let timerId: NodeJS.Timeout | undefined;
let lastCallTime: number;
let cannotBeExpedited = false;
let stopped = false;

options = options ?? {};
const interval = options.interval || 1000;
const minInterval = options.minInterval || 500;
const immediate = typeof options.immediate === 'boolean' ? options.immediate : false;
const clock = typeof options.clock === 'function' ? options.clock : now;

function wake() {
const currentTime = clock();
const nextScheduledCallTime = lastCallTime + interval;
const timeUntilNextCall = nextScheduledCallTime - currentTime;

// For the streaming protocol: there is nothing obviously stopping this
// interval from being woken up again while we are waiting "infinitely"
// for `fn` to be called again`. Since the function effectively
// never completes, the `timeUntilNextCall` will continue to grow
// negatively unbounded, so it will never trigger a reschedule here.

// This is possible in virtualized environments like AWS Lambda where our
// clock is unreliable. In these cases the timer is "running" but never
// actually completes, so we want to execute immediately and then attempt
// to reschedule.
if (timeUntilNextCall < 0) {
executeAndReschedule();
return;
}

// debounce multiple calls to wake within the `minInterval`
if (cannotBeExpedited) {
return;
}

// reschedule a call as soon as possible, ensuring the call never happens
// faster than the `minInterval`
if (timeUntilNextCall > minInterval) {
reschedule(minInterval);
cannotBeExpedited = true;
}
}

function stop() {
stopped = true;
if (timerId) {
clearTimeout(timerId);
timerId = undefined;
}

lastCallTime = 0;
cannotBeExpedited = false;
}

function reschedule(ms?: number) {
if (stopped) return;
if (timerId) {
clearTimeout(timerId);
}

timerId = setTimeout(executeAndReschedule, ms || interval);
}

function executeAndReschedule() {
cannotBeExpedited = false;
lastCallTime = clock();

fn(err => {
if (err) throw err;
reschedule(interval);
});
}

if (immediate) {
executeAndReschedule();
} else {
lastCallTime = clock();
reschedule(undefined);
}

return { wake, stop };
}

/** @internal */
export function hasAtomicOperators(doc: Document | Document[]): boolean {
if (Array.isArray(doc)) {
Expand Down

0 comments on commit 9883993

Please sign in to comment.