Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit parallel file reads to prevent "EMFILE: too many open files" error #4170

Merged
merged 7 commits into from Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
494 changes: 494 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -103,6 +103,7 @@
"pretty-ms": "^7.0.1",
"require-relative": "^0.8.7",
"requirejs": "^2.3.6",
"rewire": "^5.0.0",
schummar marked this conversation as resolved.
Show resolved Hide resolved
"rollup": "^2.52.0",
"rollup-plugin-license": "^2.5.0",
"rollup-plugin-string": "^3.0.0",
Expand Down
2 changes: 2 additions & 0 deletions src/Graph.ts
Expand Up @@ -51,6 +51,8 @@ export default class Graph {
moduleLoader: ModuleLoader;
modulesById = new Map<string, Module | ExternalModule>();
needsTreeshakingPass = false;
parallelFileReads = 0;
parallelFileReadsMax = 0;
phase: BuildPhase = BuildPhase.LOAD_AND_PARSE;
pluginDriver: PluginDriver;
scope: GlobalScope;
Expand Down
14 changes: 13 additions & 1 deletion src/ModuleLoader.ts
Expand Up @@ -30,6 +30,7 @@ import {
} from './utils/error';
import { readFile } from './utils/fs';
import { isAbsolute, isRelative, resolve } from './utils/path';
import { Queue } from './utils/queue';
import relativeId from './utils/relativeId';
import { resolveId } from './utils/resolveId';
import { timeEnd, timeStart } from './utils/timers';
Expand All @@ -53,6 +54,7 @@ export class ModuleLoader {
private readonly indexedEntryModules: { index: number; module: Module }[] = [];
private latestLoadModulesPromise: Promise<unknown> = Promise.resolve();
private nextEntryModuleIndex = 0;
private readQueue = new Queue();

constructor(
private readonly graph: Graph,
Expand All @@ -63,6 +65,7 @@ export class ModuleLoader {
this.hasModuleSideEffects = options.treeshake
? options.treeshake.moduleSideEffects
: () => true;
this.readQueue.maxParallel = options.maxParallelFileReads;
}

async addAdditionalModules(unresolvedModules: string[]): Promise<Module[]> {
Expand Down Expand Up @@ -217,7 +220,16 @@ export class ModuleLoader {
timeStart('load modules', 3);
let source: string | SourceDescription;
try {
source = (await this.pluginDriver.hookFirst('load', [id])) ?? (await readFile(id));
source =
(await this.pluginDriver.hookFirst('load', [id])) ??
(await this.readQueue.run(async () => {
this.graph.parallelFileReads++;
schummar marked this conversation as resolved.
Show resolved Hide resolved
this.graph.parallelFileReadsMax = Math.max(
this.graph.parallelFileReadsMax,
this.graph.parallelFileReads
);
return readFile(id).finally(() => this.graph.parallelFileReads--);
}));
} catch (err) {
timeEnd('load modules', 3);
let msg = `Could not load ${id}`;
Expand Down
1 change: 1 addition & 0 deletions src/rollup/rollup.ts
Expand Up @@ -86,6 +86,7 @@ export async function rollupInternal(
graph
);
},
maxParallelFileReads: graph.parallelFileReadsMax,
schummar marked this conversation as resolved.
Show resolved Hide resolved
watchFiles: Object.keys(graph.watchFiles),
async write(rawOutputOptions: OutputOptions) {
if (result.closed) return error(errAlreadyClosed());
Expand Down
3 changes: 3 additions & 0 deletions src/rollup/types.d.ts
Expand Up @@ -537,6 +537,7 @@ export interface InputOptions {
makeAbsoluteExternalsRelative?: boolean | 'ifRelativeSource';
/** @deprecated Use the "manualChunks" output option instead. */
manualChunks?: ManualChunksOption;
maxParallelFileReads?: number;
moduleContext?: ((id: string) => string | null | undefined) | { [id: string]: string };
onwarn?: WarningHandlerWithDefault;
perf?: boolean;
Expand Down Expand Up @@ -564,6 +565,7 @@ export interface NormalizedInputOptions {
makeAbsoluteExternalsRelative: boolean | 'ifRelativeSource';
/** @deprecated Use the "manualChunks" output option instead. */
manualChunks: ManualChunksOption | undefined;
maxParallelFileReads: number;
moduleContext: (id: string) => string;
onwarn: WarningHandler;
perf: boolean;
Expand Down Expand Up @@ -789,6 +791,7 @@ export interface RollupBuild {
closed: boolean;
generate: (outputOptions: OutputOptions) => Promise<RollupOutput>;
getTimings?: () => SerializedTimings;
maxParallelFileReads: number;
watchFiles: string[];
write: (options: OutputOptions) => Promise<RollupOutput>;
}
Expand Down
1 change: 1 addition & 0 deletions src/utils/options/mergeOptions.ts
Expand Up @@ -120,6 +120,7 @@ function mergeInputOptions(
input: getOption('input') || [],
makeAbsoluteExternalsRelative: getOption('makeAbsoluteExternalsRelative'),
manualChunks: getOption('manualChunks'),
maxParallelFileReads: getOption('maxParallelFileReads'),
moduleContext: getOption('moduleContext'),
onwarn: getOnWarn(config, defaultOnWarnHandler),
perf: getOption('perf'),
Expand Down
15 changes: 15 additions & 0 deletions src/utils/options/normalizeInputOptions.ts
Expand Up @@ -50,6 +50,7 @@ export function normalizeInputOptions(config: InputOptions): {
input: getInput(config),
makeAbsoluteExternalsRelative: config.makeAbsoluteExternalsRelative ?? true,
manualChunks: getManualChunks(config, onwarn, strictDeprecations),
maxParallelFileReads: getMaxParallelFileReads(config),
moduleContext: getModuleContext(config, context),
onwarn,
perf: config.perf || false,
Expand Down Expand Up @@ -175,6 +176,20 @@ const getManualChunks = (
return configManualChunks;
};

const getMaxParallelFileReads = (
config: InputOptions
): NormalizedInputOptions['maxParallelFileReads'] => {
const maxParallelFileReads = config.maxParallelFileReads as unknown;
if (typeof maxParallelFileReads === 'number') {
if (maxParallelFileReads <= 0) return Infinity;
return maxParallelFileReads;
}
if (typeof maxParallelFileReads === 'string' && maxParallelFileReads.match(/^inf(inity)?$/i)) {
schummar marked this conversation as resolved.
Show resolved Hide resolved
return Infinity;
}
return 20;
};

const getModuleContext = (
config: InputOptions,
context: string
Expand Down
36 changes: 36 additions & 0 deletions src/utils/queue.ts
@@ -0,0 +1,36 @@
export class Queue {
private queue = new Array<{
reject: (reason?: any) => void;
resolve: (value: any) => void;
task: () => any;
}>();
private workerCount = 0;

constructor(public maxParallel = 1) {}

run<T>(task: () => T | Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
this.queue.push({ reject, resolve, task });
this.work();
});
}

private async work() {
if (this.workerCount >= this.maxParallel) return;
this.workerCount++;

let entry;
while ((entry = this.queue.shift())) {
const { reject, resolve, task } = entry;

try {
const result = await task();
resolve(result);
} catch (err) {
reject(err);
}
}

this.workerCount--;
}
}
@@ -0,0 +1 @@
export const x1 = 1;
@@ -0,0 +1 @@
export const x2 = 2;
@@ -0,0 +1 @@
export const x3 = 3;
@@ -0,0 +1 @@
export const x4 = 4;
@@ -0,0 +1 @@
export const x5 = 5;
@@ -0,0 +1,16 @@
const assert = require('assert');
schummar marked this conversation as resolved.
Show resolved Hide resolved

module.exports = {
description: 'limit parallel file reads',
options: {
maxParallelFileReads: 0
},
bundle(bundle) {
const maxParallelFileReads = bundle.maxParallelFileReads;
assert.strictEqual(
maxParallelFileReads,
5,
'Wrong number of parallel file reads: ' + maxParallelFileReads
);
}
};
@@ -0,0 +1,5 @@
export * from './1';
export * from './2';
export * from './3';
export * from './4';
export * from './5';
@@ -0,0 +1 @@
export const x1 = 1;
@@ -0,0 +1 @@
export const x2 = 2;
@@ -0,0 +1 @@
export const x3 = 3;
@@ -0,0 +1 @@
export const x4 = 4;
@@ -0,0 +1 @@
export const x5 = 5;
16 changes: 16 additions & 0 deletions test/function/samples/max-parallel-file-reads-unlimited/_config.js
@@ -0,0 +1,16 @@
const assert = require('assert');

module.exports = {
description: 'limit parallel file reads',
options: {
maxParallelFileReads: 'inf'
},
bundle(bundle) {
const maxParallelFileReads = bundle.maxParallelFileReads;
assert.strictEqual(
maxParallelFileReads,
5,
'Wrong number of parallel file reads: ' + maxParallelFileReads
);
}
};
@@ -0,0 +1,5 @@
export * from './1';
export * from './2';
export * from './3';
export * from './4';
export * from './5';
1 change: 1 addition & 0 deletions test/function/samples/max-parallel-file-reads/1.js
@@ -0,0 +1 @@
export const x1 = 1;
1 change: 1 addition & 0 deletions test/function/samples/max-parallel-file-reads/2.js
@@ -0,0 +1 @@
export const x2 = 2;
1 change: 1 addition & 0 deletions test/function/samples/max-parallel-file-reads/3.js
@@ -0,0 +1 @@
export const x3 = 3;
1 change: 1 addition & 0 deletions test/function/samples/max-parallel-file-reads/4.js
@@ -0,0 +1 @@
export const x4 = 4;
1 change: 1 addition & 0 deletions test/function/samples/max-parallel-file-reads/5.js
@@ -0,0 +1 @@
export const x5 = 5;
16 changes: 16 additions & 0 deletions test/function/samples/max-parallel-file-reads/_config.js
@@ -0,0 +1,16 @@
const assert = require('assert');

module.exports = {
description: 'unlimited parallel file reads',
options: {
maxParallelFileReads: 3
},
bundle(bundle) {
const maxParallelFileReads = bundle.maxParallelFileReads;
assert.strictEqual(
maxParallelFileReads,
3,
'Wrong number of parallel file reads: ' + maxParallelFileReads
);
}
};
5 changes: 5 additions & 0 deletions test/function/samples/max-parallel-file-reads/main.js
@@ -0,0 +1,5 @@
export * from './1';
export * from './2';
export * from './3';
export * from './4';
export * from './5';
1 change: 1 addition & 0 deletions test/function/samples/options-hook/_config.js
Expand Up @@ -20,6 +20,7 @@ module.exports = {
experimentalCacheExpiry: 10,
input: ['used'],
makeAbsoluteExternalsRelative: true,
maxParallelFileReads: 20,
perf: false,
plugins: [
{
Expand Down
1 change: 1 addition & 0 deletions test/misc/index.js
Expand Up @@ -4,5 +4,6 @@ require('./deprecations');
require('./iife');
require('./in-memory-sourcemaps');
require('./misc');
require('./queue');
require('./sanity-checks');
require('./umd');
4 changes: 2 additions & 2 deletions test/misc/optionList.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions test/misc/queue.js
@@ -0,0 +1,33 @@
const assert = require('assert');
const rewire = require('rewire');
const { assertIncludes, loader } = require('../utils.js');
const rollup = rewire('../../dist/shared/rollup');
schummar marked this conversation as resolved.
Show resolved Hide resolved

const Queue = rollup.__get__('Queue');

describe('queue', () => {
it('max parallel execution', async () => {
let concurrency = 0,
maxConcurrency = 0;
const q = new Queue(5);
const promises = Array(10)
.fill(0)
.map(() =>
q.run(async () => {
concurrency++;
maxConcurrency = Math.max(concurrency, maxConcurrency);
await Promise.resolve();
concurrency--;
})
);

await Promise.all(promises);
assert.strictEqual(maxConcurrency, 5, 'maxConcurrency is not 5: ' + maxConcurrency);
});

it('forwards errors', () => {
const q = new Queue(5);
const promise = q.run(() => Promise.reject(42));
assert.rejects(promise, 'Should reject');
});
});