Skip to content

Commit

Permalink
Exponential back-off for process restarting (#462)
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavohenke committed Jan 12, 2024
1 parent aedebc1 commit bb8436b
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 15 deletions.
7 changes: 4 additions & 3 deletions bin/concurrently.ts
Expand Up @@ -165,9 +165,9 @@ const args = yargs(argsBeforeSep)
type: 'number',
},
'restart-after': {
describe: 'Delay time to respawn the process, in milliseconds.',
describe: 'Delay before restarting the process, in milliseconds, or "exponential".',
default: defaults.restartDelay,
type: 'number',
type: 'string',
},

// Input
Expand Down Expand Up @@ -223,7 +223,8 @@ concurrently(
prefix: args.prefix,
prefixColors: args.prefixColors.split(','),
prefixLength: args.prefixLength,
restartDelay: args.restartAfter,
restartDelay:
args.restartAfter === 'exponential' ? 'exponential' : Number(args.restartAfter),
restartTries: args.restartTries,
successCondition: args.success,
timestampFormat: args.timestampFormat,
Expand Down
46 changes: 41 additions & 5 deletions src/flow-control/restart-process.spec.ts
@@ -1,5 +1,5 @@
import { createMockInstance } from 'jest-create-mock-instance';
import { TestScheduler } from 'rxjs/testing';
import { VirtualTimeScheduler } from 'rxjs';

import { createFakeCloseEvent, FakeCommand } from '../fixtures/fake-command';
import { Logger } from '../logger';
Expand All @@ -8,12 +8,14 @@ import { RestartProcess } from './restart-process';
let commands: FakeCommand[];
let controller: RestartProcess;
let logger: Logger;
let scheduler: TestScheduler;
let scheduler: VirtualTimeScheduler;
beforeEach(() => {
commands = [new FakeCommand(), new FakeCommand()];

logger = createMockInstance(Logger);
scheduler = new TestScheduler(() => true);

// Don't use TestScheduler as it's hardcoded to a max number of "frames" (time),
// which don't work for some tests in this suite
scheduler = new VirtualTimeScheduler();
controller = new RestartProcess({
logger,
scheduler,
Expand All @@ -34,14 +36,31 @@ it('does not restart processes that complete with success', () => {
expect(commands[1].start).toHaveBeenCalledTimes(0);
});

it('restarts processes that fail after delay has passed', () => {
it('restarts processes that fail immediately, if no delay was passed', () => {
controller = new RestartProcess({ logger, scheduler, tries: 1 });
controller.handle(commands);

commands[0].close.next(createFakeCloseEvent({ exitCode: 1 }));
scheduler.flush();

expect(scheduler.now()).toBe(0);
expect(logger.logCommandEvent).toHaveBeenCalledTimes(1);
expect(logger.logCommandEvent).toHaveBeenCalledWith(
`${commands[0].command} restarted`,
commands[0],
);
expect(commands[0].start).toHaveBeenCalledTimes(1);
});

it('restarts processes that fail after delay ms has passed', () => {
controller.handle(commands);

commands[0].close.next(createFakeCloseEvent({ exitCode: 1 }));
commands[1].close.next(createFakeCloseEvent({ exitCode: 0 }));

scheduler.flush();

expect(scheduler.now()).toBe(100);
expect(logger.logCommandEvent).toHaveBeenCalledTimes(1);
expect(logger.logCommandEvent).toHaveBeenCalledWith(
`${commands[0].command} restarted`,
Expand All @@ -51,6 +70,23 @@ it('restarts processes that fail after delay has passed', () => {
expect(commands[1].start).not.toHaveBeenCalled();
});

it('restarts processes that fail with an exponential back-off', () => {
const tries = 4;
controller = new RestartProcess({ logger, scheduler, tries, delay: 'exponential' });
controller.handle(commands);

let time = 0;
for (let i = 0; i < tries; i++) {
commands[0].close.next(createFakeCloseEvent({ exitCode: 1 }));
scheduler.flush();

time += Math.pow(2, i) * 1000;
expect(scheduler.now()).toBe(time);
expect(logger.logCommandEvent).toHaveBeenCalledTimes(i + 1);
expect(commands[0].start).toHaveBeenCalledTimes(i + 1);
}
});

it('restarts processes up to tries', () => {
controller.handle(commands);

Expand Down
18 changes: 13 additions & 5 deletions src/flow-control/restart-process.ts
@@ -1,18 +1,20 @@
import * as Rx from 'rxjs';
import { defaultIfEmpty, delay, filter, map, skip, take, takeWhile } from 'rxjs/operators';
import { defaultIfEmpty, delayWhen, filter, map, skip, take, takeWhile } from 'rxjs/operators';

import { Command } from '../command';
import * as defaults from '../defaults';
import { Logger } from '../logger';
import { FlowController } from './flow-controller';

export type RestartDelay = number | 'exponential';

/**
* Restarts commands that fail up to a defined number of times.
*/
export class RestartProcess implements FlowController {
private readonly logger: Logger;
private readonly scheduler?: Rx.SchedulerLike;
readonly delay: number;
private readonly delay: RestartDelay;
readonly tries: number;

constructor({
Expand All @@ -21,13 +23,13 @@ export class RestartProcess implements FlowController {
logger,
scheduler,
}: {
delay?: number;
delay?: RestartDelay;
tries?: number;
logger: Logger;
scheduler?: Rx.SchedulerLike;
}) {
this.logger = logger;
this.delay = delay != null ? +delay : defaults.restartDelay;
this.delay = delay ?? 0;
this.tries = tries != null ? +tries : defaults.restartTries;
this.tries = this.tries < 0 ? Infinity : this.tries;
this.scheduler = scheduler;
Expand All @@ -38,6 +40,12 @@ export class RestartProcess implements FlowController {
return { commands };
}

const delayOperator = delayWhen((_, index) => {
const { delay } = this;
const value = delay === 'exponential' ? Math.pow(2, index) * 1000 : delay;
return Rx.timer(value, this.scheduler);
});

commands
.map((command) =>
command.close.pipe(
Expand All @@ -50,7 +58,7 @@ export class RestartProcess implements FlowController {
// Delay the emission (so that the restarts happen on time),
// explicitly telling the subscriber that a restart is needed
failure.pipe(
delay(this.delay, this.scheduler),
delayOperator,
map(() => true),
),
// Skip the first N emissions (as these would be duplicates of the above),
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Expand Up @@ -15,7 +15,7 @@ import { LogError } from './flow-control/log-error';
import { LogExit } from './flow-control/log-exit';
import { LogOutput } from './flow-control/log-output';
import { LogTimings } from './flow-control/log-timings';
import { RestartProcess } from './flow-control/restart-process';
import { RestartDelay, RestartProcess } from './flow-control/restart-process';
import { Logger } from './logger';

export type ConcurrentlyOptions = BaseConcurrentlyOptions & {
Expand Down Expand Up @@ -59,7 +59,7 @@ export type ConcurrentlyOptions = BaseConcurrentlyOptions & {
*
* @see RestartProcess
*/
restartDelay?: number;
restartDelay?: RestartDelay;

/**
* How many times commands should be restarted when they exit with a failure.
Expand Down

0 comments on commit bb8436b

Please sign in to comment.