From 58f8645a298b08b3bfcd5218f16defe1e132adb4 Mon Sep 17 00:00:00 2001 From: Christopher Hiller Date: Mon, 2 Mar 2020 17:02:30 -0800 Subject: [PATCH] WIP: concurrency based on worker threads; see #2839 [ci skip] --- .eslintrc.yml | 25 ++++---- lib/buffered-runner.js | 75 ++++++++++++++++++++++ lib/cli/run-helpers.js | 22 +++++++ lib/cli/run-option-metadata.js | 5 +- lib/cli/run.js | 13 ++++ lib/mocha.js | 11 +++- lib/reporters/buffered.js | 114 +++++++++++++++++++++++++++++++++ lib/worker.js | 39 +++++++++++ package.json | 1 + 9 files changed, 290 insertions(+), 15 deletions(-) create mode 100644 lib/buffered-runner.js create mode 100644 lib/reporters/buffered.js create mode 100644 lib/worker.js diff --git a/.eslintrc.yml b/.eslintrc.yml index 34c77cabe4..883d122818 100644 --- a/.eslintrc.yml +++ b/.eslintrc.yml @@ -16,17 +16,20 @@ rules: - safe overrides: - files: - - scripts/**/*.js - - package-scripts.js - - karma.conf.js - - .wallaby.js - - .eleventy.js - - bin/* - - lib/cli/**/*.js - - test/node-unit/**/*.js - - test/integration/options/watch.spec.js - - test/integration/helpers.js - - lib/growl.js + - 'scripts/**/*.js' + - 'package-scripts.js' + - 'karma.conf.js' + - '.wallaby.js' + - '.eleventy.js' + - 'bin/*' + - 'lib/cli/**/*.js' + - 'test/node-unit/**/*.js' + - 'test/integration/options/watch.spec.js' + - 'test/integration/helpers.js' + - 'lib/growl.js' + - 'lib/buffered-runner.js' + - 'lib/worker.js' + - 'lib/reporters/buffered.js' parserOptions: ecmaVersion: 2018 env: diff --git a/lib/buffered-runner.js b/lib/buffered-runner.js new file mode 100644 index 0000000000..7a01b90676 --- /dev/null +++ b/lib/buffered-runner.js @@ -0,0 +1,75 @@ +'use strict'; + +const Runner = require('./runner'); +const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants; +const {spawn, Pool, Worker} = require('threads'); +const debug = require('debug')('mocha:buffered-runner'); + +/** + * This `Runner` delegates tests runs to worker threads. Does not execute any + * {@link Runnable}s by itself! + */ +class BufferedRunner extends Runner { + /** + * Runs Mocha tests by creating a thread pool, then delegating work to the + * worker threads. Each worker receives one file, and as workers become + * available, they take a file from the queue and run it. + * The worker thread execution is treated like an RPC--it returns a `Promise` + * containing serialized information about the run. The information is processed + * as it's received, and emitted to a {@link Reporter}, which is likely listening + * for these events. + * + * @todo handle tests in a specific order, e.g., via `--file`? + * @todo handle delayed runs? + * @todo graceful failure + * @todo audit `BufferedEvent` objects; e.g. do tests need a `parent` prop? + * @todo should we just instantiate a `Test` object from the `BufferedEvent`? + * @param {Function} callback - Called with an exit code corresponding to + * number of test failures. + * @param {Object} options + * @param {string[]} options.files - List of test files + * @param {Options} option.opts - Command-line options + * @returns {Promise} + */ + async run(callback, {files, opts}) { + const pool = Pool(() => spawn(new Worker('./worker.js')), opts.jobs); + + let exitCode = 0; + + this.emit(EVENT_RUN_BEGIN); + + files.forEach(file => { + debug('enqueueing test file %s', file); + pool.queue(async run => { + const [failures, events] = await run(file, opts); + debug( + 'completed run of file %s; %d failures / %d events', + file, + failures, + events.length + ); + exitCode += failures; // can this be non-numeric? + events.forEach(({name, data}) => { + Object.keys(data).forEach(key => { + if (key.startsWith('__')) { + data[key.slice(2)] = () => data[key]; + } + }); + // maybe we should just expect `err` separately from the worker. + if (data.err) { + this.emit(name, data, data.err); + } else { + this.emit(name, data); + } + }); + }); + }); + + await pool.settled(); // nonzero exit code if rejection? + await pool.terminate(); + this.emit(EVENT_RUN_END); + callback(exitCode); + } +} + +module.exports = BufferedRunner; diff --git a/lib/cli/run-helpers.js b/lib/cli/run-helpers.js index 72823c48f6..25000e8f9b 100644 --- a/lib/cli/run-helpers.js +++ b/lib/cli/run-helpers.js @@ -108,6 +108,25 @@ const singleRun = async (mocha, {exit}, fileCollectParams) => { return mocha.run(exit ? exitMocha : exitMochaLater); }; +/** + * Collect files and run tests (using `BufferedRunner`) + * @param {Mocha} mocha - Mocha instance + * @param {Options} opts - Command line options + * @param {Object} fileCollectParams - Parameters that control test + * file collection. See `lib/cli/collect-files.js`. + * @returns {Promise} + * @private + */ +const parallelRun = async (mocha, opts, fileCollectParams) => { + const files = collectFiles(fileCollectParams); + const {jobs} = opts; + debug( + `executing ${files.length} test file(s) across ${jobs} concurrent jobs` + ); + + return mocha.run(opts.exit ? exitMocha : exitMochaLater, {files, opts}); +}; + /** * Actually run tests * @param {Mocha} mocha - Mocha instance @@ -122,6 +141,7 @@ exports.runMocha = async (mocha, options) => { exit = false, ignore = [], file = [], + parallel = false, recursive = false, sort = false, spec = [], @@ -140,6 +160,8 @@ exports.runMocha = async (mocha, options) => { if (watch) { watchRun(mocha, {watchFiles, watchIgnore}, fileCollectParams); + } else if (parallel) { + await parallelRun(mocha, options, fileCollectParams); } else { await singleRun(mocha, {exit}, fileCollectParams); } diff --git a/lib/cli/run-option-metadata.js b/lib/cli/run-option-metadata.js index 4648d9fbfe..da3b7d995d 100644 --- a/lib/cli/run-option-metadata.js +++ b/lib/cli/run-option-metadata.js @@ -42,11 +42,12 @@ exports.types = { 'list-interfaces', 'list-reporters', 'no-colors', + 'parallel', 'recursive', 'sort', 'watch' ], - number: ['retries'], + number: ['retries', 'jobs'], string: [ 'config', 'fgrep', @@ -75,7 +76,9 @@ exports.aliases = { growl: ['G'], ignore: ['exclude'], invert: ['i'], + jobs: ['j'], 'no-colors': ['C'], + parallel: ['p'], reporter: ['R'], 'reporter-option': ['reporter-options', 'O'], require: ['r'], diff --git a/lib/cli/run.js b/lib/cli/run.js index d024cbb0f2..1c52e7f634 100644 --- a/lib/cli/run.js +++ b/lib/cli/run.js @@ -24,6 +24,7 @@ const {ONE_AND_DONES, ONE_AND_DONE_ARGS} = require('./one-and-dones'); const debug = require('debug')('mocha:cli:run'); const defaults = require('../mocharc'); const {types, aliases} = require('./run-option-metadata'); +const coreCount = require('os').cpus().length; /** * Logical option groups @@ -150,6 +151,14 @@ exports.builder = yargs => description: 'Inverts --grep and --fgrep matches', group: GROUPS.FILTERS }, + jobs: { + description: 'Number of concurrent jobs', + implies: 'parallel', + defaultDescription: `CPU core count (${coreCount})`, + requiresArg: true, + group: GROUPS.RULES, + coerce: value => (typeof value === 'undefined' ? coreCount : value) + }, 'list-interfaces': { conflicts: Array.from(ONE_AND_DONE_ARGS), description: 'List built-in user interfaces & exit' @@ -169,6 +178,10 @@ exports.builder = yargs => normalize: true, requiresArg: true }, + parallel: { + description: 'Run tests in parallel', + group: GROUPS.RULES + }, recursive: { description: 'Look for tests in subdirectories', group: GROUPS.FILES diff --git a/lib/mocha.js b/lib/mocha.js index 017daa1e2c..e142ab1229 100644 --- a/lib/mocha.js +++ b/lib/mocha.js @@ -90,6 +90,7 @@ exports.Test = require('./test'); * @param {number} [options.slow] - Slow threshold value. * @param {number|string} [options.timeout] - Timeout threshold value. * @param {string} [options.ui] - Interface name. + * @param {boolean} [options.parallel] - Run jobs in parallel */ function Mocha(options) { options = utils.assign({}, mocharc, options || {}); @@ -136,6 +137,10 @@ function Mocha(options) { this[opt](); } }, this); + + this._runner = options.parallel + ? require('./buffered-runner') + : exports.Runner; } /** @@ -824,14 +829,14 @@ Object.defineProperty(Mocha.prototype, 'version', { * // exit with non-zero status if there were test failures * mocha.run(failures => process.exitCode = failures ? 1 : 0); */ -Mocha.prototype.run = function(fn) { +Mocha.prototype.run = function(fn, runOptions) { if (this.files.length && !this.loadAsync) { this.loadFiles(); } var suite = this.suite; var options = this.options; options.files = this.files; - var runner = new exports.Runner(suite, options.delay); + var runner = new this._runner(suite, options.delay); createStatsCollector(runner); var reporter = new this._reporter(runner, options); runner.checkLeaks = options.checkLeaks === true; @@ -864,5 +869,5 @@ Mocha.prototype.run = function(fn) { } } - return runner.run(done); + return runner.run(done, runOptions); }; diff --git a/lib/reporters/buffered.js b/lib/reporters/buffered.js new file mode 100644 index 0000000000..b13df9fa20 --- /dev/null +++ b/lib/reporters/buffered.js @@ -0,0 +1,114 @@ +'use strict'; +/** + * @module Buffered + */ +/** + * Module dependencies. + */ + +const { + EVENT_SUITE_BEGIN, + EVENT_SUITE_END, + EVENT_TEST_FAIL, + EVENT_TEST_PASS, + EVENT_TEST_PENDING +} = require('../runner').constants; + +/** + * Creates a {@link BufferedEvent} from a {@link Suite}. + * @param {string} evt - Event name + * @param {Suite} suite - Suite object + * @returns {BufferedEvent} + */ +const serializeSuite = (evt, suite) => ({ + name: evt, + data: {root: suite.root, title: suite.title} +}); + +/** + * Creates a {@link BufferedEvent} from a {@link Test}. + * @param {string} evt - Event name + * @param {Test} test - Test object + * @param {any} err - Error, if applicable + */ +const serializeTest = (evt, test, [err]) => { + const obj = { + title: test.title, + duration: test.duration, + err: test.err, + __fullTitle: test.fullTitle(), + __slow: test.slow(), + __titlePath: test.titlePath() + }; + if (err) { + obj.err = + test.err && err instanceof Error + ? { + multiple: [...(test.err.multiple || []), err] + } + : err; + } + return { + name: evt, + data: obj + }; +}; + +/** + * The `Buffered` reporter is for use by parallel runs. Instead of outputting + * to `STDOUT`, etc., it retains a list of events it receives and hands these + * off to the callback passed into {@link Mocha#run}. That callback will then + * return the data to the main process. + */ +class Buffered { + /** + * Listens for {@link Runner} events and retains them in an `events` instance prop. + * @param {Runner} runner + */ + constructor(runner) { + /** + * Retained list of events emitted from the {@link Runner} instance. + * @type {BufferedEvent[]} + */ + const events = (this.events = []); + + runner + .on(EVENT_SUITE_BEGIN, suite => { + events.push(serializeSuite(EVENT_SUITE_BEGIN, suite)); + }) + .on(EVENT_SUITE_END, suite => { + events.push(serializeSuite(EVENT_SUITE_END, suite)); + }) + .on(EVENT_TEST_PENDING, test => { + events.push(serializeTest(EVENT_TEST_PENDING, test)); + }) + .on(EVENT_TEST_FAIL, (test, err) => { + events.push(serializeTest(EVENT_TEST_FAIL, test, err)); + }) + .on(EVENT_TEST_PASS, test => { + events.push(serializeTest(EVENT_TEST_PASS, test)); + }); + } + + /** + * Calls the {@link Mocha#run} callback (`callback`) with the test failure + * count and the array of {@link BufferedEvent} objects. Resets the array. + * @param {number} failures - Number of failed tests + * @param {Function} callback - The callback passed to {@link Mocha#run}. + */ + done(failures, callback) { + callback(failures, [...this.events]); + this.events = []; + } +} + +/** + * Serializable event data from a `Runner`. Keys of the `data` property + * beginning with `__` will be converted into a function which returns the value + * upon deserialization. + * @typedef {Object} BufferedEvent + * @property {string} name - Event name + * @property {object} data - Event parameters + */ + +module.exports = Buffered; diff --git a/lib/worker.js b/lib/worker.js new file mode 100644 index 0000000000..84d80e9c31 --- /dev/null +++ b/lib/worker.js @@ -0,0 +1,39 @@ +'use strict'; + +const {expose} = require('threads/worker'); +const Mocha = require('./mocha'); +const {handleRequires, validatePlugin} = require('./cli/run-helpers'); + +let bootstrapped = false; + +/** + * Runs a single test file in a worker thread. + * @param {string} file - Filepath of test file + * @param {Options} argv - Parsed command-line options object + * @returns {Promise<[number, BufferedEvent[]]>} A tuple of failures and + * serializable event data + */ +async function run(file, argv) { + // the buffered reporter retains its events; these events are returned + // from this function back to the main process. + argv.reporter = require.resolve('./reporters/buffered'); + // if these were set, it would cause infinite recursion by spawning another worker + delete argv.parallel; + delete argv.jobs; + if (!bootstrapped) { + // setup requires and ui, but only do this once--we will reuse this worker! + handleRequires(argv.require); + validatePlugin(argv, 'ui', Mocha.interfaces); + bootstrapped = true; + } + const mocha = new Mocha(argv); + mocha.files = [file]; + await mocha.loadFilesAsync(); + return new Promise(resolve => { + mocha.run((failures, events) => { + resolve([failures, events]); + }); + }); +} + +expose(run); diff --git a/package.json b/package.json index d2a133a380..24be5c2067 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "object.assign": "4.1.0", "strip-json-comments": "3.0.1", "supports-color": "7.1.0", + "threads": "^1.3.0", "which": "2.0.2", "wide-align": "1.1.3", "yargs": "13.3.2",