Skip to content

Commit

Permalink
feat(core): add daemon watcher filters (#13229)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cammisuli committed Nov 19, 2022
1 parent 3af70fc commit 3f2fa6c
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 49 deletions.
4 changes: 2 additions & 2 deletions packages/nx/src/daemon/client/client.ts
Expand Up @@ -107,7 +107,7 @@ export class DaemonClient {
async registerFileWatcher(
config: {
watchProjects: string[] | 'all';
watchGlobalWorkspaceFiles: boolean;
includeGlobalWorkspaceFiles: boolean;
},
callback: (
error: Error | null | 'closed',
Expand All @@ -117,7 +117,7 @@ export class DaemonClient {
} | null
) => void
): Promise<UnregisterCallback> {
await this.sendToDaemonViaQueue({ type: 'PING' });
await this.getProjectGraph();
const messenger = new SocketMessenger(connect(FULL_OS_SOCKET_PATH)).listen(
(message) => {
try {
Expand Down
65 changes: 65 additions & 0 deletions packages/nx/src/daemon/server/file-watching/changed-projects.ts
@@ -0,0 +1,65 @@
import { performance } from 'perf_hooks';
import { projectFileMapWithFiles } from '../project-graph-incremental-recomputation';

export type ChangedFile = {
path: string;
type: 'CREATED' | 'UPDATED' | 'DELETED';
};

export function getProjectsAndGlobalChanges(
createdFiles: string[] | null,
updatedFiles: string[] | null,
deletedFiles: string[] | null
) {
const projectAndGlobalChanges: {
projects: { [changedProject: string]: ChangedFile[] };
globalFiles: ChangedFile[];
} = {
projects: {},
globalFiles: [],
};

performance.mark('changed-projects:start');

const allChangedFiles: ChangedFile[] = [
...(createdFiles ?? []).map<ChangedFile>((c) => ({
path: c,
type: 'CREATED',
})),
...(updatedFiles ?? []).map<ChangedFile>((c) => ({
path: c,
type: 'UPDATED',
})),
...(deletedFiles ?? []).map<ChangedFile>((c) => ({
path: c,
type: 'DELETED',
})),
];

const fileToProjectMap: Record<string, string> = {};
for (const [projectName, projectFiles] of Object.entries(
projectFileMapWithFiles?.projectFileMap ?? {}
)) {
for (const projectFile of projectFiles) {
fileToProjectMap[projectFile.file] = projectName;
}
}

for (const changedFile of allChangedFiles) {
const project = fileToProjectMap[changedFile.path];
if (project) {
(projectAndGlobalChanges.projects[project] ??= []).push(changedFile);
} else {
projectAndGlobalChanges.globalFiles.push(changedFile);
}
}

performance.mark('changed-projects:end');
performance.measure(
'changed-projects',
'changed-projects:start',
'changed-projects:end'
);

return projectAndGlobalChanges;
}
@@ -0,0 +1,82 @@
import { Socket } from 'net';
import { ProjectGraphCache } from '../../../project-graph/nx-deps-cache';
import { PromisedBasedQueue } from '../../../utils/promised-based-queue';
import { handleResult } from '../server';
import { getProjectsAndGlobalChanges } from './changed-projects';

const queue = new PromisedBasedQueue();

export let registeredFileWatcherSockets: {
socket: Socket;
config: {
watchProjects: string[] | 'all';
includeGlobalWorkspaceFiles: boolean;
};
}[] = [];

export function removeRegisteredFileWatcherSocket(socket: Socket) {
registeredFileWatcherSockets = registeredFileWatcherSockets.filter(
(watcher) => watcher.socket !== socket
);
}

export function hasRegisteredFileWatcherSockets() {
return registeredFileWatcherSockets.length > 0;
}

export function notifyFileWatcherSockets(
createdFiles: string[] | null,
updatedFiles: string[],
deletedFiles: string[]
) {
if (!hasRegisteredFileWatcherSockets()) {
return;
}

queue.sendToQueue(async () => {
const projectAndGlobalChanges = getProjectsAndGlobalChanges(
createdFiles,
updatedFiles,
deletedFiles
);

await Promise.all(
registeredFileWatcherSockets.map(({ socket, config }) => {
const changedProjects = [];
const changedFiles = [];
if (config.watchProjects === 'all') {
for (const [projectName, projectFiles] of Object.entries(
projectAndGlobalChanges.projects
)) {
changedProjects.push(projectName);
changedFiles.push(...projectFiles);
}
} else {
for (const watchedProject of config.watchProjects) {
if (!!projectAndGlobalChanges.projects[watchedProject]) {
changedProjects.push(watchedProject);

changedFiles.push(
...projectAndGlobalChanges.projects[watchedProject]
);
}
}
}

if (config.includeGlobalWorkspaceFiles) {
changedFiles.push(...projectAndGlobalChanges.globalFiles);
}

if (changedProjects.length > 0 || changedFiles.length > 0) {
return handleResult(socket, {
description: 'File watch changed',
response: JSON.stringify({
changedProjects,
changedFiles,
}),
});
}
})
);
});
}
@@ -1,26 +1,27 @@
import { performance } from 'perf_hooks';
import { readAllWorkspaceConfiguration } from '../../config/configuration';
import { FileData, ProjectFileMap } from '../../config/project-graph';
import { defaultFileHasher } from '../../hasher/file-hasher';
import { serverLogger } from './logger';
import { HashingImpl } from '../../hasher/hashing-impl';
import { buildProjectGraphUsingProjectFileMap } from '../../project-graph/build-project-graph';
import {
createProjectFileMap,
updateProjectFileMap,
} from '../../project-graph/file-map-utils';
import {
nxDepsPath,
ProjectGraphCache,
readCache,
} from '../../project-graph/nx-deps-cache';
import { fileExists } from '../../utils/fileutils';
import { HashingImpl } from '../../hasher/hashing-impl';
import {
createProjectFileMap,
updateProjectFileMap,
} from '../../project-graph/file-map-utils';
import { FileData, ProjectFileMap } from '../../config/project-graph';
import { notifyFileWatcherSockets } from './file-watching/file-watcher-sockets';
import { serverLogger } from './logger';

let cachedSerializedProjectGraphPromise: Promise<{
error: Error | null;
serializedProjectGraph: string | null;
}>;
let projectFileMapWithFiles:
export let projectFileMapWithFiles:
| { projectFileMap: ProjectFileMap; allWorkspaceFiles: FileData[] }
| undefined;
let currentProjectGraphCache: ProjectGraphCache | undefined;
Expand Down Expand Up @@ -58,10 +59,11 @@ export async function getCachedSerializedProjectGraphPromise() {
}

export function addUpdatedAndDeletedFiles(
createdFiles: string[],
updatedFiles: string[],
deletedFiles: string[]
) {
for (let f of updatedFiles) {
for (let f of [...createdFiles, ...updatedFiles]) {
collectedDeletedFiles.delete(f);
collectedUpdatedFiles.add(f);
}
Expand All @@ -71,14 +73,28 @@ export function addUpdatedAndDeletedFiles(
collectedDeletedFiles.add(f);
}

if (updatedFiles.length > 0 || deletedFiles.length > 0) {
notifyFileWatcherSockets(null, updatedFiles, deletedFiles);
}

if (createdFiles.length > 0) {
waitPeriod = 100; // reset it to process the graph faster
}

if (!scheduledTimeoutId) {
scheduledTimeoutId = setTimeout(() => {
scheduledTimeoutId = setTimeout(async () => {
scheduledTimeoutId = undefined;
if (waitPeriod < 4000) {
waitPeriod = waitPeriod * 2;
}

cachedSerializedProjectGraphPromise =
processFilesAndCreateAndSerializeProjectGraph();
await cachedSerializedProjectGraphPromise;

if (createdFiles.length > 0) {
notifyFileWatcherSockets(createdFiles, null, null);
}
}, waitPeriod);
}
}
Expand Down
57 changes: 20 additions & 37 deletions packages/nx/src/daemon/server/server.ts
Expand Up @@ -41,6 +41,10 @@ import {
processFileChangesInOutputs,
} from './outputs-tracking';
import { handleRequestShutdown } from './handle-request-shutdown';
import {
registeredFileWatcherSockets,
removeRegisteredFileWatcherSocket,
} from './file-watching/file-watcher-sockets';

let performanceObserver: PerformanceObserver | undefined;
let workspaceWatcherError: Error | undefined;
Expand All @@ -54,8 +58,6 @@ export type HandlerResult = {

let numberOfOpenConnections = 0;

let registeredFileWatcherSockets: { socket: Socket; filter: any }[] = [];

const server = createServer(async (socket) => {
numberOfOpenConnections += 1;
serverLogger.log(
Expand Down Expand Up @@ -88,9 +90,7 @@ const server = createServer(async (socket) => {
`Closed a connection. Number of open connections: ${numberOfOpenConnections}`
);

registeredFileWatcherSockets = registeredFileWatcherSockets.filter(
(watcher) => watcher.socket !== socket
);
removeRegisteredFileWatcherSocket(socket);
});
});

Expand Down Expand Up @@ -143,7 +143,7 @@ async function handleMessage(socket, data: string) {
await handleRequestShutdown(server, numberOfOpenConnections)
);
} else if (payload.type === 'REGISTER_FILE_WATCHER') {
registeredFileWatcherSockets.push({ socket, filter: payload.config });
registeredFileWatcherSockets.push({ socket, config: payload.config });
} else {
await respondWithErrorAndExit(
socket,
Expand All @@ -153,7 +153,7 @@ async function handleMessage(socket, data: string) {
}
}

async function handleResult(socket: Socket, hr: HandlerResult) {
export async function handleResult(socket: Socket, hr: HandlerResult) {
if (hr.error) {
await respondWithErrorAndExit(socket, hr.description, hr.error);
} else {
Expand Down Expand Up @@ -255,35 +255,34 @@ const handleWorkspaceChanges: FileWatcherCallback = async (

serverLogger.watcherLog(convertChangeEventsToLogMessage(changeEvents));

const filesToHash = [];
const updatedFilesToHash = [];
const createdFilesToHash = [];
const deletedFiles = [];
const changedFiles = [];

for (const event of changeEvents) {
if (event.type === 'delete') {
deletedFiles.push(event.path);
changedFiles.push({
path: event.path,
type: 'DELETE',
});
} else {
try {
const s = statSync(join(workspaceRoot, event.path));
if (s.isFile()) {
filesToHash.push(event.path);
changedFiles.push({
path: event.path,
type: event.type.toUpperCase(),
});
if (event.type === 'update') {
updatedFilesToHash.push(event.path);
} else {
createdFilesToHash.push(event.path);
}
}
} catch (e) {
// this can happen when the update file was deleted right after
}
}
}

await notifyFileWatcherSockets(changedFiles);

addUpdatedAndDeletedFiles(filesToHash, deletedFiles);
addUpdatedAndDeletedFiles(
createdFilesToHash,
updatedFilesToHash,
deletedFiles
);
} catch (err) {
serverLogger.watcherLog(`Unexpected workspace error`, err.message);
console.error(err);
Expand Down Expand Up @@ -377,19 +376,3 @@ export async function stopServer(): Promise<void> {
});
});
}

async function notifyFileWatcherSockets(
changedFiles: { path: string; type: 'CREATE' | 'UPDATE' | 'DELETE' }[]
) {
await Promise.all(
registeredFileWatcherSockets.map(({ socket, filter }) =>
handleResult(socket, {
description: 'File watch changed',
response: JSON.stringify({
changedProjects: [],
changedFiles,
}),
})
)
);
}

1 comment on commit 3f2fa6c

@vercel
Copy link

@vercel vercel bot commented on 3f2fa6c Nov 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

nx-dev – ./

nx-dev-nrwl.vercel.app
nx-dev-git-master-nrwl.vercel.app
nx-five.vercel.app
nx.dev

Please sign in to comment.