forked from elastic/kibana
-
Notifications
You must be signed in to change notification settings - Fork 0
/
proc.ts
170 lines (146 loc) · 4.59 KB
/
proc.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import execa from 'execa';
import { statSync } from 'fs';
import * as Rx from 'rxjs';
import { tap, share, take, mergeMap, map, ignoreElements } from 'rxjs/operators';
import chalk from 'chalk';
import treeKill from 'tree-kill';
import { promisify } from 'util';
/**
 * Promise-returning wrapper around `treeKill`: forwards the pid, the signal
 * and the callback supplied by `promisify` straight through to `treeKill`.
 */
const treeKillAsync = promisify((pid: number, signal: string, callback: any) =>
  treeKill(pid, signal, callback)
);
import { ToolingLog } from '../tooling_log';
import { observeLines } from '../stdio';
import { createCliError } from './errors';
/** Number of milliseconds in one second. */
const SECOND = 1000;

/** How long, in milliseconds, each stage of stopping a proc may take before it is considered timed out. */
const STOP_TIMEOUT = SECOND * 30;
/**
 * Options describing the child process that `startProc()` should spawn.
 */
export interface ProcOptions {
  /** Command to execute */
  cmd: string;
  /** Arguments passed to the command (also joined with spaces for the log line) */
  args: string[];
  /** Working directory for the child process; checked before the process is spawned */
  cwd: string;
  /** Environment variables handed to the process spawner */
  env?: { [name: string]: string | undefined };
  /** Text written to the child's stdin before stdin is closed */
  stdin?: string;
}
/**
 * Run `attempt()` and wait for it to settle, but for no longer than `ms`
 * milliseconds.
 *
 * - If `attempt()` resolves in time, this resolves.
 * - If `attempt()` rejects (or throws) in time, that error is rethrown.
 * - If the time limit is reached first, `onTimeout()` is awaited instead; the
 *   promise returned by `attempt()` is abandoned, not cancelled.
 *
 * The timer is always cleared before returning so that a finished attempt does
 * not leave a pending timer behind that keeps the event loop alive.
 *
 * @param attempt   async operation to wait for
 * @param ms        time limit in milliseconds
 * @param onTimeout async handler invoked when the time limit is hit first
 */
async function withTimeout(
  attempt: () => Promise<void>,
  ms: number,
  onTimeout: () => Promise<void>
) {
  // unique sentinel so that a timeout can't be confused with an error from attempt()
  const TIMEOUT = Symbol('timeout');
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    await Promise.race([
      attempt(),
      new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(TIMEOUT), ms);
      }),
    ]);
  } catch (error) {
    if (error !== TIMEOUT) {
      throw error;
    }

    await onTimeout();
  } finally {
    // without this the timer stays scheduled for the full `ms` even when
    // attempt() settled long before the limit
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  }
}
/**
 * Handle for a running process: exactly the object returned by `startProc()`.
 */
export type Proc = ReturnType<typeof startProc>;
/**
 * Spawn `options.cmd` as a child process and return a handle for observing
 * and stopping it.
 *
 * The returned object contains:
 *  - `name`: the name passed in
 *  - `lines$`: the lines produced by `observeLines()` for the child's stdout
 *    and stderr, merged; every line is also written to `log`
 *  - `outcome$`: emits the exit code from the first "exit" event (or `null`
 *    when `stop()` was called first). Errors when the exit code is greater
 *    than 0 and is not 143 or 130, or when the child emits an "error" event
 *  - `outcomePromise`: settles once both `lines$` and `outcome$` are done,
 *    with the value emitted by `outcome$`
 *  - `stop(signal)`: sends `signal` to the process tree and waits for the
 *    outcome, escalating to SIGKILL after `STOP_TIMEOUT` ms
 *
 * @param name    label used in log output and error messages
 * @param options command, arguments, cwd, env and optional stdin for the child
 * @param log     logger that receives the command line and the child's output
 * @throws Error when `options.cwd` does not exist or is not a directory
 */
export function startProc(name: string, options: ProcOptions, log: ToolingLog) {
  const { cmd, args, cwd, env, stdin } = options;

  log.info('[%s] > %s', name, cmd, args.join(' '));

  // spawn fails with ENOENT when either the cmd or the cwd doesn't exist, so
  // the cwd is checked ahead of time to make the error less ambiguous
  let cwdIsDirectory: boolean | undefined;
  try {
    cwdIsDirectory = statSync(cwd).isDirectory();
  } catch (err) {
    if (err instanceof Error && (err as NodeJS.ErrnoException).code === 'ENOENT') {
      throw new Error(`cwd "${cwd}" does not exist`);
    }
    // any other stat failure is deliberately not treated as fatal here
  }

  // thrown outside of the try block so that the catch above can't swallow it
  if (cwdIsDirectory === false) {
    throw new Error(`cwd "${cwd}" exists but is not a directory`);
  }

  const childProcess = execa(cmd, args, {
    cwd,
    env,
    stdio: ['pipe', 'pipe', 'pipe'],
    preferLocal: true,
  });

  // TypeScript note: as long as stdio[0] is 'pipe', stdin will not be null
  if (stdin) {
    childProcess.stdin!.end(stdin, 'utf8');
  } else {
    childProcess.stdin!.end();
  }

  let stopCalled = false;

  const outcome$: Rx.Observable<number | null> = Rx.race(
    // observe first exit event
    Rx.fromEvent<[number]>(childProcess, 'exit').pipe(
      take(1),
      map(([code]) => {
        // an exit that follows stop() is expected, whatever the exit code is
        if (stopCalled) {
          return null;
        }

        // JVM exits with 143 on SIGTERM and 130 on SIGINT, don't treat them as errors
        if (code > 0 && !(code === 143 || code === 130)) {
          throw createCliError(`[${name}] exited with code ${code}`);
        }

        return code;
      })
    ),

    // observe first error event
    Rx.fromEvent(childProcess, 'error').pipe(
      take(1),
      mergeMap((err) => Rx.throwError(err))
    )
  ).pipe(share());

  const lines$ = Rx.merge(
    // TypeScript note: as long as stdio[1] is 'pipe', stdout will not be null
    observeLines(childProcess.stdout!),
    // TypeScript note: as long as stdio[2] is 'pipe', stderr will not be null
    observeLines(childProcess.stderr!)
  ).pipe(
    tap((line) => log.write(` ${chalk.gray('proc')} [${chalk.gray(name)}] ${line}`)),
    share()
  );

  // lines are ignored here, so the promise settles with the value (or error)
  // of outcome$, but only after the output streams have completed as well
  const outcomePromise = Rx.merge(lines$.pipe(ignoreElements()), outcome$).toPromise();

  /**
   * Stop the process by sending `signal` to its process tree. Only the first
   * call does anything; later calls return immediately.
   */
  async function stop(signal: NodeJS.Signals) {
    if (stopCalled) {
      return;
    }

    stopCalled = true;

    // stage 1: ask the process to stop, escalate to SIGKILL when it takes too long
    await withTimeout(
      async () => {
        log.debug(`Sending "${signal}" to proc "${name}"`);
        await treeKillAsync(childProcess.pid!, signal);
        await outcomePromise;
      },
      STOP_TIMEOUT,
      async () => {
        log.warning(
          `Proc "${name}" was sent "${signal}" didn't emit the "exit" or "error" events after ${STOP_TIMEOUT} ms, sending SIGKILL`
        );
        await treeKillAsync(childProcess.pid!, 'SIGKILL');
      }
    );

    // stage 2: wait for the outcome to settle, whether it succeeded or failed
    await withTimeout(
      async () => {
        try {
          await outcomePromise;
        } catch (error) {
          // ignore
        }
      },
      STOP_TIMEOUT,
      async () => {
        throw new Error(
          `Proc "${name}" was stopped but never emitted either the "exit" or "error" event after ${STOP_TIMEOUT} ms`
        );
      }
    );
  }

  return {
    name,
    lines$,
    outcome$,
    outcomePromise,
    stop,
  };
}