Skip to content

Commit

Permalink
Await watchChange and closeWatcher hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
lukastaegert committed Mar 6, 2022
1 parent b5b74aa commit 1eae026
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 43 deletions.
6 changes: 3 additions & 3 deletions cli/run/watch-cli.ts
Expand Up @@ -56,7 +56,7 @@ export async function watch(command: Record<string, any>): Promise<void> {
return;
}
if (watcher) {
watcher.close();
await watcher.close();
}
start(options, warnings);
} catch (err: any) {
Expand Down Expand Up @@ -136,12 +136,12 @@ export async function watch(command: Record<string, any>): Promise<void> {
});
}

function close(code: number | null): void {
async function close(code: number | null): Promise<void> {
process.removeListener('uncaughtException', close);
// removing a non-existent listener is a no-op
process.stdin.removeListener('end', close);

if (watcher) watcher.close();
if (watcher) await watcher.close();
if (configWatcher) configWatcher.close();

if (code) {
Expand Down
6 changes: 3 additions & 3 deletions docs/05-plugin-development.md
Expand Up @@ -96,7 +96,7 @@ Called on each `rollup.rollup` build. This is the recommended hook to use when y

#### `closeWatcher`

**Type:** `() => void`<br> **Kind:** `sync, sequential`<br> **Previous/Next Hook:** This hook can be triggered at any time both during the build and the output generation phases. If that is the case, the current build will still proceed but no new [`watchChange`](guide/en/#watchchange) events will be triggered ever.
**Type:** `() => void`<br> **Kind:** `async, parallel`<br> **Previous/Next Hook:** This hook can be triggered at any time both during the build and the output generation phases. If that is the case, the current build will still proceed but no new [`watchChange`](guide/en/#watchchange) events will be triggered ever.

Notifies a plugin when watcher process closes and all open resources should be closed too. This hook cannot be used by output plugins.

Expand Down Expand Up @@ -302,9 +302,9 @@ You can use [`this.getModuleInfo`](guide/en/#thisgetmoduleinfo) to find out the

#### `watchChange`

**Type:** `watchChange: (id: string, change: {event: 'create' | 'update' | 'delete'}) => void`<br> **Kind:** `sync, sequential`<br> **Previous/Next Hook:** This hook can be triggered at any time both during the build and the output generation phases. If that is the case, the current build will still proceed but a new build will be scheduled to start once the current build has completed, starting again with [`options`](guide/en/#options).
**Type:** `watchChange: (id: string, change: {event: 'create' | 'update' | 'delete'}) => void`<br> **Kind:** `async, parallel`<br> **Previous/Next Hook:** This hook can be triggered at any time both during the build and the output generation phases. If that is the case, the current build will still proceed but a new build will be scheduled to start once the current build has completed, starting again with [`options`](guide/en/#options).

Notifies a plugin whenever rollup has detected a change to a monitored file in `--watch` mode. This hook cannot be used by output plugins. Second argument contains additional details of change event.
Notifies a plugin whenever rollup has detected a change to a monitored file in `--watch` mode. If a promise is returned, Rollup will wait for the promise to resolve before scheduling another build. This hook cannot be used by output plugins. The second argument contains additional details of change event.

### Output Generation Hooks

Expand Down
4 changes: 2 additions & 2 deletions docs/build-hooks.mmd
Expand Up @@ -32,10 +32,10 @@ flowchart TB
transform("transform"):::hook-sequential
click transform "/guide/en/#transform" _parent

watchchange("watchChange"):::hook-sequential-sync
watchchange("watchChange"):::hook-parallel
click watchchange "/guide/en/#watchchange" _parent

closewatcher("closeWatcher"):::hook-sequential-sync
closewatcher("closeWatcher"):::hook-parallel
click closewatcher "/guide/en/#closewatcher" _parent

options
Expand Down
14 changes: 5 additions & 9 deletions src/Graph.ts
Expand Up @@ -78,15 +78,11 @@ export default class Graph {

if (watcher) {
this.watchMode = true;
const handleChange: WatchChangeHook = (...args) =>
this.pluginDriver.hookSeqSync('watchChange', args);
const handleClose = () => this.pluginDriver.hookSeqSync('closeWatcher', []);
watcher.on('change', handleChange);
watcher.on('close', handleClose);
watcher.once('restart', () => {
watcher.removeListener('change', handleChange);
watcher.removeListener('close', handleClose);
});
const handleChange = (...args: Parameters<WatchChangeHook>) =>
this.pluginDriver.hookParallel('watchChange', args);
const handleClose = () => this.pluginDriver.hookParallel('closeWatcher', []);
watcher.onCurrentAwaited('change', handleChange);
watcher.onCurrentAwaited('close', handleClose);
}
this.pluginDriver = new PluginDriver(this, options, options.plugins, this.pluginCache);
this.acornParser = acorn.Parser.extend(...(options.acornInjectPlugins as any));
Expand Down
49 changes: 33 additions & 16 deletions src/rollup/types.d.ts
Expand Up @@ -344,7 +344,7 @@ export type WatchChangeHook = (
this: PluginContext,
id: string,
change: { event: ChangeEvent }
) => void;
) => Promise<void> | void;

/**
* use this type for plugin annotation
Expand Down Expand Up @@ -375,7 +375,7 @@ export interface PluginHooks extends OutputPluginHooks {
buildEnd: (this: PluginContext, err?: Error) => Promise<void> | void;
buildStart: (this: PluginContext, options: NormalizedInputOptions) => Promise<void> | void;
closeBundle: (this: PluginContext) => Promise<void> | void;
closeWatcher: (this: PluginContext) => void;
closeWatcher: (this: PluginContext) => Promise<void> | void;
load: LoadHook;
moduleParsed: ModuleParsedHook;
options: (
Expand Down Expand Up @@ -440,7 +440,9 @@ export type AsyncPluginHooks =
| 'shouldTransformCachedModule'
| 'transform'
| 'writeBundle'
| 'closeBundle';
| 'closeBundle'
| 'closeWatcher'
| 'watchChange';

export type PluginValueHooks = 'banner' | 'footer' | 'intro' | 'outro';

Expand All @@ -458,13 +460,11 @@ export type FirstPluginHooks =

export type SequentialPluginHooks =
| 'augmentChunkHash'
| 'closeWatcher'
| 'generateBundle'
| 'options'
| 'outputOptions'
| 'renderChunk'
| 'transform'
| 'watchChange';
| 'transform';

export type ParallelPluginHooks =
| 'banner'
Expand All @@ -477,7 +477,9 @@ export type ParallelPluginHooks =
| 'renderError'
| 'renderStart'
| 'writeBundle'
| 'closeBundle';
| 'closeBundle'
| 'closeWatcher'
| 'watchChange';

interface OutputPluginValueHooks {
banner: AddonHook;
Expand Down Expand Up @@ -898,6 +900,24 @@ interface TypedEventEmitter<T extends { [event: string]: (...args: any) => any }
setMaxListeners(n: number): this;
}

export interface RollupAwaitingEmitter<T extends { [event: string]: (...args: any) => any }>
extends TypedEventEmitter<T> {
close(): Promise<void>;
emitAndAwait<K extends keyof T>(event: K, ...args: Parameters<T[K]>): Promise<ReturnType<T[K]>[]>;
/**
* Registers an event listener that will be awaited before Rollup continues
* for events emitted via emitAndAwait. All listeners will be awaited in
* parallel while rejections are tracked via Promise.all.
* Listeners are removed automatically when removeAwaited is called, which
* happens automatically after each run.
*/
onCurrentAwaited<K extends keyof T>(
event: K,
listener: (...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>
): this;
removeAwaited(): this;
}

export type RollupWatcherEvent =
| { code: 'START' }
| { code: 'BUNDLE_START'; input?: InputOption; output: readonly string[] }
Expand All @@ -911,15 +931,12 @@ export type RollupWatcherEvent =
| { code: 'END' }
| { code: 'ERROR'; error: RollupError; result: RollupBuild | null };

export interface RollupWatcher
extends TypedEventEmitter<{
change: (id: string, change: { event: ChangeEvent }) => void;
close: () => void;
event: (event: RollupWatcherEvent) => void;
restart: () => void;
}> {
close(): void;
}
export type RollupWatcher = RollupAwaitingEmitter<{
change: (id: string, change: { event: ChangeEvent }) => void;
close: () => void;
event: (event: RollupWatcherEvent) => void;
restart: () => void;
}>;

export function watch(config: RollupWatchOptions | RollupWatchOptions[]): RollupWatcher;

Expand Down
36 changes: 34 additions & 2 deletions src/watch/watch-proxy.ts
Expand Up @@ -5,15 +5,47 @@ import { errInvalidOption, error } from '../utils/error';
import type { GenericConfigObject } from '../utils/options/options';
import { loadFsEvents } from './fsevents-importer';

class WatchEmitter extends EventEmitter {
class WatchEmitter<T extends { [event: string]: (...args: any) => any }> extends EventEmitter {
private awaitedHandlers: {
[K in keyof T]?: ((...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>)[];
} = {};

constructor() {
super();
// Allows more than 10 bundles to be watched without
// showing the `MaxListenersExceededWarning` to the user.
this.setMaxListeners(Infinity);
}

close() {}
// Will be overwritten by Rollup
async close(): Promise<void> {}

emitAndAwait<K extends keyof T>(
event: K,
...args: Parameters<T[K]>
): Promise<ReturnType<T[K]>[]> {
this.emit(event as string, ...(args as any[]));
const handlers = this.awaitedHandlers[event];
if (!handlers) return Promise.resolve([]);
return Promise.all(handlers.map(handler => handler(...args)));
}

onCurrentAwaited<K extends keyof T>(
event: K,
listener: (...args: Parameters<T[K]>) => Promise<ReturnType<T[K]>>
): this {
let handlers = this.awaitedHandlers[event];
if (!handlers) {
handlers = this.awaitedHandlers[event] = [];
}
handlers.push(listener);
return this;
}

removeAwaited(): this {
this.awaitedHandlers = {};
return this;
}
}

export default function watch(configs: GenericConfigObject[] | GenericConfigObject): RollupWatcher {
Expand Down
15 changes: 9 additions & 6 deletions src/watch/watch.ts
Expand Up @@ -57,12 +57,12 @@ export class Watcher {
process.nextTick(() => this.run());
}

close(): void {
async close(): Promise<void> {
if (this.buildTimeout) clearTimeout(this.buildTimeout);
for (const task of this.tasks) {
task.close();
}
this.emitter.emit('close');
await this.emitter.emitAndAwait('close');
this.emitter.removeAllListeners();
}

Expand All @@ -87,14 +87,17 @@ export class Watcher {

if (this.buildTimeout) clearTimeout(this.buildTimeout);

this.buildTimeout = setTimeout(() => {
this.buildTimeout = setTimeout(async () => {
this.buildTimeout = null;
try {
for (const [id, event] of this.invalidatedIds) {
this.emitter.emit('change', id, { event });
}
await Promise.all(
[...this.invalidatedIds].map(([id, event]) =>
this.emitter.emitAndAwait('change', id, { event })
)
);
this.invalidatedIds.clear();
this.emitter.emit('restart');
this.emitter.removeAwaited();
this.run();
} catch (error: any) {
this.invalidatedIds.clear();
Expand Down
5 changes: 3 additions & 2 deletions test/watch/index.js
Expand Up @@ -604,13 +604,14 @@ describe('rollup.watch', () => {
]);
});

it('recovers from a plugin error in the watchChange hook', async () => {
it('awaits and recovers from a plugin error in the watchChange hook', async () => {
let fail = true;
await copy('test/watch/samples/basic', 'test/_tmp/input');
watcher = rollup.watch({
input: 'test/_tmp/input/main.js',
plugins: {
watchChange(id) {
async watchChange() {
await new Promise(resolve => setTimeout(resolve, 300));
if (fail) {
this.error('Failed in watchChange');
}
Expand Down

0 comments on commit 1eae026

Please sign in to comment.