diff --git a/integration-tests/gatsby-cli/package.json b/integration-tests/gatsby-cli/package.json index 9abe5f310e7de..e73eb89c912ce 100644 --- a/integration-tests/gatsby-cli/package.json +++ b/integration-tests/gatsby-cli/package.json @@ -6,7 +6,7 @@ }, "license": "MIT", "scripts": { - "test": "jest" + "test": "jest -w 1" }, "devDependencies": { "babel-jest": "^24.0.0", diff --git a/packages/gatsby-worker/.babelrc b/packages/gatsby-worker/.babelrc new file mode 100644 index 0000000000000..ac0ad292bb087 --- /dev/null +++ b/packages/gatsby-worker/.babelrc @@ -0,0 +1,3 @@ +{ + "presets": [["babel-preset-gatsby-package"]] +} diff --git a/packages/gatsby-worker/.gitignore b/packages/gatsby-worker/.gitignore new file mode 100644 index 0000000000000..b2d59d1f7578b --- /dev/null +++ b/packages/gatsby-worker/.gitignore @@ -0,0 +1,2 @@ +/node_modules +/dist \ No newline at end of file diff --git a/packages/gatsby-worker/.npmignore b/packages/gatsby-worker/.npmignore new file mode 100644 index 0000000000000..d0a59d6ec3da3 --- /dev/null +++ b/packages/gatsby-worker/.npmignore @@ -0,0 +1,36 @@ +# Logs +logs +*.log + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules +*.un~ +yarn.lock +src +flow-typed +coverage +decls +examples +.babelrc +tsconfig.json \ No newline at end of file diff --git a/packages/gatsby-worker/README.md b/packages/gatsby-worker/README.md new file mode 100644 index 0000000000000..aa8bb57c1b44f --- /dev/null +++ b/packages/gatsby-worker/README.md @@ -0,0 +1,125 @@ +# gatsby-worker + 
+Utility to execute tasks in forked processes. Highly inspired by [`jest-worker`](https://www.npmjs.com/package/jest-worker). + +## Example + +File `worker.ts`: + +```ts +export async function heavyTask(param: string): Promise { + // using workers is ideal for CPU intensive tasks + return await heavyProcessing(param) +} + +export async function setupStep(param: string): Promise { + await heavySetup(param) +} +``` + +File `parent.ts` + +```ts +import { WorkerPool } from "gatsby-worker" + +const workerPool = new WorkerPool( + require.resolve(`./worker`), + { + numWorkers: 5, + env: { + CUSTOM_ENV_VAR_TO_SET_IN_WORKER: `foo`, + }, + } +) + +// queue a task on all workers +const arrayOfPromises = workerPool.all.setupStep(`bar`) + +// queue a task on single worker +const singlePromise = workerPool.single.heavyTask(`baz`) +``` + +## API + +### Constructor + +```ts +// TypeOfWorkerModule allows to type exposed functions ensuring type safety. +// It will convert sync methods to async and discard/disallow usage of exports +// that are not functions. Recommended to use with ``. +const workerPool = new WorkerPool( + // Absolute path to worker module. Recommended to use with `require.resolve` + workerPath: string, + // Not required options + options?: { + // Number of workers to spawn. Defaults to `1` if not defined. + numWorkers?: number + // Additional env vars to set in worker. Worker will inherit env vars of parent process + // as well as additional `GATSBY_WORKER_ID` env var (starting with "1" for first worker) + env?: Record + } +) +``` + +### `.single` + +```ts +// Exports of the worker module become available under `.single` property of `WorkerPool` instance. +// Calling those will either start executing immediately if there are any idle workers or queue them +// to be executed once a worker become idle. 
+const singlePromise = workerPool.single.heavyTask(`baz`) +``` + +### `.all` + +```ts +// Exports of the worker module become available under `.all` property of `WorkerPool` instance. +// Calling those will ensure a function is executed on all workers. Best usage for this is performing +// setup/bootstrap of workers. +const arrayOfPromises = workerPool.all.setupStep(`baz`) +``` + +### `.end` + +```ts +// Used to shut down `WorkerPool`. If there are any in-progress or queued tasks, promises for those will be rejected as they won't be able to complete. +const arrayOfPromises = workerPool.end() +``` + +### `isWorker` + +```ts +// Determine whether the current code is executed in worker context. Useful for conditional handling depending on context. +import { isWorker } from "gatsby-worker" + +if (isWorker) { + // this is executed in worker context +} else { + // this is NOT executed in worker context +} +``` + +## Usage with unit tests + +If you are working with source files that need transpilation, you will need to make it possible to load untranspiled modules in child processes. +This can be done with `@babel/register` (or similar depending on your build toolchain). Example setup: + +```ts +const testWorkerPool = new WorkerPool(workerModule, { + numWorkers, + env: { + NODE_OPTIONS: `--require ${require.resolve(`./ts-register`)}`, + }, +}) +``` + +This executes an additional module before the worker module is loaded, which makes it possible to add runtime support for new JavaScript syntax or for TypeScript. 
Example `ts-register.js`: + +```js +// spawned process won't use jest config (or other testing framework equivalent) to support TS, so we need to add support ourselves +require(`@babel/register`)({ + extensions: [`.js`, `.ts`], + configFile: require.resolve(relativePathToYourBabelConfig), + ignore: [/node_modules/], +}) +``` diff --git a/packages/gatsby-worker/package.json b/packages/gatsby-worker/package.json new file mode 100644 index 0000000000000..5d025f1734774 --- /dev/null +++ b/packages/gatsby-worker/package.json @@ -0,0 +1,41 @@ +{ + "name": "gatsby-worker", + "description": "Utility to create worker pools", + "version": "0.0.0-next.0", + "author": "Michal Piechowiak", + "bugs": { + "url": "https://github.com/gatsbyjs/gatsby/issues" + }, + "dependencies": { + "@babel/core": "^7.14.0" + }, + "devDependencies": { + "@babel/cli": "^7.14.0", + "@babel/register": "^7.14.0", + "babel-preset-gatsby-package": "^1.9.0-next.0", + "cross-env": "^7.0.3", + "rimraf": "^3.0.2", + "typescript": "^4.1.5" + }, + "homepage": "https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-worker#readme", + "keywords": [ + "gatsby", + "worker" + ], + "license": "MIT", + "main": "dist/index.js", + "repository": { + "type": "git", + "url": "https://github.com/gatsbyjs/gatsby.git", + "directory": "packages/gatsby-worker" + }, + "scripts": { + "build": "babel src --out-dir dist/ --ignore \"**/__tests__\" --extensions \".ts,.js\"", + "prepare": "cross-env NODE_ENV=production npm run build && npm run typegen", + "watch": "babel -w src --out-dir dist/ --ignore \"**/__tests__\" --extensions \".ts,.js\"", + "typegen": "rimraf \"dist/**/*.d.ts\" && tsc --emitDeclarationOnly --declaration --declarationDir dist/" + }, + "engines": { + "node": ">=12.13.0" + } +} diff --git a/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts new file mode 100644 index 0000000000000..0b3c5219b04af --- /dev/null +++ 
interface IFixtureOptions {
  addWorkerId?: boolean
}

interface IThrowingFixtureOptions extends IFixtureOptions {
  throwOnWorker?: number
}

/**
 * Returns ` (worker #<GATSBY_WORKER_ID>)` when `addWorkerId` is set, otherwise an empty string.
 * Not exported, so it is not exposed by `WorkerPool` (which only exposes exported functions).
 */
function workerSuffix(opts?: IFixtureOptions): string {
  return opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}

/**
 * Decides whether a throwing fixture should throw in the current process:
 * always when `throwOnWorker` is not set (or is `0`), otherwise only when it
 * matches the `GATSBY_WORKER_ID` env var of this process.
 */
function shouldThrow(opts?: IThrowingFixtureOptions): boolean {
  return (
    !opts?.throwOnWorker ||
    opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID
  )
}

/**
 * Synchronous fixture: returns `foo <a>`, optionally suffixed with the worker id.
 */
export function sync(a: string, opts?: { addWorkerId?: boolean }): string {
  return `foo ${a}${workerSuffix(opts)}`
}

/**
 * Asynchronous fixture: resolves to `foo <a>`, optionally suffixed with the worker id.
 */
export async function async(
  a: string,
  opts?: { addWorkerId?: boolean }
): Promise<string> {
  return `foo ${a}${workerSuffix(opts)}`
}

/**
 * Returns a promise that never settles. Used to keep a worker busy.
 */
export function neverEnding(): Promise<string> {
  return new Promise<string>(() => {})
}

// Non-function export: used to check that only functions get exposed by the pool.
export const notAFunction = `string`

/**
 * Synchronous fixture that throws `sync throw` (see `shouldThrow` for when),
 * and otherwise behaves like `sync`.
 */
export function syncThrow(
  a: string,
  opts?: { addWorkerId?: boolean; throwOnWorker?: number }
): string {
  if (shouldThrow(opts)) {
    throw new Error(`sync throw${workerSuffix(opts)}`)
  }

  return `foo ${a}${workerSuffix(opts)}`
}

/**
 * Asynchronous fixture that rejects with `async throw` (see `shouldThrow` for when),
 * and otherwise behaves like `async`.
 */
export async function asyncThrow(
  a: string,
  opts?: { addWorkerId?: boolean; throwOnWorker?: number }
): Promise<string> {
  if (shouldThrow(opts)) {
    throw new Error(`async throw${workerSuffix(opts)}`)
  }

  return `foo ${a}${workerSuffix(opts)}`
}

// used in task queue as previous functions would be too often too fast
/**
 * Resolves after 100ms with the given `taskId` and, when `addWorkerId` is set,
 * the `GATSBY_WORKER_ID` of the process that executed it.
 */
export async function async100ms(
  taskId: number,
  opts?: { addWorkerId?: boolean }
): Promise<{ taskId: number; workerId: string }> {
  return new Promise(resolve =>
    setTimeout(resolve, 100, {
      taskId,
      workerId: opts?.addWorkerId ? process.env.GATSBY_WORKER_ID : undefined,
    })
  )
}
+ expect(exposedMethodsSingle).toEqual(exposedMethodsAll) + }) + + describe(`.single`, () => { + it(`exported sync function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.single.sync(`.single sync`) + // turns sync function into async + expect(isPromise(returnValue)).toEqual(true) + + const resolvedValue = await returnValue + expect(resolvedValue).toMatchInlineSnapshot(`"foo .single sync"`) + }) + + it(`exported async function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.single.async(`.single async`) + // promise is preserved + expect(isPromise(returnValue)).toEqual(true) + + const resolvedValue = await returnValue + expect(resolvedValue).toMatchInlineSnapshot(`"foo .single async"`) + }) + + it(`exported sync function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(2) + + const returnValue = workerPool.single.syncThrow(`.single syncThrow`) + // turns sync function into async + expect(isPromise(returnValue)).toEqual(true) + + try { + await returnValue + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: sync throw]`) + } + }) + + it(`exported async function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(2) + + const returnValue = workerPool.single.asyncThrow(`.single asyncThrow`) + // promise is preserved + expect(isPromise(returnValue)).toEqual(true) + + try { + await returnValue + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: async throw]`) + } + }) + }) + + describe(`.all`, () => { + it(`exported sync function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.all.sync(`.single sync`, { + addWorkerId: true, + }) + + expect(returnValue).toBeArrayOfSize(numWorkers) + // turns sync 
function into async + expect(returnValue).toSatisfyAll(isPromise) + + const resolvedValue = await Promise.all(returnValue) + expect(resolvedValue).toMatchInlineSnapshot(` + Array [ + "foo .single sync (worker #1)", + "foo .single sync (worker #2)", + ] + `) + }) + + it(`exported async function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.all.async(`.single async`, { + addWorkerId: true, + }) + + expect(returnValue).toBeArrayOfSize(numWorkers) + // promise is preserved + expect(returnValue).toSatisfyAll(isPromise) + + const resolvedValue = await Promise.all(returnValue) + expect(resolvedValue).toMatchInlineSnapshot(` + Array [ + "foo .single async (worker #1)", + "foo .single async (worker #2)", + ] + `) + }) + + it(`exported sync function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(3) + + const returnValue = workerPool.all.syncThrow(`.single syncThrow`, { + addWorkerId: true, + throwOnWorker: 2, + }) + expect(returnValue).toBeArrayOfSize(numWorkers) + // turns sync function into async + expect(returnValue).toSatisfyAll(isPromise) + + try { + await Promise.all(returnValue) + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: sync throw (worker #2)]`) + } + }) + + it(`exported async function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(3) + + const returnValue = workerPool.all.asyncThrow(`.single asyncThrow`, { + addWorkerId: true, + throwOnWorker: 2, + }) + expect(returnValue).toBeArrayOfSize(numWorkers) + // promise is preserved + expect(returnValue).toSatisfyAll(isPromise) + + try { + await Promise.all(returnValue) + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: async throw (worker #2)]`) + } + }) + }) + + describe(`.end`, () => { + it(`fails currently executed and pending tasks when worker is 
closed`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(numWorkers * 2) + + // queueing 2 * numWorkers task, so that currently executed tasks reject + // as well as pending tasks + for (let i = 0; i < numWorkers * 2; i++) { + workerPool.single + .neverEnding() + .then(() => { + fail(`promise should reject`) + }) + .catch(e => { + expect(e).toMatchInlineSnapshot( + `[Error: Worker exited before finishing task]` + ) + }) + } + + await endWorkerPool() + }) + }) + + describe(`task queue`, () => { + it(`distributes task between workers when using .single`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + interface IReturnOfasync100ms { + taskId: number + workerId: string + } + + const promises: Array> = [] + const resultsOrder: Array = [] + const numTask = numWorkers * 10 + for (let taskId = 1; taskId <= numTask; taskId++) { + const promise = workerPool.single.async100ms(taskId, { + addWorkerId: true, + }) + promise.then(res => { + resultsOrder.push(res) + }) + promises.push(promise) + } + + await Promise.all(promises) + + expect(resultsOrder).toBeArrayOfSize(numTask) + + const executedOnWorker1 = resultsOrder.filter(res => res.workerId === `1`) + const executedOnWorker2 = resultsOrder.filter(res => res.workerId === `2`) + + // each worker executed some tasks - we can't ensure that task are evenly split + // so just making sure there are some tasks executed in each worker + expect(executedOnWorker1.length).toBeGreaterThan(0) + expect(executedOnWorker2.length).toBeGreaterThan(0) + + // results from each worker came in order of being called + expect(executedOnWorker1).toEqual( + executedOnWorker1.sort((a, b) => a.taskId - b.taskId) + ) + expect(executedOnWorker2).toEqual( + executedOnWorker2.sort((a, b) => a.taskId - b.taskId) + ) + }) + }) +}) diff --git a/packages/gatsby-worker/src/__tests__/task-queue.ts b/packages/gatsby-worker/src/__tests__/task-queue.ts new file mode 100644 index 
0000000000000..fd8dc3d7e2d8d --- /dev/null +++ b/packages/gatsby-worker/src/__tests__/task-queue.ts @@ -0,0 +1,238 @@ +import { TaskQueue } from "../task-queue" + +function getValuesInQueue(queue: TaskQueue): Array { + const valuesInQueue: Array = [] + for (const item of queue) { + valuesInQueue.push(item.value) + } + + return valuesInQueue +} + +describe(`Task Queue`, () => { + it(`correctly holds all queued items in order`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + ] + `) + }) + + it(`handles removing first item`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + for (const item of taskQueue) { + if (item.value === 1) { + taskQueue.remove(item) + break + } + } + + taskQueue.enqueue(`added after removal`) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + "added after removal", + ] + `) + }) + + it(`handles removing last item`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + for (const item of taskQueue) { + if (item.value === 20) { + taskQueue.remove(item) + break + } + } + + taskQueue.enqueue(`added after removal`) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + "added after removal", + ] + `) + }) + + it(`handles removing item in the middle`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + for (const item of taskQueue) { + if (item.value === 11) { + taskQueue.remove(item) 
+ break + } + } + + taskQueue.enqueue(`added after removal`) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + "added after removal", + ] + `) + }) + + it(`queue can fill up after being drained`, () => { + const taskQueue = new TaskQueue() + + taskQueue.enqueue(1) + taskQueue.enqueue(2) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + ] + `) + + for (const item of taskQueue) { + taskQueue.remove(item) + } + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(`Array []`) + + taskQueue.enqueue(3) + taskQueue.enqueue(4) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 3, + 4, + ] + `) + }) + + describe(`Removed node is not referenced in .next or .prev`, () => { + let taskQueue: TaskQueue + beforeEach(() => { + taskQueue = new TaskQueue() + + taskQueue.enqueue(1) + taskQueue.enqueue(2) + taskQueue.enqueue(3) + }) + + it.each([ + [`removed from head`, 1], + [`removed from middle`, 2], + [`removed from tail`, 3], + ])(`%s`, (_label, itemToRemove) => { + for (const item of taskQueue) { + if (item.value === itemToRemove) { + taskQueue.remove(item) + } + } + + for (const item of taskQueue) { + if (item.value === itemToRemove) { + fail(`"${itemToRemove}" found as value`) + } + + if (item?.prev?.value === itemToRemove) { + fail(`"${itemToRemove}" found as value of previous node`) + } + + if (item?.next?.value === itemToRemove) { + fail(`"${itemToRemove}" found as value of next node`) + } + } + }) + }) +}) diff --git a/packages/gatsby-worker/src/child.ts b/packages/gatsby-worker/src/child.ts new file mode 100644 index 0000000000000..3619108caa6b9 --- /dev/null +++ b/packages/gatsby-worker/src/child.ts @@ -0,0 +1,68 @@ +import { + ParentMessageUnion, + ChildMessageUnion, + EXECUTE, + END, + ERROR, + RESULT, +} from "./types" +import { isPromise } from "./utils" + +/** + * Used 
/**
 * `true` when this module runs inside a worker process, i.e. when the process
 * has an IPC channel (`process.send`) and `GATSBY_WORKER_MODULE_PATH` is set.
 * `false` otherwise.
 */
let isWorker = false

const workerModulePath = process.env.GATSBY_WORKER_MODULE_PATH

if (process.send && workerModulePath) {
  isWorker = true

  const sendToParent = process.send.bind(process) as (
    msg: ChildMessageUnion
  ) => void

  // Reports a failed task to the parent: constructor name, message and stack
  // are sent separately, followed by the error itself.
  const reportFailure = (error: Error): void => {
    const failure =
      error == null ? new Error(`"null" or "undefined" thrown`) : error

    sendToParent([
      ERROR,
      failure.constructor && failure.constructor.name,
      failure.message,
      failure.stack,
      failure,
    ])
  }

  // Reports a successfully computed value to the parent.
  const reportResult = (result: unknown): void => {
    sendToParent([RESULT, result])
  }

  // the module whose exports the parent asks us to execute
  const workerModule = require(workerModulePath)

  const handleParentMessage = (msg: ParentMessageUnion): void => {
    if (msg[0] === END) {
      // stop listening for further messages from the parent
      process.off(`message`, handleParentMessage)
      return
    }

    if (msg[0] !== EXECUTE) {
      return
    }

    let outcome
    try {
      outcome = workerModule[msg[1]].call(workerModule, ...msg[2])
    } catch (e) {
      // synchronous throw (also covers a missing / non-callable export)
      reportFailure(e)
      return
    }

    if (isPromise(outcome)) {
      outcome.then(reportResult, reportFailure)
      return
    }

    reportResult(outcome)
  }

  process.on(`message`, handleParentMessage)
}

export { isWorker }
/**
 * Bookkeeping for one scheduled function call: what to call, with which arguments,
 * optionally on which worker, plus the promise handed to the caller together with
 * its `resolve` / `reject` callbacks.
 */
class TaskInfo<T> {
  functionName: T
  args: Array<any>
  assignedToWorker?: IWorkerInfo<T>
  promise: Promise<any>
  // assigned synchronously by the promise executor in the constructor
  resolve!: (r: any) => void
  reject!: (e: Error) => void

  constructor(opts: {
    functionName: T
    args: Array<any>
    assignedToWorker?: IWorkerInfo<T>
  }) {
    this.functionName = opts.functionName
    this.args = opts.args
    this.assignedToWorker = opts.assignedToWorker
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve
      this.reject = reject
    })
  }
}

/**
 * State tracked for each forked worker process.
 */
interface IWorkerInfo<T> {
  workerId: number
  worker: ChildProcess
  // resolves (never rejects) once the process emitted `exit`
  exitedPromise: Promise<{
    code: number | null
    signal: NodeJS.Signals | null
  }>
  // task currently executed by the worker, `undefined` when the worker is free
  currentTask?: TaskInfo<T>
}

/**
 * Worker pool is a class that allow you to queue function execution across multiple
 * child processes, in order to parallelize work. It accepts absolute path to worker module
 * and will expose exported function of that module as properties on WorkerPool instance.
 *
 * Worker pool allows queueing execution of a function on all workers (via `.all` property)
 * as well as distributing execution across workers (via `.single` property)
 */
export class WorkerPool<WorkerModuleExports = Record<string, unknown>> {
  /**
   * Schedule task execution on all workers. Useful for setting up workers
   */
  all: {
    [FunctionName in keyof WorkerModuleExports]: WrapReturnInArray<
      EnsureFunctionReturnsAPromise<WorkerModuleExports[FunctionName]>
    >
  }

  /**
   * Schedule task execution on single worker. Useful to distribute tasks between multiple workers.
   */
  single: {
    [FunctionName in keyof WorkerModuleExports]: EnsureFunctionReturnsAPromise<
      WorkerModuleExports[FunctionName]
    >
  }

  private workers: Array<IWorkerInfo<keyof WorkerModuleExports>> = []
  private taskQueue = new TaskQueue<TaskInfo<keyof WorkerModuleExports>>()
  private idleWorkers: Set<IWorkerInfo<keyof WorkerModuleExports>> = new Set()

  /**
   * @param workerPath Absolute path to the worker module. It is `require`d here to
   * discover exported functions and passed to each forked process via
   * `GATSBY_WORKER_MODULE_PATH`.
   * @param options `numWorkers` (defaults to 1) and additional `env` vars for workers.
   */
  constructor(workerPath: string, options?: IWorkerOptions) {
    const single: Partial<WorkerPool<WorkerModuleExports>["single"]> = {}
    const all: Partial<WorkerPool<WorkerModuleExports>["all"]> = {}

    {
      // we don't need to retain these
      const module: WorkerModuleExports = require(workerPath)
      const exportNames = Object.keys(module) as Array<
        keyof WorkerModuleExports
      >

      for (const exportName of exportNames) {
        if (typeof module[exportName] !== `function`) {
          // We only expose functions. Exposing other types
          // would require additional handling which doesn't seem
          // worth supporting given that consumers can just access
          // those via require/import instead of WorkerPool interface.
          continue
        }

        single[exportName] = this.scheduleWorkSingle.bind(
          this,
          exportName
        ) as WorkerPool<WorkerModuleExports>["single"][typeof exportName]

        all[exportName] = (this.scheduleWorkAll.bind(
          this,
          exportName
        ) as unknown) as WorkerPool<
          WorkerModuleExports
        >["all"][typeof exportName]
      }
    }

    this.single = single as WorkerPool<WorkerModuleExports>["single"]
    this.all = all as WorkerPool<WorkerModuleExports>["all"]

    for (let workerId = 1; workerId <= (options?.numWorkers ?? 1); workerId++) {
      const worker = fork(childWrapperPath, {
        cwd: process.cwd(),
        env: {
          ...process.env,
          ...(options?.env ?? {}),
          GATSBY_WORKER_ID: workerId.toString(),
          GATSBY_WORKER_MODULE_PATH: workerPath,
        },
        // Suppress --debug / --inspect flags while preserving others (like --harmony).
        execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)),
      })

      const workerInfo: IWorkerInfo<keyof WorkerModuleExports> = {
        workerId,
        worker,
        exitedPromise: new Promise(resolve => {
          worker.on(`exit`, (code, signal) => {
            const unfinishedTask = workerInfo.currentTask
            if (unfinishedTask) {
              // worker exited without finishing a task
              workerInfo.currentTask = undefined
              unfinishedTask.reject(
                new Error(`Worker exited before finishing task`)
              )
            }

            // tasks queued specifically for this worker (via `.all`) can never
            // be executed anymore, so fail them instead of leaving them pending
            for (const taskNode of this.taskQueue) {
              if (taskNode.value.assignedToWorker === workerInfo) {
                this.taskQueue.remove(taskNode)
                taskNode.value.reject(
                  new Error(`Worker exited before finishing task`)
                )
              }
            }

            // an exited worker must not be picked for new work
            this.idleWorkers.delete(workerInfo)

            // remove worker from list of workers (guarding against `-1`,
            // which would make `splice` remove the last worker instead)
            const workerIndex = this.workers.indexOf(workerInfo)
            if (workerIndex !== -1) {
              this.workers.splice(workerIndex, 1)
            }

            resolve({ code, signal })
          })
        }),
      }

      worker.on(`message`, (msg: ChildMessageUnion) => {
        if (msg[0] === RESULT) {
          if (!workerInfo.currentTask) {
            throw new Error(
              `Invariant: gatsby-worker received execution result, but it wasn't expecting it.`
            )
          }
          const task = workerInfo.currentTask
          workerInfo.currentTask = undefined
          this.checkForWork(workerInfo)
          task.resolve(msg[1])
        } else if (msg[0] === ERROR) {
          if (!workerInfo.currentTask) {
            throw new Error(
              `Invariant: gatsby-worker received execution rejection, but it wasn't expecting it.`
            )
          }

          let error = msg[4]

          if (error !== null && typeof error === `object`) {
            // rebuild an error instance from the parts sent by the child:
            // use a global constructor matching the reported name when there is one
            const extra = error

            const NativeCtor = global[msg[1]]
            const Ctor = typeof NativeCtor === `function` ? NativeCtor : Error

            error = new Ctor(msg[2])
            // @ts-ignore type doesn't exist on Error, but that's what jest-worker does for errors :shrug:
            error.type = msg[1]
            error.stack = msg[3]

            // copy over any additional own properties of the received error
            for (const key in extra) {
              if (Object.prototype.hasOwnProperty.call(extra, key)) {
                error[key] = extra[key]
              }
            }
          }

          const task = workerInfo.currentTask
          workerInfo.currentTask = undefined
          this.checkForWork(workerInfo)
          task.reject(error)
        }
      })

      this.workers.push(workerInfo)
      this.idleWorkers.add(workerInfo)
    }
  }

  /**
   * Kills worker processes and rejects any ongoing or pending tasks.
   * @returns Array of promises for each worker that will resolve (with the exit code) once worker did exit.
   */
  end(): Array<Promise<number | null>> {
    const results = this.workers.map(async workerInfo => {
      // tell worker to end gracefully; sending on an already closed IPC channel
      // fails, so only do it while the channel is still connected
      const endMessage: ParentMessageUnion = [END]

      if (workerInfo.worker.connected) {
        workerInfo.worker.send(endMessage)
      }

      // force exit if worker doesn't exit gracefully quickly
      const forceExitTimeout = setTimeout(() => {
        workerInfo.worker.kill(`SIGKILL`)
      }, 1000)

      const exitResult = await workerInfo.exitedPromise

      clearTimeout(forceExitTimeout)

      return exitResult.code
    })

    // make sure we fail queued tasks as well, and drop them from the queue so they
    // are not retained or rejected again. Removing while iterating is fine: a
    // removed node keeps its `.next` pointer.
    const rejectQueuedTasks = (): void => {
      for (const taskNode of this.taskQueue) {
        this.taskQueue.remove(taskNode)
        taskNode.value.reject(new Error(`Worker exited before finishing task`))
      }
    }

    // handle both outcomes so a failure while ending a worker neither leaves
    // queued tasks pending nor surfaces as an unhandled rejection here
    Promise.all(results).then(rejectQueuedTasks, rejectQueuedTasks)

    return results
  }

  /**
   * Gives the worker the first queued task it is allowed to run (unassigned, or
   * assigned to this very worker). Marks the worker as idle when there is none.
   */
  private checkForWork<T extends keyof WorkerModuleExports>(
    workerInfo: IWorkerInfo<T>
  ): void {
    // check if there is task in queue
    for (const taskNode of this.taskQueue) {
      const task = taskNode.value
      if (!task.assignedToWorker || task.assignedToWorker === workerInfo) {
        this.doWork(task, workerInfo)
        this.taskQueue.remove(taskNode)

        return
      }
    }

    // no task found, so just marking worker as idle
    this.idleWorkers.add(workerInfo)
  }

  /**
   * Marks the worker as busy with the task and sends the EXECUTE message to it.
   */
  private doWork<T extends keyof WorkerModuleExports>(
    taskInfo: TaskInfo<T>,
    workerInfo: IWorkerInfo<T>
  ): void {
    // block worker
    workerInfo.currentTask = taskInfo
    this.idleWorkers.delete(workerInfo)

    const msg: ParentMessageUnion = [
      EXECUTE,
      taskInfo.functionName,
      taskInfo.args,
    ]
    workerInfo.worker.send(msg)
  }

  /**
   * Starts the task right away when a suitable worker is idle, otherwise queues it.
   * @returns Promise settled with the outcome of the task.
   */
  private scheduleWork<T extends keyof WorkerModuleExports>(
    taskInfo: TaskInfo<T>
  ): Promise<unknown> {
    let workerToExecuteTaskNow:
      | IWorkerInfo<keyof WorkerModuleExports>
      | undefined

    if (taskInfo.assignedToWorker) {
      // task is pinned to a worker - it can only start now if that worker is idle
      if (this.idleWorkers.has(taskInfo.assignedToWorker)) {
        workerToExecuteTaskNow = taskInfo.assignedToWorker
      }
    } else {
      // any idle worker will do (`undefined` when there is none)
      workerToExecuteTaskNow = this.idleWorkers.values().next().value
    }

    if (workerToExecuteTaskNow) {
      this.doWork(taskInfo, workerToExecuteTaskNow)
    } else {
      this.taskQueue.enqueue(taskInfo)
    }

    return taskInfo.promise
  }

  /**
   * Backs `.single.<functionName>`: schedules one execution on any worker.
   */
  private scheduleWorkSingle<T extends keyof WorkerModuleExports>(
    functionName: T,
    ...args: Array<unknown>
  ): Promise<unknown> {
    return this.scheduleWork(new TaskInfo({ functionName, args }))
  }

  /**
   * Backs `.all.<functionName>`: schedules one execution on each worker.
   */
  private scheduleWorkAll<T extends keyof WorkerModuleExports>(
    functionName: T,
    ...args: Array<unknown>
  ): Array<Promise<unknown>> {
    return this.workers.map(workerInfo =>
      this.scheduleWork(
        new TaskInfo({ assignedToWorker: workerInfo, functionName, args })
      )
    )
  }
}
interface ITaskQueueNode<ValueType> {
  value: ValueType
  next?: ITaskQueueNode<ValueType>
  prev?: ITaskQueueNode<ValueType>
}

/**
 * Task queue implemented with doubly linked list
 */
export class TaskQueue<ValueType> {
  private head?: ITaskQueueNode<ValueType>
  private tail?: ITaskQueueNode<ValueType>;

  /**
   * Iterates over queue nodes (not values) from head to tail. The node is yielded
   * so it can be passed to `remove`. A node removed during iteration keeps its
   * `.next` pointer, so iteration continues with the node that followed it.
   */
  *[Symbol.iterator](): Iterator<ITaskQueueNode<ValueType>> {
    let currentHead = this.head
    while (currentHead) {
      yield currentHead
      currentHead = currentHead.next
    }
  }

  /**
   * Puts new task at the end of the list
   * @param task Task to add to the queue
   */
  enqueue(task: ValueType): void {
    const newNode: ITaskQueueNode<ValueType> = {
      value: task,
    }
    if (this.tail) {
      this.tail.next = newNode
      newNode.prev = this.tail
    } else {
      // empty queue - new node is both head and tail
      this.head = newNode
    }

    this.tail = newNode
  }

  /**
   * Remove a task node from the queue. The node has to be one that is currently
   * in this queue (as yielded by the iterator) and should be removed only once.
   * @param taskNode Queue's node to remove
   */
  remove(taskNode: ITaskQueueNode<ValueType>): void {
    if (taskNode === this.head) {
      this.head = taskNode.next
      if (this.head) {
        this.head.prev = undefined
      } else {
        // if we don't have head, we also don't have tail
        this.tail = undefined
      }
      return
    }

    if (taskNode === this.tail) {
      this.tail = taskNode.prev
    } else if (taskNode.next) {
      // if node is not the tail then it will have .next
      taskNode.next.prev = taskNode.prev
    }

    // if node is not the head then it will have .prev; guarded instead of
    // asserted so a node without neighbours doesn't cause a TypeError
    if (taskNode.prev) {
      taskNode.prev.next = taskNode.next
    }
  }
}
b/packages/gatsby-worker/src/types.ts new file mode 100644 index 0000000000000..9e74cc66f5638 --- /dev/null +++ b/packages/gatsby-worker/src/types.ts @@ -0,0 +1,30 @@ +export const EXECUTE = 0b01 +export const ERROR = 0b10 +export const RESULT = 0b11 +export const END = 0b00 + +type FunctionName = string | number | symbol +type FunctionArgs = Array + +type ExecuteMessage = [typeof EXECUTE, FunctionName, FunctionArgs] +type EndMessage = [typeof END] + +export type ParentMessageUnion = ExecuteMessage | EndMessage + +type ErrorType = string +type ErrorMessage = string +type ErrorStack = string + +type TaskError = [ + typeof ERROR, + ErrorType, + ErrorMessage, + ErrorStack | undefined, + Error +] + +type ResultType = unknown + +type TaskResult = [typeof RESULT, ResultType] + +export type ChildMessageUnion = TaskError | TaskResult diff --git a/packages/gatsby-worker/src/utils.ts b/packages/gatsby-worker/src/utils.ts new file mode 100644 index 0000000000000..ed97194b4d713 --- /dev/null +++ b/packages/gatsby-worker/src/utils.ts @@ -0,0 +1,4 @@ +export const isPromise = (obj: any): obj is PromiseLike => + !!obj && + (typeof obj === `object` || typeof obj === `function`) && + typeof obj.then === `function` diff --git a/packages/gatsby-worker/tsconfig.json b/packages/gatsby-worker/tsconfig.json new file mode 100644 index 0000000000000..366c3b052bed0 --- /dev/null +++ b/packages/gatsby-worker/tsconfig.json @@ -0,0 +1,5 @@ +{ + "extends": "../../tsconfig.json", + "include": ["src"], + "exclude": ["**/__tests__/**/*"] +} diff --git a/packages/gatsby/package.json b/packages/gatsby/package.json index beb18bd3955c7..71b8c0a02683c 100644 --- a/packages/gatsby/package.json +++ b/packages/gatsby/package.json @@ -85,6 +85,7 @@ "gatsby-plugin-utils": "^1.9.0-next.1", "gatsby-react-router-scroll": "^4.9.0-next.0", "gatsby-telemetry": "^2.9.0-next.1", + "gatsby-worker": "0.0.0-next.0", "glob": "^7.1.6", "got": "8.3.2", "graphql": "^15.4.0", @@ -95,7 +96,6 @@ "invariant": "^2.2.4", 
"is-relative": "^1.0.0", "is-relative-url": "^3.0.0", - "jest-worker": "^24.9.0", "joi": "^17.2.1", "json-loader": "^0.5.7", "json-stringify-safe": "^5.0.1", diff --git a/packages/gatsby/src/commands/build-html.ts b/packages/gatsby/src/commands/build-html.ts index 23eee1b430b2b..d69852f170e99 100644 --- a/packages/gatsby/src/commands/build-html.ts +++ b/packages/gatsby/src/commands/build-html.ts @@ -213,8 +213,8 @@ const renderHTMLQueue = async ( const renderHTML = stage === `build-html` - ? workerPool.renderHTMLProd - : workerPool.renderHTMLDev + ? workerPool.single.renderHTMLProd + : workerPool.single.renderHTMLDev const uniqueUnsafeBuiltinUsedStacks = new Set() diff --git a/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts b/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts index 6fd37fa77db4c..cd54843040fd7 100644 --- a/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts +++ b/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts @@ -9,6 +9,9 @@ import { emitter, replaceReducer } from "../../redux" const rootDbFile = process.env.NODE_ENV === `test` ? `test-datastore-${ + // FORCE_TEST_DATABASE_ID will be set if this gets executed in worker context + // when running jest tests. JEST_WORKER_ID will be set when this gets executed directly + // in test context (jest will use jest-worker internally). process.env.FORCE_TEST_DATABASE_ID ?? 
process.env.JEST_WORKER_ID }` : `datastore` diff --git a/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js b/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js index d9d36e65a7c9d..3b95056c02620 100644 --- a/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js +++ b/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js @@ -115,5 +115,3 @@ exports.renderHTML = ({ exports.deleteModuleCache = htmlComponentRendererPath => { delete require.cache[require.resolve(htmlComponentRendererPath)] } - -exports.warmup = () => `warmed` diff --git a/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts b/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts index 17e31cfc9ee67..53e182d430cd2 100644 --- a/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts +++ b/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts @@ -1,4 +1,4 @@ -import JestWorker from "jest-worker" +import { WorkerPool } from "gatsby-worker" import fs from "fs-extra" import nodePath from "path" import report from "gatsby-cli/lib/reporter" @@ -11,30 +11,47 @@ import { getDevSSRWebpack } from "../../commands/build-html" import { emitter, GatsbyReduxStore } from "../../redux" import { IGatsbyPage } from "../../redux/types" -const startWorker = (): JestWorker => { - const newWorker = new JestWorker(require.resolve(`./render-dev-html-child`), { - exposedMethods: [`renderHTML`, `deleteModuleCache`, `warmup`], - numWorkers: 1, - forkOptions: { - silent: false, +interface IErrorRenderMeta { + codeFrame: string + source: string + line: number + column: number + sourceMessage?: string + stack?: string +} + +// TODO: convert `render-dev-html-child.js` to TS and use `typeof import("./render-dev-html-child")` +// instead of defining interface here +interface IRenderDevHtmlChild { + renderHTML: (arg: { + path: string + componentPath: string + htmlComponentRendererPath: string + publicDir: string + isClientOnlyPage?: boolean + error?: IErrorRenderMeta + directory?: string + }) => Promise + 
deleteModuleCache: (htmlComponentRendererPath: string) => void +} + +const startWorker = (): WorkerPool => { + const newWorker = new WorkerPool( + require.resolve(`./render-dev-html-child`), + { + numWorkers: 1, env: { - ...process.env, NODE_ENV: isCI() ? `production` : `development`, forceColors: `true`, GATSBY_EXPERIMENTAL_DEV_SSR: `true`, }, - }, - }) - - // jest-worker is lazy with forking but we want to fork immediately so the user - // doesn't have to wait. - // @ts-ignore - newWorker.warmup() + } + ) return newWorker } -let worker +let worker: WorkerPool export const initDevWorkerPool = (): void => { worker = startWorker() } @@ -53,7 +70,7 @@ export const restartWorker = (htmlComponentRendererPath: string): void => { oldWorker.end() changeCount = 0 } else { - worker.deleteModuleCache(htmlComponentRendererPath) + worker.all.deleteModuleCache(htmlComponentRendererPath) } } @@ -136,14 +153,7 @@ interface IRenderDevHtmlProps { page: IGatsbyPage skipSsr?: boolean store: GatsbyReduxStore - error?: { - codeFrame: string - source: string - line: number - column: number - sourceMessage?: string - stack?: string - } + error?: IErrorRenderMeta htmlComponentRendererPath: string directory: string } @@ -243,7 +253,7 @@ export const renderDevHTML = ({ const publicDir = nodePath.join(directory, `public`) try { - const htmlString = await worker.renderHTML({ + const htmlString = await worker.single.renderHTML({ path, componentPath: pageObj.component, htmlComponentRendererPath, diff --git a/packages/gatsby/src/utils/worker/__tests__/config.ts b/packages/gatsby/src/utils/worker/__tests__/config.ts index c21de65b8af35..e2b601481f7cc 100644 --- a/packages/gatsby/src/utils/worker/__tests__/config.ts +++ b/packages/gatsby/src/utils/worker/__tests__/config.ts @@ -21,11 +21,15 @@ it(`can load config and execute node API in worker`, async () => { const siteDirectory = path.join(__dirname, `fixtures`, `sample-site`) // plugin options for custom local plugin contains function (() => 
`foo`) - await worker.loadConfigAndPlugins({ siteDirectory }) + await Promise.all( + worker.all.loadConfigAndPlugins({ + siteDirectory, + }) + ) // plugin API execute function from plugin options and store result in `global` - await worker.runAPI(`createSchemaCustomization`) + await Promise.all(worker.all.runAPI(`createSchemaCustomization`)) // getting result stored in `global` - expect(await worker.getAPIRunResult()).toEqual(`foo`) + expect(await worker.single.getAPIRunResult()).toEqual(`foo`) }) diff --git a/packages/gatsby/src/utils/worker/__tests__/datastore.ts b/packages/gatsby/src/utils/worker/__tests__/datastore.ts index a1b168edc50e5..246733e376a89 100644 --- a/packages/gatsby/src/utils/worker/__tests__/datastore.ts +++ b/packages/gatsby/src/utils/worker/__tests__/datastore.ts @@ -26,7 +26,7 @@ itWhenLMDB(`worker can access node created in main process`, async () => { const testNodeId = `shared-node` expect(getDataStore().getNode(testNodeId)).toBeFalsy() - expect(await worker.getNodeFromWorker(testNodeId)).toBeFalsy() + expect(await worker.single.getNodeFromWorker(testNodeId)).toBeFalsy() const node = { id: testNodeId, @@ -39,7 +39,9 @@ itWhenLMDB(`worker can access node created in main process`, async () => { await getDataStore().ready() const nodeStoredInMainProcess = getDataStore().getNode(testNodeId) - const nodeStoredInWorkerProcess = await worker.getNodeFromWorker(testNodeId) + const nodeStoredInWorkerProcess = await worker.single.getNodeFromWorker( + testNodeId + ) expect(nodeStoredInWorkerProcess).toMatchInlineSnapshot(` Object { diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index 2ee929e797b71..d2455e50153b2 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -111,7 +111,7 @@ describeWhenLMDB(`worker (queries)`, () => { const siteDirectory = path.join(__dirname, `fixtures`, `sample-site`) await 
loadConfigAndPlugins({ siteDirectory }) - await worker.loadConfigAndPlugins({ siteDirectory }) + await Promise.all(worker.all.loadConfigAndPlugins({ siteDirectory })) await sourceNodesAndRemoveStaleNodes({ webhookBody: {} }) await getDataStore().ready() @@ -162,8 +162,8 @@ describeWhenLMDB(`worker (queries)`, () => { saveStateForWorkers([`components`, `staticQueryComponents`]) - await worker.buildSchema() - await worker.runQueries(queryIdsSmall) + await Promise.all(worker.all.buildSchema()) + await worker.single.runQueries(queryIdsSmall) }) afterAll(() => { @@ -178,7 +178,7 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute static queries`, async () => { if (!worker) fail(`worker not defined`) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() const staticQueryResult = await fs.readJson( `${stateFromWorker.program.directory}/public/page-data/sq/d/${dummyStaticQuery.hash}.json` @@ -195,7 +195,7 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries`, async () => { if (!worker) fail(`worker not defined`) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( `${stateFromWorker.program.directory}/.cache/json/_foo.json` @@ -210,7 +210,7 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries with context variables`, async () => { if (!worker) fail(`worker not defined`) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( `${stateFromWorker.program.directory}/.cache/json/_bar.json` @@ -227,11 +227,11 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should chunk work in runQueriesInWorkersQueue`, async () => { if (!worker) fail(`worker not defined`) - const spy = jest.spyOn(worker, `runQueries`) + const spy = jest.spyOn(worker.single, `runQueries`) // 
@ts-ignore - worker is defined await runQueriesInWorkersQueue(worker, queryIdsBig, 10) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() // Called the complete ABC so we can test _a const pageQueryResultA = await fs.readJson( diff --git a/packages/gatsby/src/utils/worker/__tests__/reporter.ts b/packages/gatsby/src/utils/worker/__tests__/reporter.ts index 1cd54bd4fe085..d13c542191f50 100644 --- a/packages/gatsby/src/utils/worker/__tests__/reporter.ts +++ b/packages/gatsby/src/utils/worker/__tests__/reporter.ts @@ -14,7 +14,7 @@ it(`worker can use reporter without crashing`, async () => { worker = createTestWorker() try { - const result = await worker.log(`log`) + const result = await worker.single.log(`log`) expect(result).toEqual(true) } catch (e) { expect(e).toBeFalsy() diff --git a/packages/gatsby/src/utils/worker/__tests__/schema.ts b/packages/gatsby/src/utils/worker/__tests__/schema.ts index 8d3c6c796c7dd..01392a2acb97d 100644 --- a/packages/gatsby/src/utils/worker/__tests__/schema.ts +++ b/packages/gatsby/src/utils/worker/__tests__/schema.ts @@ -49,15 +49,15 @@ describeWhenLMDB(`worker (schema)`, () => { const siteDirectory = path.join(__dirname, `fixtures`, `sample-site`) await loadConfigAndPlugins({ siteDirectory }) - await worker.loadConfigAndPlugins({ siteDirectory }) + await Promise.all(worker.all.loadConfigAndPlugins({ siteDirectory })) await sourceNodesAndRemoveStaleNodes({ webhookBody: {} }) await getDataStore().ready() await build({ parentSpan: {} }) saveStateForWorkers([`inferenceMetadata`]) - await worker.buildSchema() + await Promise.all(worker.all.buildSchema()) - stateFromWorker = await worker.getState() + stateFromWorker = await worker.single.getState() }) afterAll(() => { @@ -130,7 +130,7 @@ describeWhenLMDB(`worker (schema)`, () => { it(`should have resolverField from createResolvers`, async () => { // @ts-ignore - it exists - const { data } = await worker?.getRunQueryResult(` + const { 
data } = await worker?.single.getRunQueryResult(` { one: nodeTypeOne { number @@ -151,7 +151,7 @@ describeWhenLMDB(`worker (schema)`, () => { it(`should have fieldsOnGraphQL from setFieldsOnGraphQLNodeType`, async () => { // @ts-ignore - it exists - const { data } = await worker?.getRunQueryResult(` + const { data } = await worker?.single.getRunQueryResult(` { four: nodeTypeOne { fieldsOnGraphQL diff --git a/packages/gatsby/src/utils/worker/__tests__/share-state.ts b/packages/gatsby/src/utils/worker/__tests__/share-state.ts index 54fb6b38ffa71..42dc28c4bbe82 100644 --- a/packages/gatsby/src/utils/worker/__tests__/share-state.ts +++ b/packages/gatsby/src/utils/worker/__tests__/share-state.ts @@ -48,7 +48,7 @@ describe(`worker (share-state)`, () => { worker = createTestWorker() - const result = await worker.getPage(dummyPagePayload.path) + const result = await worker.single.getPage(dummyPagePayload.path) expect(result).toBe(null) }) @@ -208,10 +208,12 @@ describe(`worker (share-state)`, () => { saveStateForWorkers([`components`, `staticQueryComponents`]) - await worker.setQueries() + await Promise.all(worker.all.setQueries()) - const components = await worker.getComponent(dummyPagePayload.component) - const staticQueryComponents = await worker.getStaticQueryComponent( + const components = await worker.single.getComponent( + dummyPagePayload.component + ) + const staticQueryComponents = await worker.single.getStaticQueryComponent( staticQueryID ) @@ -256,9 +258,9 @@ describe(`worker (share-state)`, () => { saveStateForWorkers([`inferenceMetadata`]) - await worker.setInferenceMetadata() + await Promise.all(worker.all.setInferenceMetadata()) - const inf = await worker.getInferenceMetadata(`Test`) + const inf = await worker.single.getInferenceMetadata(`Test`) expect(inf).toMatchInlineSnapshot(` Object { diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts b/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts 
index 867b63ef9ba47..d0d86ecb7f3f4 100644 --- a/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts +++ b/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts @@ -1,36 +1,23 @@ -import Worker from "jest-worker" -import type { CreateWorkerPoolType } from "../../types" +import { WorkerPool } from "gatsby-worker" -export type GatsbyTestWorkerPool = CreateWorkerPoolType< +export type GatsbyTestWorkerPool = WorkerPool< typeof import("./child-for-tests") > export function createTestWorker(): GatsbyTestWorkerPool { - // all child processes of this worker pool would have JEST_WORKER_ID set to 1 - // but running jest tests would create processes with possibly other IDs - // this will let child processes use same database ID as parent process (one that executes test) - process.env.FORCE_TEST_DATABASE_ID = process.env.JEST_WORKER_ID - process.env.GATSBY_WORKER_POOL_WORKER = `true` - - // deleting process.env.JEST_WORKER_ID is a workaround for `@babel/register` used in `./wrapper-for-tests` - // creating new Worker without explicitly defining `exposedMethods` result in requiring the module - // we need a way to recognize when require module is executed in context of process running test - // versus process(es) spawned for worker. 
jest-worker actually doesn't have a way to pass custom env vars - // to worker process without overriding JEST_WORKER_ID (yikes), but it does reuse current process.env - // so we temporarily unset JEST_WORKER_ID, so that module loaded in the same process as this code is executed - // doesn't use `@babel/register` and we rely on `jest-worker` auto-assigning JEST_WORKER_ID for child processes - const tmpJestWorkerId = process.env.JEST_WORKER_ID - delete process.env.JEST_WORKER_ID - - const worker = new Worker(require.resolve(`./wrapper-for-tests`), { - numWorkers: 1, - forkOptions: { - silent: false, - }, - maxRetries: 1, - }) as GatsbyTestWorkerPool - delete process.env.GATSBY_WORKER_POOL_WORKER - process.env.JEST_WORKER_ID = tmpJestWorkerId + const worker = new WorkerPool( + require.resolve(`./child-for-tests`), + { + numWorkers: 1, + env: { + // We are using JEST_WORKER_ID env so that worker use same test database as + // jest runner process + FORCE_TEST_DATABASE_ID: process.env.JEST_WORKER_ID, + GATSBY_WORKER_POOL_WORKER: `true`, + NODE_OPTIONS: `--require ${require.resolve(`./ts-register`)}`, + }, + } + ) as GatsbyTestWorkerPool return worker } diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js b/packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js new file mode 100644 index 0000000000000..26ff716b30bc2 --- /dev/null +++ b/packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js @@ -0,0 +1,6 @@ +// spawned process won't use jest config to support TS, so we need to add support ourselves +require(`@babel/register`)({ + extensions: [`.js`, `.ts`], + configFile: require.resolve(`../../../../../babel.config.js`), + ignore: [/node_modules/], +}) diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js b/packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js deleted file mode 100644 index 5dd264f17609b..0000000000000 --- 
a/packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js +++ /dev/null @@ -1,13 +0,0 @@ -// this IS executed in process that spawn worker to analyze exports and create functions for them on worker pool object -// so we want to use `@babel/register` ONLY inside actual worker as otherwise it does mess with jest transformation and tools like `rewire` -// see `./create-test-worker.ts` file for explanation why this env var is used. -if (process.env.JEST_WORKER_ID) { - // spawned process won't use jest config to support TS, so we need to add support ourselves - require(`@babel/register`)({ - extensions: [`.js`, `.ts`], - configFile: require.resolve(`../../../../../babel.config.js`), - ignore: [/node_modules/], - }) -} - -module.exports = require(`./child-for-tests`) diff --git a/packages/gatsby/src/utils/worker/pool.ts b/packages/gatsby/src/utils/worker/pool.ts index 82823b279d092..de9cb6c03abd7 100644 --- a/packages/gatsby/src/utils/worker/pool.ts +++ b/packages/gatsby/src/utils/worker/pool.ts @@ -1,22 +1,23 @@ -import Worker from "jest-worker" +import { WorkerPool } from "gatsby-worker" import { chunk } from "lodash" import reporter from "gatsby-cli/lib/reporter" import { cpuCoreCount } from "gatsby-core-utils" -import type { CreateWorkerPoolType } from "./types" import { IGroupedQueryIds } from "../../services" -export type GatsbyWorkerPool = CreateWorkerPoolType +export type GatsbyWorkerPool = WorkerPool export const create = (): GatsbyWorkerPool => { - process.env.GATSBY_WORKER_POOL_WORKER = `true` - const worker = new Worker(require.resolve(`./child`), { - numWorkers: Math.max(1, cpuCoreCount() - 1), - forkOptions: { - silent: false, - }, - }) as GatsbyWorkerPool - delete process.env.GATSBY_WORKER_POOL_WORKER + const worker = new WorkerPool( + require.resolve(`./child`), + { + numWorkers: Math.max(1, cpuCoreCount() - 1), + env: { + GATSBY_WORKER_POOL_WORKER: `true`, + }, + } + ) + return worker } @@ -38,7 +39,7 @@ export async function 
runQueriesInWorkersQueue( for (const segment of staticQuerySegments) { promises.push( - pool + pool.single .runQueries({ pageQueryIds: [], staticQueryIds: segment }) .then(() => { activity.tick(segment.length) @@ -48,7 +49,7 @@ export async function runQueriesInWorkersQueue( for (const segment of pageQuerySegments) { promises.push( - pool + pool.single .runQueries({ pageQueryIds: segment, staticQueryIds: [] }) .then(() => { activity.tick(segment.length) diff --git a/packages/gatsby/src/utils/worker/types.ts b/packages/gatsby/src/utils/worker/types.ts deleted file mode 100644 index 29f1827eea9b1..0000000000000 --- a/packages/gatsby/src/utils/worker/types.ts +++ /dev/null @@ -1,24 +0,0 @@ -import type Worker from "jest-worker" - -type WrapReturnOfAFunctionInAPromise< - FunctionThatDoesNotReturnAPromise extends (...args: Array) => any -> = ( - ...a: Parameters -) => Promise> - -// jest-worker will make sync function async, so to keep proper types we need to adjust types so all functions -// on worker pool are async -type EnsureFunctionReturnsAPromise = MaybeFunction extends ( - ...args: Array -) => Promise - ? MaybeFunction - : MaybeFunction extends (...args: Array) => any - ? 
WrapReturnOfAFunctionInAPromise - : never - -export type CreateWorkerPoolType = Worker & - { - [FunctionName in keyof ExposedFunctions]: EnsureFunctionReturnsAPromise< - ExposedFunctions[FunctionName] - > - } diff --git a/renovate.json5 b/renovate.json5 index 0cf6bd41aecd5..1c3d65808c9b5 100644 --- a/renovate.json5 +++ b/renovate.json5 @@ -22700,6 +22700,208 @@ "commitMessageSuffix": "{{#unless groupName}} for gatsby-transformer-yaml{{/unless}}", "dependencyDashboardApproval": true }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "devDependencies" + ], + "matchUpdateTypes": [ + "patch", + "minor" + ], + "groupName": "[DEV] minor and patch dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-dev-minor", + "automerge": true, + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "devDependencies" + ], + "matchUpdateTypes": [ + "major" + ], + "groupName": "[DEV] major dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-dev-major", + "automerge": true, + "dependencyDashboardApproval": false, + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + 
"matchUpdateTypes": [ + "patch", + "minor" + ], + "groupName": "minor and patch dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-minor", + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + "matchUpdateTypes": [ + "major" + ], + "groupName": "major dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-major", + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}", + "dependencyDashboardApproval": true + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + "groupName": "minor and patch dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-minor", + "matchPackageNames": [], + "matchUpdateTypes": [ + "patch" + ], + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + 
"groupName": "major dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-major", + "matchPackageNames": [], + "matchUpdateTypes": [ + "major", + "minor" + ], + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}", + "dependencyDashboardApproval": true + }, { "matchPaths": [ "packages/gatsby/package.json"