Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort commands not running when max processes < N #460

Merged
merged 11 commits into from Mar 24, 2024
33 changes: 31 additions & 2 deletions src/completion-listener.spec.ts
Expand Up @@ -21,12 +21,41 @@ const createController = (successCondition?: SuccessCondition) =>
scheduler,
});

const emitFakeCloseEvent = (command: FakeCommand, event?: Partial<CloseEvent>) =>
command.close.next(createFakeCloseEvent({ ...event, command, index: command.index }));
const emitFakeCloseEvent = (command: FakeCommand, event?: Partial<CloseEvent>) => {
const fakeEvent = createFakeCloseEvent({ ...event, command, index: command.index });
command.state = 'exited';
command.close.next(fakeEvent);
return fakeEvent;
};

const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0));

describe('listen', () => {
it('completes only when commands emit a close event, returns close event', async () => {
const abortCtrl = new AbortController();
const result = createController('all').listen(commands, abortCtrl.signal);

commands[0].state = 'started';
abortCtrl.abort();

const event = emitFakeCloseEvent(commands[0]);
scheduler.flush();

await expect(result).resolves.toHaveLength(1);
await expect(result).resolves.toEqual([event]);
});

it('completes when abort signal is received and command is stopped, returns nothing', async () => {
const abortCtrl = new AbortController();
// Use success condition = first to test index access when there are no close events
const result = createController('first').listen([new FakeCommand()], abortCtrl.signal);

abortCtrl.abort();
scheduler.flush();

await expect(result).resolves.toHaveLength(0);
});

it('check for success once all commands have emitted at least a single close event', async () => {
const finallyCallback = jest.fn();
const result = createController().listen(commands).finally(finallyCallback);
Expand Down
57 changes: 43 additions & 14 deletions src/completion-listener.ts
@@ -1,5 +1,5 @@
import * as Rx from 'rxjs';
import { filter, map, switchMap, take } from 'rxjs/operators';
import { delay, filter, map, switchMap, take } from 'rxjs/operators';

import { CloseEvent, Command } from './command';

Expand Down Expand Up @@ -48,6 +48,11 @@ export class CompletionListener {
}

private isSuccess(events: CloseEvent[]) {
if (!events.length) {
// When every command was aborted, consider a success.
return true;
}

if (this.successCondition === 'first') {
return events[0].exitCode === 0;
} else if (this.successCondition === 'last') {
Expand All @@ -56,7 +61,7 @@ export class CompletionListener {

const commandSyntaxMatch = this.successCondition.match(/^!?command-(.+)$/);
if (commandSyntaxMatch == null) {
// If not a `command-` syntax, then it's an 'all' condition, or it's treated as such.
// If not a `command-` syntax, then it's an 'all' condition or it's treated as such.
return events.every(({ exitCode }) => exitCode === 0);
}

Expand All @@ -73,7 +78,7 @@ export class CompletionListener {
(event) => targetCommandsEvents.includes(event) || event.exitCode === 0,
);
}
// Only the specified commands must exit successfully
// Only the specified commands must exit succesfully
return (
targetCommandsEvents.length > 0 &&
targetCommandsEvents.every((event) => event.exitCode === 0)
Expand All @@ -84,23 +89,47 @@ export class CompletionListener {
* Given a list of commands, wait for all of them to exit and then evaluate their exit codes.
*
* @returns A Promise that resolves if the success condition is met, or rejects otherwise.
* In either case, the value is a list of close events for commands that spawned.
* Commands that didn't spawn are filtered out.
*/
listen(commands: Command[]): Promise<CloseEvent[]> {
const closeStreams = commands.map((command) => command.close);
listen(commands: Command[], abortSignal?: AbortSignal): Promise<CloseEvent[]> {
const abort =
abortSignal &&
Rx.fromEvent(abortSignal, 'abort', { once: true }).pipe(
// The abort signal must happen before commands are killed, otherwise new commands
// might spawn. Because of this, it's not be possible to capture the close events
// without an immediate delay
delay(0, this.scheduler),
map(() => undefined),
);

const closeStreams = commands.map((command) =>
abort
? // Commands that have been started must close.
Rx.race(command.close, abort.pipe(filter(() => command.state === 'stopped')))
: command.close,
);
return Rx.lastValueFrom(
Rx.combineLatest(closeStreams).pipe(
filter(() => commands.every((command) => command.state !== 'started')),
map((exitInfos) =>
exitInfos.sort(
(first, second) =>
first.timings.endDate.getTime() - second.timings.endDate.getTime(),
filter((events) =>
commands.every(
(command, i) => command.state !== 'started' || events[i] === undefined,
),
),
switchMap((exitInfos) =>
this.isSuccess(exitInfos)
? this.emitWithScheduler(Rx.of(exitInfos))
: this.emitWithScheduler(Rx.throwError(() => exitInfos)),
map((events) =>
events
// Filter out aborts, since they cannot be sorted and are considered success condition anyways
.filter((event): event is CloseEvent => event != null)
// Sort according to exit time
.sort(
(first, second) =>
first.timings.endDate.getTime() - second.timings.endDate.getTime(),
),
),
switchMap((events) =>
this.isSuccess(events)
? this.emitWithScheduler(Rx.of(events))
: this.emitWithScheduler(Rx.throwError(() => events)),
),
take(1),
),
Expand Down
10 changes: 10 additions & 0 deletions src/concurrently.spec.ts
Expand Up @@ -111,6 +111,16 @@ it('spawns commands up to percent based limit at once', () => {
expect(spawn).toHaveBeenCalledWith('qux', expect.objectContaining({}));
});

it('does not spawn further commands on abort signal aborted', () => {
const abortController = new AbortController();
create(['foo', 'bar'], { maxProcesses: 1, abortSignal: abortController.signal });
expect(spawn).toHaveBeenCalledTimes(1);

abortController.abort();
processes[0].emit('close', 0, null);
expect(spawn).toHaveBeenCalledTimes(1);
});

it('runs controllers with the commands', () => {
create(['echo', '"echo wrapped"']);

Expand Down
18 changes: 12 additions & 6 deletions src/concurrently.ts
Expand Up @@ -43,7 +43,8 @@ export type ConcurrentlyResult = {
* A promise that resolves when concurrently ran successfully according to the specified
* success condition, or reject otherwise.
*
* Both the resolved and rejected value is the list of all command's close events.
* Both the resolved and rejected value is a list of all the close events for commands that
* spawned; commands that didn't spawn are filtered out.
*/
result: Promise<CloseEvent[]>;
};
Expand Down Expand Up @@ -105,6 +106,11 @@ export type ConcurrentlyOptions = {
*/
successCondition?: SuccessCondition;

/**
* A signal to stop spawning further processes.
*/
abortSignal?: AbortSignal;

/**
* Which flow controllers should be applied on commands spawned by concurrently.
* Defaults to an empty array.
Expand Down Expand Up @@ -217,11 +223,11 @@ export function concurrently(
: Number(options.maxProcesses)) || commandsLeft.length,
);
for (let i = 0; i < maxProcesses; i++) {
maybeRunMore(commandsLeft);
maybeRunMore(commandsLeft, options.abortSignal);
}

const result = new CompletionListener({ successCondition: options.successCondition })
.listen(commands)
.listen(commands, options.abortSignal)
.finally(() => {
handleResult.onFinishCallbacks.forEach((onFinish) => onFinish());
});
Expand Down Expand Up @@ -263,14 +269,14 @@ function parseCommand(command: CommandInfo, parsers: CommandParser[]) {
);
}

function maybeRunMore(commandsLeft: Command[]) {
function maybeRunMore(commandsLeft: Command[], abortSignal?: AbortSignal) {
const command = commandsLeft.shift();
if (!command) {
if (!command || abortSignal?.aborted) {
return;
}

command.start();
command.close.subscribe(() => {
maybeRunMore(commandsLeft);
maybeRunMore(commandsLeft, abortSignal);
});
}
39 changes: 16 additions & 23 deletions src/flow-control/kill-on-signal.spec.ts
Expand Up @@ -7,10 +7,12 @@ import { KillOnSignal } from './kill-on-signal';
let commands: Command[];
let controller: KillOnSignal;
let process: EventEmitter;
let abortController: AbortController;
beforeEach(() => {
process = new EventEmitter();
commands = [new FakeCommand(), new FakeCommand()];
controller = new KillOnSignal({ process });
abortController = new AbortController();
controller = new KillOnSignal({ process, abortController });
});

it('returns commands that keep non-close streams from original commands', () => {
Expand Down Expand Up @@ -51,29 +53,20 @@ it('returns commands that keep non-SIGINT exit codes', () => {
expect(callback).toHaveBeenCalledWith(expect.objectContaining({ exitCode: 1 }));
});

it('kills all commands on SIGINT', () => {
controller.handle(commands);
process.emit('SIGINT');

expect(process.listenerCount('SIGINT')).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith('SIGINT');
expect(commands[1].kill).toHaveBeenCalledWith('SIGINT');
});

it('kills all commands on SIGTERM', () => {
controller.handle(commands);
process.emit('SIGTERM');
describe.each(['SIGINT', 'SIGTERM', 'SIGHUP'])('on %s', (signal) => {
it('kills all commands', () => {
controller.handle(commands);
process.emit(signal);

expect(process.listenerCount('SIGTERM')).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith('SIGTERM');
expect(commands[1].kill).toHaveBeenCalledWith('SIGTERM');
});
expect(process.listenerCount(signal)).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith(signal);
expect(commands[1].kill).toHaveBeenCalledWith(signal);
});

it('kills all commands on SIGHUP', () => {
controller.handle(commands);
process.emit('SIGHUP');
it('sends abort signal', () => {
controller.handle(commands);
process.emit(signal);

expect(process.listenerCount('SIGHUP')).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith('SIGHUP');
expect(commands[1].kill).toHaveBeenCalledWith('SIGHUP');
expect(abortController.signal.aborted).toBe(true);
});
});
11 changes: 10 additions & 1 deletion src/flow-control/kill-on-signal.ts
Expand Up @@ -10,16 +10,25 @@ import { FlowController } from './flow-controller';
*/
export class KillOnSignal implements FlowController {
private readonly process: EventEmitter;
private readonly abortController?: AbortController;

constructor({ process }: { process: EventEmitter }) {
constructor({
process,
abortController,
}: {
process: EventEmitter;
abortController?: AbortController;
}) {
this.process = process;
this.abortController = abortController;
}

handle(commands: Command[]) {
let caughtSignal: NodeJS.Signals;
(['SIGINT', 'SIGTERM', 'SIGHUP'] as NodeJS.Signals[]).forEach((signal) => {
this.process.on(signal, () => {
caughtSignal = signal;
this.abortController?.abort();
commands.forEach((command) => command.kill(signal));
});
});
Expand Down