Skip to content

Commit

Permalink
Run test files in worker threads
Browse files Browse the repository at this point in the history
Add `--worker-threads` to use worker_threads to run tests. By default
the option is enabled.

Co-authored-by: Mark Wubben <mark@novemberborn.net>
  • Loading branch information
dnlup and novemberborn committed Mar 7, 2021
1 parent 5eea608 commit 0e5cc7d
Show file tree
Hide file tree
Showing 26 changed files with 767 additions and 609 deletions.
6 changes: 4 additions & 2 deletions docs/01-writing-tests.md
Expand Up @@ -8,9 +8,11 @@ You must define all tests synchronously. They can't be defined inside `setTimeou

AVA tries to run test files with their current working directory set to the directory that contains your `package.json` file.

## Process isolation
## Test isolation

Each test file is run in a separate Node.js process. This allows you to change the global state or overriding a built-in in one test file, without affecting another. It's also great for performance on modern multi-core processors, allowing multiple test files to execute in parallel.
AVA 3 runs each test file in a separate Node.js process. This allows you to change the global state or overriding a built-in in one test file, without affecting another.

AVA 4 runs each test file in a new worker thread, though you can fall back to AVA 3's behavior of running in separate processes.

AVA will set `process.env.NODE_ENV` to `test`, unless the `NODE_ENV` environment variable has been set. This is useful if the code you're testing has test defaults (for example when picking what database to connect to). It may cause your code or its dependencies to behave differently though. Note that `'NODE_ENV' in process.env` will always be `true`.

Expand Down
1 change: 1 addition & 0 deletions docs/05-command-line.md
Expand Up @@ -27,6 +27,7 @@ Options:
--help Show help [boolean]
--concurrency, -c Max number of test files running at the same time
(default: CPU cores) [number]
--no-worker-threads Don't use worker threads [boolean] (AVA 4 only)
--fail-fast Stop after first test failure [boolean]
--match, -m Only run tests with matching title (can be repeated)
[string]
Expand Down
1 change: 1 addition & 0 deletions docs/06-configuration.md
Expand Up @@ -47,6 +47,7 @@ Arguments passed to the CLI will always take precedence over the CLI options con
- `match`: not typically useful in the `package.json` configuration, but equivalent to [specifying `--match` on the CLI](./05-command-line.md#running-tests-with-matching-titles)
- `cache`: cache compiled files under `node_modules/.cache/ava`. If `false`, files are cached in a temporary directory instead
- `concurrency`: max number of test files running at the same time (default: CPU cores)
- `workerThreads`: use worker threads to run tests (requires AVA 4, enabled by default). If `false`, tests will run in child processes (how AVA 3 behaves)
- `failFast`: stop running further tests once a test fails
- `failWithoutAssertions`: if `false`, does not fail a test if it doesn't run [assertions](./03-assertions.md)
- `environmentVariables`: specifies environment variables to be made available to the tests. The environment variables defined here override the ones from `process.env`
Expand Down
4 changes: 3 additions & 1 deletion lib/api.js
Expand Up @@ -212,8 +212,10 @@ class Api extends Emittery {
}

const lineNumbers = getApplicableLineNumbers(globs.normalizeFileForMatching(apiOptions.projectDir, file), filter);
// Removing `providers` field because they cannot be transfered to the worker threads.
const {providers, ...forkOptions} = apiOptions;
const options = {
...apiOptions,
...forkOptions,
providerStates,
lineNumbers,
recordNewSnapshots: !isCi,
Expand Down
12 changes: 12 additions & 0 deletions lib/cli.js
Expand Up @@ -35,6 +35,11 @@ const FLAGS = {
description: 'Only run tests with matching title (can be repeated)',
type: 'string'
},
'no-worker-threads': {
coerce: coerceLastValue,
description: 'Don\'t use worker threads',
type: 'boolean'
},
'node-arguments': {
coerce: coerceLastValue,
description: 'Additional Node.js arguments for launching worker processes (specify as a single string)',
Expand Down Expand Up @@ -184,7 +189,13 @@ exports.run = async () => { // eslint-disable-line complexity
.help();

const combined = {...conf};

for (const flag of Object.keys(FLAGS)) {
if (flag === 'no-worker-threads' && Reflect.has(argv, 'worker-threads')) {
combined.workerThreads = argv['worker-threads'];
continue;
}

if (Reflect.has(argv, flag)) {
if (flag === 'fail-fast') {
combined.failFast = argv[flag];
Expand Down Expand Up @@ -387,6 +398,7 @@ exports.run = async () => { // eslint-disable-line complexity
cacheEnabled: combined.cache !== false,
chalkOptions,
concurrency: combined.concurrency || 0,
workerThreads: combined.workerThreads !== false,
debug,
environmentVariables,
experiments,
Expand Down
72 changes: 55 additions & 17 deletions lib/fork.js
@@ -1,4 +1,5 @@
'use strict';
const {Worker} = require('worker_threads');
const childProcess = require('child_process');
const path = require('path');
const fs = require('fs');
Expand All @@ -12,7 +13,7 @@ if (fs.realpathSync(__filename) !== __filename) {
// In case the test file imports a different AVA install,
// the presence of this variable allows it to require this one instead
const AVA_PATH = path.resolve(__dirname, '..');
const WORKER_PATH = require.resolve('./worker/subprocess');
const WORKER_PATH = require.resolve('./worker/base.js');

class SharedWorkerChannel extends Emittery {
constructor({channelId, filename, initialData}, sendToFork) {
Expand Down Expand Up @@ -59,7 +60,51 @@ class SharedWorkerChannel extends Emittery {

let forkCounter = 0;

const createWorker = (options, execArgv) => {
let worker;
let postMessage;
let close;
if (options.workerThreads) {
worker = new Worker(WORKER_PATH, {
argv: options.workerArgv,
env: {NODE_ENV: 'test', ...process.env, ...options.environmentVariables, AVA_PATH},
execArgv,
workerData: {
options
},
trackUnmanagedFds: true,
stdin: true,
stdout: true,
stderr: true
});
postMessage = worker.postMessage.bind(worker);
close = async () => {
try {
await worker.terminate();
} finally {
// No-op
}
};
} else {
worker = childProcess.fork(WORKER_PATH, options.workerArgv, {
cwd: options.projectDir,
silent: true,
env: {NODE_ENV: 'test', ...process.env, ...options.environmentVariables, AVA_PATH},
execArgv
});
postMessage = controlFlow(worker);
close = async () => worker.kill();
}

return {
worker,
postMessage,
close
};
};

module.exports = (file, options, execArgv = process.execArgv) => {
// TODO: this can be changed to use `threadId` when using worker_threads
const forkId = `fork/${++forkCounter}`;
const sharedWorkerChannels = new Map();

Expand All @@ -79,27 +124,19 @@ module.exports = (file, options, execArgv = process.execArgv) => {
...options
};

const subprocess = childProcess.fork(WORKER_PATH, options.workerArgv, {
cwd: options.projectDir,
silent: true,
env: {NODE_ENV: 'test', ...process.env, ...options.environmentVariables, AVA_PATH},
execArgv
});

subprocess.stdout.on('data', chunk => {
const {worker, postMessage, close} = createWorker(options, execArgv);
worker.stdout.on('data', chunk => {
emitStateChange({type: 'worker-stdout', chunk});
});

subprocess.stderr.on('data', chunk => {
worker.stderr.on('data', chunk => {
emitStateChange({type: 'worker-stderr', chunk});
});

const bufferedSend = controlFlow(subprocess);

let forcedExit = false;
const send = evt => {
if (!finished && !forcedExit) {
bufferedSend({ava: evt});
postMessage({ava: evt});
}
};

Expand All @@ -109,7 +146,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {
resolve();
};

subprocess.on('message', message => {
worker.on('message', message => {
if (!message.ava) {
return;
}
Expand All @@ -118,6 +155,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {
case 'ready-for-options':
send({type: 'options', options});
break;

case 'shared-worker-connect': {
const channel = new SharedWorkerChannel(message.ava, send);
sharedWorkerChannels.set(channel.id, channel);
Expand All @@ -136,12 +174,12 @@ module.exports = (file, options, execArgv = process.execArgv) => {
}
});

subprocess.on('error', error => {
worker.on('error', error => {
emitStateChange({type: 'worker-failed', err: error});
finish();
});

subprocess.on('exit', (code, signal) => {
worker.on('exit', (code, signal) => {
if (forcedExit) {
emitStateChange({type: 'worker-finished', forcedExit});
} else if (code > 0) {
Expand All @@ -163,7 +201,7 @@ module.exports = (file, options, execArgv = process.execArgv) => {

exit() {
forcedExit = true;
subprocess.kill();
close();
},

notifyOfPeerFailure() {
Expand Down
74 changes: 50 additions & 24 deletions lib/worker/subprocess.js → lib/worker/base.js
@@ -1,13 +1,27 @@
'use strict';
const {pathToFileURL} = require('url');
const path = require('path');
const currentlyUnhandled = require('currently-unhandled')();
const {isRunningInThread, isRunningInChildProcess} = require('./utils');

require('./ensure-forked'); // eslint-disable-line import/no-unassigned-import
// Check if the test is being run without AVA cli
if (!isRunningInChildProcess && !isRunningInThread) {
const chalk = require('chalk'); // Use default Chalk instance.
if (process.argv[1]) {
const fp = path.relative('.', process.argv[1]);

const ipc = require('./ipc');
console.log();
console.error(`Test files must be run with the AVA CLI:\n\n ${chalk.grey.dim('$')} ${chalk.cyan('ava ' + fp)}\n`);

ipc.send({type: 'ready-for-options'});
ipc.options.then(async options => {
process.exit(1);
} else {
throw new Error('The ’ava’ module can only be imported in test files');
}
}

const channel = require('./channel');

const run = async options => {
require('./options').set(options);
require('../chalk').set(options.chalkOptions);

Expand All @@ -31,8 +45,8 @@ ipc.options.then(async options => {
}

dependencyTracking.flush();
await ipc.flush();
process.exit(); // eslint-disable-line unicorn/no-process-exit
await channel.flush();
process.exit();
}

// TODO: Initialize providers here, then pass to lineNumberSelection() so they
Expand All @@ -44,7 +58,7 @@ ipc.options.then(async options => {
lineNumbers: options.lineNumbers
});
} catch (error) {
ipc.send({type: 'line-number-selection-error', err: serializeError('Line number selection error', false, error, options.file)});
channel.send({type: 'line-number-selection-error', err: serializeError('Line number selection error', false, error, options.file)});
checkSelectedByLineNumbers = () => false;
}

Expand All @@ -63,7 +77,7 @@ ipc.options.then(async options => {
updateSnapshots: options.updateSnapshots
});

ipc.peerFailed.then(() => { // eslint-disable-line promise/prefer-await-to-then
channel.peerFailed.then(() => { // eslint-disable-line promise/prefer-await-to-then
runner.interrupt();
});

Expand All @@ -75,31 +89,31 @@ ipc.options.then(async options => {
});

runner.on('dependency', dependencyTracking.track);
runner.on('stateChange', state => ipc.send(state));
runner.on('stateChange', state => channel.send(state));

runner.on('error', error => {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error, runner.file)});
channel.send({type: 'internal-error', err: serializeError('Internal runner error', false, error, runner.file)});
exit(1);
});

runner.on('finish', async () => {
try {
const {cannotSave, touchedFiles} = runner.saveSnapshotState();
if (cannotSave) {
ipc.send({type: 'snapshot-error'});
channel.send({type: 'snapshot-error'});
} else if (touchedFiles) {
ipc.send({type: 'touched-files', files: touchedFiles});
channel.send({type: 'touched-files', files: touchedFiles});
}
} catch (error) {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error, runner.file)});
channel.send({type: 'internal-error', err: serializeError('Internal runner error', false, error, runner.file)});
exit(1);
return;
}

try {
await Promise.all(sharedWorkerTeardowns.map(fn => fn()));
} catch (error) {
ipc.send({type: 'uncaught-exception', err: serializeError('Shared worker teardown error', false, error, runner.file)});
channel.send({type: 'uncaught-exception', err: serializeError('Shared worker teardown error', false, error, runner.file)});
exit(1);
return;
}
Expand All @@ -108,7 +122,7 @@ ipc.options.then(async options => {
currentlyUnhandled()
.filter(rejection => !attributedRejections.has(rejection.promise))
.forEach(rejection => {
ipc.send({type: 'unhandled-rejection', err: serializeError('Unhandled rejection', true, rejection.reason, runner.file)});
channel.send({type: 'unhandled-rejection', err: serializeError('Unhandled rejection', true, rejection.reason, runner.file)});
});

exit(0);
Expand All @@ -120,7 +134,7 @@ ipc.options.then(async options => {
return;
}

ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error, runner.file)});
channel.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error, runner.file)});
exit(1);
});

Expand All @@ -131,7 +145,7 @@ ipc.options.then(async options => {
};

exports.registerSharedWorker = (filename, initialData, teardown) => {
const {channel, forceUnref, ready} = ipc.registerSharedWorker(filename, initialData);
const {channel: sharedWorkerChannel, forceUnref, ready} = channel.registerSharedWorker(filename, initialData);
runner.waitForReady.push(ready);
sharedWorkerTeardowns.push(async () => {
try {
Expand All @@ -140,7 +154,7 @@ ipc.options.then(async options => {
forceUnref();
}
});
return channel;
return sharedWorkerChannel;
};

// Store value to prevent required modules from modifying it.
Expand Down Expand Up @@ -215,23 +229,35 @@ ipc.options.then(async options => {
await load(testPath);

if (accessedRunner) {
// Unreference the IPC channel if the test file required AVA. This stops it
// Unreference the channel if the test file required AVA. This stops it
// from keeping the event loop busy, which means the `beforeExit` event can be
// used to detect when tests stall.
ipc.unref();
channel.unref();
} else {
ipc.send({type: 'missing-ava-import'});
channel.send({type: 'missing-ava-import'});
exit(1);
}
} catch (error) {
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error, runner.file)});
channel.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error, runner.file)});
exit(1);
}
}).catch(error => {
};

const onError = error => {
// There shouldn't be any errors, but if there are we may not have managed
// to bootstrap enough code to serialize them. Re-throw and let the process
// crash.
setImmediate(() => {
throw error;
});
});
};

if (isRunningInThread) {
const {workerData} = require('worker_threads');
const {options} = workerData;
delete workerData.options; // Don't allow user code access.
run(options).catch(onError);
} else if (isRunningInChildProcess) {
channel.send({type: 'ready-for-options'});
channel.options.then(run).catch(onError); // eslint-disable-line promise/prefer-await-to-then
}

0 comments on commit 0e5cc7d

Please sign in to comment.