Skip to content

Commit

Permalink
feat(terser): Update WorkerPool to reuse Workers (rollup#1409)
Browse files Browse the repository at this point in the history
* Update WorkerPool to reuse Workers

* test number of workers used

* Address feedback

* Fix ESLint warnings

* Use regular `for` loop

* Address feedback
  • Loading branch information
dasa committed Jan 23, 2023
1 parent e75744b commit 74dbb42
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 76 deletions.
2 changes: 2 additions & 0 deletions packages/terser/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const taskInfo = Symbol('taskInfo');
export const freeWorker = Symbol('freeWorker');
27 changes: 23 additions & 4 deletions packages/terser/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@ import { WorkerPool } from './worker-pool';
export default function terser(input: Options = {}) {
const { maxWorkers, ...options } = input;

const workerPool = new WorkerPool({
filePath: fileURLToPath(import.meta.url),
maxWorkers
});
let workerPool: WorkerPool | null | undefined;
let numOfChunks = 0;
let numOfWorkersUsed = 0;

return {
name: 'terser',

async renderChunk(code: string, chunk: RenderedChunk, outputOptions: NormalizedOutputOptions) {
if (!workerPool) {
workerPool = new WorkerPool({
filePath: fileURLToPath(import.meta.url),
maxWorkers
});
}

numOfChunks += 1;

const defaultOptions: Options = {
sourceMap: outputOptions.sourcemap === true || typeof outputOptions.sourcemap === 'string'
};
Expand Down Expand Up @@ -80,7 +88,18 @@ export default function terser(input: Options = {}) {
return result;
} catch (e) {
return Promise.reject(e);
} finally {
numOfChunks -= 1;
if (numOfChunks === 0) {
numOfWorkersUsed = workerPool.numWorkers;
workerPool.close();
workerPool = null;
}
}
},

get numOfWorkersUsed() {
return numOfWorkersUsed;
}
};
}
11 changes: 11 additions & 0 deletions packages/terser/src/type.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import type { AsyncResource } from 'async_hooks';
import type { Worker } from 'worker_threads';

import type { MinifyOptions } from 'terser';

import type { taskInfo } from './constants';

export interface Options extends MinifyOptions {
nameCache?: Record<string, any>;
maxWorkers?: number;
Expand All @@ -12,6 +17,12 @@ export interface WorkerContext {

export type WorkerCallback = (err: Error | null, output?: WorkerOutput) => void;

interface WorkerPoolTaskInfo extends AsyncResource {
done(err: Error | null, result: any): void;
}

export type WorkerWithTaskInfo = Worker & { [taskInfo]?: WorkerPoolTaskInfo | null };

export interface WorkerContextSerialized {
code: string;
options: string;
Expand Down
119 changes: 64 additions & 55 deletions packages/terser/src/worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
import { AsyncResource } from 'async_hooks';
import { Worker } from 'worker_threads';
import { cpus } from 'os';
import { EventEmitter } from 'events';

import serializeJavascript from 'serialize-javascript';

import { freeWorker, taskInfo } from './constants';

import type {
WorkerCallback,
WorkerContext,
WorkerOutput,
WorkerPoolOptions,
WorkerPoolTask
WorkerPoolTask,
WorkerWithTaskInfo
} from './type';

const symbol = Symbol.for('FreeWoker');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(private callback: WorkerCallback) {
super('WorkerPoolTaskInfo');
}

done(err: Error | null, result: any) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy();
}
}

export class WorkerPool extends EventEmitter {
protected maxInstances: number;
Expand All @@ -21,37 +34,30 @@ export class WorkerPool extends EventEmitter {

protected tasks: WorkerPoolTask[] = [];

protected workers = 0;
protected workers: WorkerWithTaskInfo[] = [];
protected freeWorkers: WorkerWithTaskInfo[] = [];

constructor(options: WorkerPoolOptions) {
super();

this.maxInstances = options.maxWorkers || cpus().length;
this.filePath = options.filePath;

this.on(symbol, () => {
this.on(freeWorker, () => {
if (this.tasks.length > 0) {
this.run();
const { context, cb } = this.tasks.shift()!;
this.runTask(context, cb);
}
});
}

add(context: WorkerContext, cb: WorkerCallback) {
this.tasks.push({
context,
cb
});

if (this.workers >= this.maxInstances) {
return;
}

this.run();
get numWorkers(): number {
return this.workers.length;
}

async addAsync(context: WorkerContext): Promise<WorkerOutput> {
addAsync(context: WorkerContext): Promise<WorkerOutput> {
return new Promise((resolve, reject) => {
this.add(context, (err, output) => {
this.runTask(context, (err, output) => {
if (err) {
reject(err);
return;
Expand All @@ -67,51 +73,54 @@ export class WorkerPool extends EventEmitter {
});
}

private run() {
if (this.tasks.length === 0) {
return;
}

const task = this.tasks.shift();

if (typeof task === 'undefined') {
return;
close() {
for (let i = 0; i < this.workers.length; i++) {
const worker = this.workers[i];
worker.terminate();
}
}

this.workers += 1;

let called = false;
const callCallback = (err: Error | null, output?: WorkerOutput) => {
if (called) {
return;
}
called = true;

this.workers -= 1;

task.cb(err, output);
this.emit(symbol);
};

const worker = new Worker(this.filePath, {
workerData: {
code: task.context.code,
options: serializeJavascript(task.context.options)
}
});
private addNewWorker() {
const worker: WorkerWithTaskInfo = new Worker(this.filePath);

worker.on('message', (data) => {
callCallback(null, data);
worker.on('message', (result) => {
worker[taskInfo]?.done(null, result);
worker[taskInfo] = null;
this.freeWorkers.push(worker);
this.emit(freeWorker);
});

worker.on('error', (err) => {
callCallback(err);
if (worker[taskInfo]) {
worker[taskInfo].done(err, null);
} else {
this.emit('error', err);
}
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});

worker.on('exit', (code) => {
if (code !== 0) {
callCallback(new Error(`Minify worker stopped with exit code ${code}`));
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(freeWorker);
}

private runTask(context: WorkerContext, cb: WorkerCallback) {
if (this.freeWorkers.length === 0) {
this.tasks.push({ context, cb });
if (this.numWorkers < this.maxInstances) {
this.addNewWorker();
}
});
return;
}

const worker = this.freeWorkers.pop();
if (worker) {
worker[taskInfo] = new WorkerPoolTaskInfo(cb);
worker.postMessage({
code: context.code,
options: serializeJavascript(context.options)
});
}
}
}
29 changes: 15 additions & 14 deletions packages/terser/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import process from 'process';
import { isMainThread, parentPort, workerData } from 'worker_threads';
import { isMainThread, parentPort } from 'worker_threads';

import { hasOwnProperty, isObject } from 'smob';

Expand All @@ -22,21 +21,25 @@ function isWorkerContextSerialized(input: unknown): input is WorkerContextSerial
);
}

export async function runWorker() {
if (isMainThread || !parentPort || !isWorkerContextSerialized(workerData)) {
export function runWorker() {
if (isMainThread || !parentPort) {
return;
}

try {
// eslint-disable-next-line no-eval
const eval2 = eval;
// eslint-disable-next-line no-eval
const eval2 = eval;

const options = eval2(`(${workerData.options})`);
parentPort.on('message', async (data: WorkerContextSerialized) => {
if (!isWorkerContextSerialized(data)) {
return;
}

const options = eval2(`(${data.options})`);

const result = await minify(workerData.code, options);
const result = await minify(data.code, options);

const output: WorkerOutput = {
code: result.code || workerData.code,
code: result.code || data.code,
nameCache: options.nameCache
};

Expand All @@ -48,8 +51,6 @@ export async function runWorker() {
output.sourceMap = result.map;
}

parentPort.postMessage(output);
} catch (e) {
process.exit(1);
}
parentPort?.postMessage(output);
});
}
22 changes: 19 additions & 3 deletions packages/terser/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ test.serial('minify via terser options', async (t) => {
});

test.serial('minify multiple outputs', async (t) => {
let plugin;

const bundle = await rollup({
input: 'test/fixtures/unminified.js',
plugins: [terser()]
plugins: [(plugin = terser({ maxWorkers: 2 }))]
});

const [bundle1, bundle2] = await Promise.all([
Expand All @@ -60,6 +62,20 @@ test.serial('minify multiple outputs', async (t) => {

t.is(output1.code, '"use strict";window.a=5,window.a<3&&console.log(4);\n');
t.is(output2.code, 'window.a=5,window.a<3&&console.log(4);\n');
t.is(plugin.numOfWorkersUsed, 2, 'used 2 workers');
});

test.serial('minify multiple outputs with only 1 worker', async (t) => {
let plugin;

const bundle = await rollup({
input: 'test/fixtures/unminified.js',
plugins: [(plugin = terser({ maxWorkers: 1 }))]
});

await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'es' })]);

t.is(plugin.numOfWorkersUsed, 1, 'used 1 worker');
});

test.serial('minify esm module', async (t) => {
Expand Down Expand Up @@ -122,7 +138,7 @@ test.serial('throw error on terser fail', async (t) => {
await bundle.generate({ format: 'esm' });
t.falsy(true);
} catch (error) {
t.is(error.toString(), 'Error: Minify worker stopped with exit code 1');
t.is(error.toString(), 'SyntaxError: Name expected');
}
});

Expand All @@ -142,7 +158,7 @@ test.serial('throw error on terser fail with multiple outputs', async (t) => {
await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'esm' })]);
t.falsy(true);
} catch (error) {
t.is(error.toString(), 'Error: Minify worker stopped with exit code 1');
t.is(error.toString(), 'SyntaxError: Name expected');
}
});

Expand Down

0 comments on commit 74dbb42

Please sign in to comment.