From bc67f4f8bc48fbb975435bd6d21cb38144bfc702 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Fri, 25 Jun 2021 14:01:54 +0200 Subject: [PATCH 01/13] feat: create gatsby-worker package --- packages/gatsby-worker/.babelrc | 3 + packages/gatsby-worker/.gitignore | 2 + packages/gatsby-worker/.npmignore | 36 ++ packages/gatsby-worker/README.md | 112 ++++++ packages/gatsby-worker/package.json | 41 +++ .../src/__tests__/fixtures/test-child.ts | 34 ++ .../src/__tests__/fixtures/ts-register.js | 6 + .../src/__tests__/integration.ts | 285 ++++++++++++++++ .../gatsby-worker/src/__tests__/task-queue.ts | 201 +++++++++++ packages/gatsby-worker/src/child.ts | 66 ++++ packages/gatsby-worker/src/index.ts | 322 ++++++++++++++++++ packages/gatsby-worker/src/task-queue.ts | 49 +++ packages/gatsby-worker/src/types.ts | 30 ++ packages/gatsby-worker/src/utils.ts | 4 + packages/gatsby-worker/tsconfig.json | 8 + renovate.json5 | 202 +++++++++++ 16 files changed, 1401 insertions(+) create mode 100644 packages/gatsby-worker/.babelrc create mode 100644 packages/gatsby-worker/.gitignore create mode 100644 packages/gatsby-worker/.npmignore create mode 100644 packages/gatsby-worker/README.md create mode 100644 packages/gatsby-worker/package.json create mode 100644 packages/gatsby-worker/src/__tests__/fixtures/test-child.ts create mode 100644 packages/gatsby-worker/src/__tests__/fixtures/ts-register.js create mode 100644 packages/gatsby-worker/src/__tests__/integration.ts create mode 100644 packages/gatsby-worker/src/__tests__/task-queue.ts create mode 100644 packages/gatsby-worker/src/child.ts create mode 100644 packages/gatsby-worker/src/index.ts create mode 100644 packages/gatsby-worker/src/task-queue.ts create mode 100644 packages/gatsby-worker/src/types.ts create mode 100644 packages/gatsby-worker/src/utils.ts create mode 100644 packages/gatsby-worker/tsconfig.json diff --git a/packages/gatsby-worker/.babelrc b/packages/gatsby-worker/.babelrc new file mode 100644 index 
0000000000000..ac0ad292bb087 --- /dev/null +++ b/packages/gatsby-worker/.babelrc @@ -0,0 +1,3 @@ +{ + "presets": [["babel-preset-gatsby-package"]] +} diff --git a/packages/gatsby-worker/.gitignore b/packages/gatsby-worker/.gitignore new file mode 100644 index 0000000000000..b2d59d1f7578b --- /dev/null +++ b/packages/gatsby-worker/.gitignore @@ -0,0 +1,2 @@ +/node_modules +/dist \ No newline at end of file diff --git a/packages/gatsby-worker/.npmignore b/packages/gatsby-worker/.npmignore new file mode 100644 index 0000000000000..d0a59d6ec3da3 --- /dev/null +++ b/packages/gatsby-worker/.npmignore @@ -0,0 +1,36 @@ +# Logs +logs +*.log + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules +*.un~ +yarn.lock +src +flow-typed +coverage +decls +examples +.babelrc +tsconfig.json \ No newline at end of file diff --git a/packages/gatsby-worker/README.md b/packages/gatsby-worker/README.md new file mode 100644 index 0000000000000..0a00d08823d54 --- /dev/null +++ b/packages/gatsby-worker/README.md @@ -0,0 +1,112 @@ +# gatsby-worker + +Utility to execute tasks in forked processes. Highly inspired by [`jest-worker`](https://www.npmjs.com/package/jest-worker). 
+ +## Example + +File `worker.ts`: + +```ts +export async function heavyTask(param: string): Promise<string> { + // using workers is ideal for CPU intensive tasks + return await heavyProcessing(param) +} + +export async function setupStep(param: string): Promise<void> { + await heavySetup(param) +} +``` + +File `parent.ts`: + +```ts +import { WorkerPool } from "gatsby-worker" + +const workerPool = new WorkerPool<typeof import("./worker")>( + require.resolve(`./worker`), + { + numWorkers: 5, + env: { + CUSTOM_ENV_VAR_TO_SET_IN_WORKER: `foo`, + }, + } +) + +// queue a task on all workers +const arrayOfPromises = workerPool.all.setupStep(`bar`) + +// queue a task on single worker +const singlePromise = workerPool.single.heavyTask(`baz`) +``` + +## API + +### Constructor + +```ts +// TypeOfWorkerModule allows you to type exposed functions, ensuring type safety. +// It will convert sync methods to async and discard/disallow usage of exports +// that are not functions. Recommended to use with `<typeof import("path_to_worker_module")>`. +const workerPool = new WorkerPool<TypeOfWorkerModule>( + // Absolute path to worker module. Recommended to use with `require.resolve` + workerPath: string, + // Not required options + options?: { + // Number of workers to spawn. Defaults to `1` if not defined. + numWorkers?: number + // Additional env vars to set in worker. Worker will inherit env vars of parent process + // as well as additional `GATSBY_WORKER_ID` env var (starting with "1" for first worker) + env?: Record<string, string> + } +) +``` + +### `.single` + +```ts +// Exports of the worker module become available under `.single` property of `WorkerPool` instance. +// Calling those will either start executing immediately if there are any idle workers or queue them +// to be executed once a worker becomes idle. +const singlePromise = workerPool.single.heavyTask(`baz`) +``` + +### `.all` + +```ts +// Exports of the worker module become available under `.all` property of `WorkerPool` instance. +// Calling those will ensure a function is executed on all workers.
Best usage for this is performing +// setup/bootstrap of workers. +const arrayOfPromises = workerPool.all.setupStep(`baz`) +``` + +### `.end` + +```ts +// Used to shut down `WorkerPool`. If there are any in-progress or queued tasks, promises for those will be rejected as they won't be able to complete. +const arrayOfPromises = workerPool.end() +``` + +## Usage with unit tests + +If you are working with source files that need transpilation, you will need to make it possible to load untranspiled modules in child processes. +This can be done with `@babel/register` (or similar depending on your build toolchain). Example setup: + +```ts +const testWorkerPool = new WorkerPool(workerModule, { + numWorkers, + env: { + NODE_OPTIONS: `--require ${require.resolve(`./ts-register`)}`, + }, +}) +``` + +This will load an additional module in each child process before the worker module runs, allowing you to add runtime support for new JavaScript syntax or for TypeScript. Example `ts-register.js`: + +```js +// spawned process won't use jest config (or other testing framework equivalent) to support TS, so we need to add support ourselves +require(`@babel/register`)({ + extensions: [`.js`, `.ts`], + configFile: require.resolve(relativePathToYourBabelConfig), + ignore: [/node_modules/], +}) +``` diff --git a/packages/gatsby-worker/package.json b/packages/gatsby-worker/package.json new file mode 100644 index 0000000000000..5d025f1734774 --- /dev/null +++ b/packages/gatsby-worker/package.json @@ -0,0 +1,41 @@ +{ + "name": "gatsby-worker", + "description": "Utility to create worker pools", + "version": "0.0.0-next.0", + "author": "Michal Piechowiak", + "bugs": { + "url": "https://github.com/gatsbyjs/gatsby/issues" + }, + "dependencies": { + "@babel/core": "^7.14.0" + }, + "devDependencies": { + "@babel/cli": "^7.14.0", + "@babel/register": "^7.14.0", + "babel-preset-gatsby-package": "^1.9.0-next.0", + "cross-env": "^7.0.3", + "rimraf": "^3.0.2", + "typescript": "^4.1.5" + }, + "homepage":
"https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-worker#readme", + "keywords": [ + "gatsby", + "worker" + ], + "license": "MIT", + "main": "dist/index.js", + "repository": { + "type": "git", + "url": "https://github.com/gatsbyjs/gatsby.git", + "directory": "packages/gatsby-worker" + }, + "scripts": { + "build": "babel src --out-dir dist/ --ignore \"**/__tests__\" --extensions \".ts,.js\"", + "prepare": "cross-env NODE_ENV=production npm run build && npm run typegen", + "watch": "babel -w src --out-dir dist/ --ignore \"**/__tests__\" --extensions \".ts,.js\"", + "typegen": "rimraf \"dist/**/*.d.ts\" && tsc --emitDeclarationOnly --declaration --declarationDir dist/" + }, + "engines": { + "node": ">=12.13.0" + } +} diff --git a/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts new file mode 100644 index 0000000000000..0b3c5219b04af --- /dev/null +++ b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts @@ -0,0 +1,34 @@ +export function sync(a: string, opts?: { addWorkerId?: boolean }): string { + return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` +} + +export async function async(a: string, opts?: { addWorkerId?: boolean }): Promise { + return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` +} + +export function neverEnding(): Promise { + return new Promise(() => {}) +} + +export const notAFunction = `string` + +export function syncThrow(a: string, opts?: { addWorkerId?: boolean, throwOnWorker?: number }): string { + if (!opts?.throwOnWorker || opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID) { + throw new Error(`sync throw${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`) + } + + return `foo ${a}${opts?.addWorkerId ? 
` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` +} + +export async function asyncThrow(a: string, opts?: { addWorkerId?: boolean, throwOnWorker?: number }): Promise { + if (!opts?.throwOnWorker || opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID) { + throw new Error(`async throw${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`) + } + + return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` +} + +// used in task queue as previous functions would be too often too fast +export async function async100ms(taskId: number, opts?: { addWorkerId?: boolean }): Promise<{taskId: number, workerId: string}> { + return new Promise(resolve => setTimeout(resolve, 100, {taskId, workerId: opts?.addWorkerId ? process.env.GATSBY_WORKER_ID : undefined})) +} \ No newline at end of file diff --git a/packages/gatsby-worker/src/__tests__/fixtures/ts-register.js b/packages/gatsby-worker/src/__tests__/fixtures/ts-register.js new file mode 100644 index 0000000000000..e3cc4efe14ad6 --- /dev/null +++ b/packages/gatsby-worker/src/__tests__/fixtures/ts-register.js @@ -0,0 +1,6 @@ +// spawned process won't use jest config to support TS, so we need to add support ourselves +require(`@babel/register`)({ + extensions: [`.js`, `.ts`], + configFile: require.resolve(`../../../.babelrc`), + ignore: [/node_modules/], +}) diff --git a/packages/gatsby-worker/src/__tests__/integration.ts b/packages/gatsby-worker/src/__tests__/integration.ts new file mode 100644 index 0000000000000..4cdf990f96cbf --- /dev/null +++ b/packages/gatsby-worker/src/__tests__/integration.ts @@ -0,0 +1,285 @@ +import "jest-extended" +import { WorkerPool } from "../" +import { isPromise } from "../utils" + +describe(`gatsby-worker`, () => { + let workerPool: WorkerPool | undefined + const numWorkers = 2 + + async function endWorkerPool(): Promise { + if (workerPool) { + await Promise.all(workerPool.end()) + workerPool = undefined + } + } + + beforeEach(() => 
{ + workerPool = new WorkerPool( + require.resolve(`./fixtures/test-child`), + { + numWorkers, + env: { + NODE_OPTIONS: `--require ${require.resolve( + `./fixtures/ts-register` + )}`, + }, + } + ) + }) + + afterEach(endWorkerPool) + + it(`discovers exported functions`, () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const exposedMethodsSingle = Object.keys(workerPool.single) + const exposedMethodsAll = Object.keys(workerPool.all) + // we expect that `notAFunction` even tho is exported in child module is not exposed + // as it's not a function + expect(exposedMethodsSingle).toMatchInlineSnapshot(` + Array [ + "sync", + "async", + "neverEnding", + "syncThrow", + "asyncThrow", + "async100ms", + ] + `) + // .all and .single should have same methods + expect(exposedMethodsSingle).toEqual(exposedMethodsAll) + }) + + describe(`.single`, () => { + it(`exported sync function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.single.sync(`.single sync`) + // turns sync function into async + expect(isPromise(returnValue)).toEqual(true) + + const resolvedValue = await returnValue + expect(resolvedValue).toMatchInlineSnapshot(`"foo .single sync"`) + }) + + it(`exported async function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.single.async(`.single async`) + // promise is preserved + expect(isPromise(returnValue)).toEqual(true) + + const resolvedValue = await returnValue + expect(resolvedValue).toMatchInlineSnapshot(`"foo .single async"`) + }) + + it(`exported sync function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(2) + + const returnValue = workerPool.single.syncThrow(`.single syncThrow`) + // turns sync function into async + expect(isPromise(returnValue)).toEqual(true) + + try { + await returnValue + fail(`promise should reject`) + } catch (e) { + 
expect(e).toMatchInlineSnapshot(`[Error: sync throw]`) + } + }) + + it(`exported async function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(2) + + const returnValue = workerPool.single.asyncThrow(`.single asyncThrow`) + // promise is preserved + expect(isPromise(returnValue)).toEqual(true) + + try { + await returnValue + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: async throw]`) + } + }) + }) + + describe(`.all`, () => { + it(`exported sync function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.all.sync(`.single sync`, { + addWorkerId: true, + }) + + expect(returnValue).toBeArrayOfSize(numWorkers) + // turns sync function into async + expect(returnValue).toSatisfyAll(isPromise) + + const resolvedValue = await Promise.all(returnValue) + expect(resolvedValue).toMatchInlineSnapshot(` + Array [ + "foo .single sync (worker #1)", + "foo .single sync (worker #2)", + ] + `) + }) + + it(`exported async function`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + const returnValue = workerPool.all.async(`.single async`, { + addWorkerId: true, + }) + + expect(returnValue).toBeArrayOfSize(numWorkers) + // promise is preserved + expect(returnValue).toSatisfyAll(isPromise) + + const resolvedValue = await Promise.all(returnValue) + expect(resolvedValue).toMatchInlineSnapshot(` + Array [ + "foo .single async (worker #1)", + "foo .single async (worker #2)", + ] + `) + }) + + it(`exported sync function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(3) + + const returnValue = workerPool.all.syncThrow(`.single syncThrow`, { + addWorkerId: true, + throwOnWorker: 2, + }) + expect(returnValue).toBeArrayOfSize(numWorkers) + // turns sync function into async + expect(returnValue).toSatisfyAll(isPromise) + + try { + await 
Promise.all(returnValue) + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: sync throw (worker #2)]`) + } + }) + + it(`exported async function that throws`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(3) + + const returnValue = workerPool.all.asyncThrow(`.single asyncThrow`, { + addWorkerId: true, + throwOnWorker: 2, + }) + expect(returnValue).toBeArrayOfSize(numWorkers) + // promise is preserved + expect(returnValue).toSatisfyAll(isPromise) + + try { + await Promise.all(returnValue) + fail(`promise should reject`) + } catch (e) { + expect(e).toMatchInlineSnapshot(`[Error: async throw (worker #2)]`) + } + }) + }) + + describe(`.end`, () => { + it(`fails currently executed and pending tasks when worker is closed`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + expect.assertions(numWorkers * 2) + + // queueing 2 * numWorkers task, so that currently executed tasks reject + // as well as pending tasks + for (let i = 0; i < numWorkers * 2; i++) { + workerPool.single + .neverEnding() + .then(() => { + fail(`promise should reject`) + }) + .catch(e => { + expect(e).toMatchInlineSnapshot( + `[Error: Worker exited before finishing task]` + ) + }) + } + + await endWorkerPool() + }) + }) + + describe(`task queue`, () => { + it(`distributes task between workers when using .single`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + interface IReturnOfasync100ms { + taskId: number + workerId: string + } + + const promises: Array> = [] + const resultsOrder: Array = [] + const numTask = numWorkers * 10 + for (let taskId = 1; taskId <= numTask; taskId++) { + const promise = workerPool.single.async100ms(taskId, { + addWorkerId: true, + }) + promise.then(res => { + resultsOrder.push(res) + }) + promises.push(promise) + } + + await Promise.all(promises) + + expect(resultsOrder).toBeArrayOfSize(numTask) + + const executedOnWorker1 = 
resultsOrder.filter(res => res.workerId === `1`) + const executedOnWorker2 = resultsOrder.filter(res => res.workerId === `2`) + + // each worker executed some tasks - we can't ensure that task are evenly split + // so just making sure there are some tasks executed in each worker + expect(executedOnWorker1.length).toBeGreaterThan(0) + expect(executedOnWorker2.length).toBeGreaterThan(0) + + // results from each worker came in order of being called + expect(executedOnWorker1).toEqual( + executedOnWorker1.sort((a, b) => a.taskId - b.taskId) + ) + expect(executedOnWorker2).toEqual( + executedOnWorker2.sort((a, b) => a.taskId - b.taskId) + ) + }) + }) +}) diff --git a/packages/gatsby-worker/src/__tests__/task-queue.ts b/packages/gatsby-worker/src/__tests__/task-queue.ts new file mode 100644 index 0000000000000..66caa5048b1cd --- /dev/null +++ b/packages/gatsby-worker/src/__tests__/task-queue.ts @@ -0,0 +1,201 @@ +import { TaskQueue } from "../task-queue" + +function getValuesInQueue(queue: TaskQueue): Array { + const valuesInQueue: Array = [] + for (const item of queue) { + valuesInQueue.push(item.value) + } + + return valuesInQueue +} + +describe(`Task Queue`, () => { + it(`correctly holds all queued items in order`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + ] + `) + }) + + it(`handles removing first item`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + for (const item of taskQueue) { + if (item.value === 1) { + taskQueue.remove(item) + break + } + } + + taskQueue.enqueue(`added after removal`) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, 
+ 19, + 20, + "added after removal", + ] + `) + }) + + it(`handles removing last item`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + for (const item of taskQueue) { + if (item.value === 20) { + taskQueue.remove(item) + break + } + } + + taskQueue.enqueue(`added after removal`) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + "added after removal", + ] + `) + }) + + it(`handles removing item in the middle`, () => { + const taskQueue = new TaskQueue() + + for (let i = 1; i <= 20; i++) { + taskQueue.enqueue(i) + } + + for (const item of taskQueue) { + if (item.value === 11) { + taskQueue.remove(item) + break + } + } + + taskQueue.enqueue(`added after removal`) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + "added after removal", + ] + `) + }) + + it(`queue can fill up after being drained`, () => { + const taskQueue = new TaskQueue() + + taskQueue.enqueue(1) + taskQueue.enqueue(2) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 1, + 2, + ] + `) + + for (const item of taskQueue) { + taskQueue.remove(item) + } + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(`Array []`) + + taskQueue.enqueue(3) + taskQueue.enqueue(4) + + expect(getValuesInQueue(taskQueue)).toMatchInlineSnapshot(` + Array [ + 3, + 4, + ] + `) + }) +}) diff --git a/packages/gatsby-worker/src/child.ts b/packages/gatsby-worker/src/child.ts new file mode 100644 index 0000000000000..9c96170c8e1da --- /dev/null +++ b/packages/gatsby-worker/src/child.ts @@ -0,0 +1,66 @@ +import { + ParentMessageUnion, + ChildMessageUnion, + EXECUTE, + END, + ERROR, + RESULT, +} from "./types" +import { isPromise } from "./utils" + +let ensuredSendToMain: (msg: 
ChildMessageUnion) => void + +if (!process.send) { + throw new Error(`I was told there would be parentPort :shrug:`) +} else { + ensuredSendToMain = process.send.bind(process) +} + +if (!process.env.GATSBY_WORKER_MODULE_PATH) { + throw new Error(`I was told there would be worker module path :shrug:`) +} + +const child = require(process.env.GATSBY_WORKER_MODULE_PATH) + +function onError(error: Error): void { + if (error == null) { + error = new Error(`"null" or "undefined" thrown`) + } + + const msg: ChildMessageUnion = [ + ERROR, + error.constructor && error.constructor.name, + error.message, + error.stack, + error, + ] + + ensuredSendToMain(msg) +} + +function onResult(result: unknown): void { + const msg: ChildMessageUnion = [RESULT, result] + ensuredSendToMain(msg) +} + +function messageHandler(msg: ParentMessageUnion): void { + if (msg[0] === EXECUTE) { + let result + try { + result = child[msg[1]].call(child, ...msg[2]) + } catch (e) { + onError(e) + return + } + + if (isPromise(result)) { + result.then(onResult, onError) + } else { + onResult(result) + } + } else if (msg[0] === END) { + process.off(`message`, messageHandler) + } +} + +process.on(`message`, messageHandler) diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts new file mode 100644 index 0000000000000..b24c95622ec72 --- /dev/null +++ b/packages/gatsby-worker/src/index.ts @@ -0,0 +1,322 @@ +import { fork, ChildProcess } from "child_process" + +import { TaskQueue } from "./task-queue" +import { + EXECUTE, + END, + ERROR, + RESULT, + ParentMessageUnion, + ChildMessageUnion, +} from "./types" + +interface IWorkerOptions { + numWorkers?: number + env?: Record +} + +type WrapReturnOfAFunctionInAPromise< + FunctionThatDoesNotReturnAPromise extends (...args: Array) => any +> = ( + ...a: Parameters +) => Promise> + +// gatsby-worker will make sync function async, so to keep proper types we need to adjust types so all functions +// on worker pool are async +type 
EnsureFunctionReturnsAPromise = MaybeFunction extends ( + ...args: Array +) => Promise + ? MaybeFunction + : MaybeFunction extends (...args: Array) => any + ? WrapReturnOfAFunctionInAPromise + : never + +type WrapReturnInArray = MaybeFunction extends ( + ...args: Array +) => any + ? (...a: Parameters) => Array> + : never + +export type CreateWorkerPoolType = WorkerPool & + { + [FunctionName in keyof ExposedFunctions]: EnsureFunctionReturnsAPromise< + ExposedFunctions[FunctionName] + > + } & { + all: { + [FunctionName in keyof ExposedFunctions]: WrapReturnInArray< + EnsureFunctionReturnsAPromise + > + } + } + +const childWrapperPath = require.resolve(`./child`) + +class TaskInfo { + functionName: T + args: Array + assignedToWorker?: IWorkerInfo + promise: Promise + resolve!: (r: any) => void + reject!: (e: Error) => void + + constructor(opts: { + functionName: T + args: Array + assignedToWorker?: IWorkerInfo + }) { + this.functionName = opts.functionName + this.args = opts.args + this.assignedToWorker = opts.assignedToWorker + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + } +} + +interface IWorkerInfo { + workerId: number + worker: ChildProcess + exitedPromise: Promise<{ + code: number | null + signal: NodeJS.Signals | null + }> + currentTask?: TaskInfo +} + +export class WorkerPool> { + /** + * Schedule task execution on all workers. Useful for setting up workers + */ + all: { + [FunctionName in keyof WorkerModuleExports]: WrapReturnInArray< + EnsureFunctionReturnsAPromise + > + } + + /** + * Schedule task execution on single worker. Useful to distribute tasks between multiple workers. 
+ */ + single: { + [FunctionName in keyof WorkerModuleExports]: EnsureFunctionReturnsAPromise< + WorkerModuleExports[FunctionName] + > + } + + private workers: Array> = [] + private taskQueue = new TaskQueue>() + private idleWorkers: Set> = new Set() + + constructor(workerPath: string, options?: IWorkerOptions) { + const single: Partial["single"]> = {} + const all: Partial["all"]> = {} + + { + // we don't need to retain these + const module: WorkerModuleExports = require(workerPath) + const exportNames = Object.keys(module) as Array< + keyof WorkerModuleExports + > + + for (const exportName of exportNames) { + if (typeof module[exportName] !== `function`) { + // we only expose functions + continue + } + + single[exportName] = this.scheduleWorkSingle.bind( + this, + exportName + ) as WorkerPool["single"][typeof exportName] + + all[exportName] = (this.scheduleWorkAll.bind( + this, + exportName + ) as unknown) as WorkerPool< + WorkerModuleExports + >["all"][typeof exportName] + } + } + + this.single = single as WorkerPool["single"] + this.all = all as WorkerPool["all"] + + for (let workerId = 1; workerId <= (options?.numWorkers ?? 1); workerId++) { + const worker = fork(childWrapperPath, { + cwd: process.cwd(), + env: { + ...process.env, + ...(options?.env ?? 
{}), + GATSBY_WORKER_ID: workerId.toString(), + GATSBY_WORKER_MODULE_PATH: workerPath, + }, + }) + + const workerInfo: IWorkerInfo = { + workerId, + worker, + exitedPromise: new Promise(resolve => { + worker.on(`exit`, (code, signal) => { + if (workerInfo.currentTask) { + // worker exited without finishing a task + workerInfo.currentTask.reject( + new Error(`Worker exited before finishing task`) + ) + } + // remove worker from list of workers + this.workers.splice(this.workers.indexOf(workerInfo), 1) + // console.log(`after exit`, this.workers) + resolve({ code, signal }) + }) + }), + } + + worker.on(`message`, (msg: ChildMessageUnion) => { + if (!workerInfo.currentTask) { + throw new Error(`worker finished work but no idea what work :shrug:`) + } + + if (msg[0] === RESULT) { + const task = workerInfo.currentTask + workerInfo.currentTask = undefined + this.checkForWork(workerInfo) + task.resolve(msg[1]) + } else if (msg[0] === ERROR) { + let error = msg[4] + + if (error !== null && typeof error === `object`) { + const extra = error + + const NativeCtor = global[msg[1]] + const Ctor = typeof NativeCtor === `function` ? 
NativeCtor : Error + + error = new Ctor(msg[2]) + // @ts-ignore type doesn't exist on Error, but that's what jest-worker does for errors :shrug: + error.type = msg[1] + error.stack = msg[3] + + for (const key in extra) { + if (Object.prototype.hasOwnProperty.call(extra, key)) { + error[key] = extra[key] + } + } + } + + const task = workerInfo.currentTask + workerInfo.currentTask = undefined + this.checkForWork(workerInfo) + task.reject(error) + } + }) + + this.workers.push(workerInfo) + this.idleWorkers.add(workerInfo) + } + } + + end(): Array> { + const results = this.workers.map(async workerInfo => { + // tell worker to end gracefully + const endMessage: ParentMessageUnion = [END] + + workerInfo.worker.send(endMessage) + + // force exit if worker doesn't exit gracefully quickly + const forceExitTimeout = setTimeout(() => { + workerInfo.worker.kill(`SIGKILL`) + }, 1000) + + const exitResult = await workerInfo.exitedPromise + + clearTimeout(forceExitTimeout) + + return exitResult.code + }) + + Promise.all(results).then(() => { + // make sure we fail queued tasks as well + for (const taskNode of this.taskQueue) { + taskNode.value.reject(new Error(`Worker exited before finishing task`)) + } + }) + + return results + } + + private checkForWork( + workerInfo: IWorkerInfo + ): void { + // check if there is task in queue + for (const taskNode of this.taskQueue) { + const task = taskNode.value + if (!task.assignedToWorker || task.assignedToWorker === workerInfo) { + this.doWork(task, workerInfo) + this.taskQueue.remove(taskNode) + + return + } + } + + // no task found, so just marking worker as idle + this.idleWorkers.add(workerInfo) + } + + private doWork( + taskInfo: TaskInfo, + workerInfo: IWorkerInfo + ): void { + // block worker + workerInfo.currentTask = taskInfo + this.idleWorkers.delete(workerInfo) + + const msg: ParentMessageUnion = [ + EXECUTE, + taskInfo.functionName, + taskInfo.args, + ] + workerInfo.worker.send(msg) + } + + private scheduleWork( + taskInfo: 
TaskInfo + ): Promise { + let workerToExecuteTaskNow: + | IWorkerInfo + | undefined + + if (taskInfo.assignedToWorker) { + if (this.idleWorkers.has(taskInfo.assignedToWorker)) { + workerToExecuteTaskNow = taskInfo.assignedToWorker + } + } else { + workerToExecuteTaskNow = this.idleWorkers.values().next().value + } + + if (workerToExecuteTaskNow) { + this.doWork(taskInfo, workerToExecuteTaskNow) + } else { + this.taskQueue.enqueue(taskInfo) + } + + return taskInfo.promise + } + + private scheduleWorkSingle( + functionName: T, + ...args: Array + ): Promise { + return this.scheduleWork(new TaskInfo({ functionName, args })) + } + + private scheduleWorkAll( + functionName: T, + ...args: Array + ): Array> { + return this.workers.map(workerInfo => + this.scheduleWork( + new TaskInfo({ assignedToWorker: workerInfo, functionName, args }) + ) + ) + } +} diff --git a/packages/gatsby-worker/src/task-queue.ts b/packages/gatsby-worker/src/task-queue.ts new file mode 100644 index 0000000000000..ee390148484c8 --- /dev/null +++ b/packages/gatsby-worker/src/task-queue.ts @@ -0,0 +1,49 @@ +interface ITaskQueueNode { + value: ValueType + next?: ITaskQueueNode + prev?: ITaskQueueNode +} +export class TaskQueue { + private head?: ITaskQueueNode + private tail?: ITaskQueueNode; + + *[Symbol.iterator](): Iterator> { + let currentHead = this.head + while (currentHead) { + yield currentHead + currentHead = currentHead.next + } + } + + enqueue(task: ValueType): void { + const newNode: ITaskQueueNode = { + value: task, + } + if (this.tail) { + this.tail.next = newNode + newNode.prev = this.tail + } else { + this.head = newNode + } + + this.tail = newNode + } + + remove(taskNode: ITaskQueueNode): void { + if (taskNode === this.head) { + this.head = taskNode.next + if (this.head) { + this.head.prev = undefined + } else { + // if we don't have head, we also don't have tail + this.tail = undefined + } + } else { + if (taskNode === this.tail) { + this.tail = taskNode.prev + } + // if node is not 
the head then it will have .prev + taskNode.prev!.next = taskNode.next + } + } +} diff --git a/packages/gatsby-worker/src/types.ts b/packages/gatsby-worker/src/types.ts new file mode 100644 index 0000000000000..9e74cc66f5638 --- /dev/null +++ b/packages/gatsby-worker/src/types.ts @@ -0,0 +1,30 @@ +export const EXECUTE = 0b01 +export const ERROR = 0b10 +export const RESULT = 0b11 +export const END = 0b00 + +type FunctionName = string | number | symbol +type FunctionArgs = Array + +type ExecuteMessage = [typeof EXECUTE, FunctionName, FunctionArgs] +type EndMessage = [typeof END] + +export type ParentMessageUnion = ExecuteMessage | EndMessage + +type ErrorType = string +type ErrorMessage = string +type ErrorStack = string + +type TaskError = [ + typeof ERROR, + ErrorType, + ErrorMessage, + ErrorStack | undefined, + Error +] + +type ResultType = unknown + +type TaskResult = [typeof RESULT, ResultType] + +export type ChildMessageUnion = TaskError | TaskResult diff --git a/packages/gatsby-worker/src/utils.ts b/packages/gatsby-worker/src/utils.ts new file mode 100644 index 0000000000000..ed97194b4d713 --- /dev/null +++ b/packages/gatsby-worker/src/utils.ts @@ -0,0 +1,4 @@ +export const isPromise = (obj: any): obj is PromiseLike => + !!obj && + (typeof obj === `object` || typeof obj === `function`) && + typeof obj.then === `function` diff --git a/packages/gatsby-worker/tsconfig.json b/packages/gatsby-worker/tsconfig.json new file mode 100644 index 0000000000000..c565028b9760a --- /dev/null +++ b/packages/gatsby-worker/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.json", + "include": ["src"], + "compilerOptions": { + "skipLibCheck": true + }, + "exclude": ["**/__tests__/**/*"] +} diff --git a/renovate.json5 b/renovate.json5 index 0cf6bd41aecd5..1c3d65808c9b5 100644 --- a/renovate.json5 +++ b/renovate.json5 @@ -22700,6 +22700,208 @@ "commitMessageSuffix": "{{#unless groupName}} for gatsby-transformer-yaml{{/unless}}", "dependencyDashboardApproval": true }, + 
{ + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "devDependencies" + ], + "matchUpdateTypes": [ + "patch", + "minor" + ], + "groupName": "[DEV] minor and patch dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-dev-minor", + "automerge": true, + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "devDependencies" + ], + "matchUpdateTypes": [ + "major" + ], + "groupName": "[DEV] major dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-dev-major", + "automerge": true, + "dependencyDashboardApproval": false, + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + "matchUpdateTypes": [ + "patch", + "minor" + ], + "groupName": "minor and patch dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-minor", + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for 
gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + "matchUpdateTypes": [ + "major" + ], + "groupName": "major dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-major", + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}", + "dependencyDashboardApproval": true + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + "groupName": "minor and patch dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-minor", + "matchPackageNames": [], + "matchUpdateTypes": [ + "patch" + ], + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless groupName}} for gatsby-worker{{/unless}}" + }, + { + "matchPaths": [ + "packages/gatsby-worker/package.json" + ], + "matchDepTypes": [ + "dependencies" + ], + "groupName": "major dependencies for gatsby-worker", + "groupSlug": "gatsby-worker-prod-major", + "matchPackageNames": [], + "matchUpdateTypes": [ + "major", + "minor" + ], + "excludePackageNames": [ + "eslint", + "prettier", + "cross-env", + "execa", + "mini-css-extract-plugin", + "sharp", + "@types/sharp", + "typescript", + "chalk", + "fs-extra", + "@types/fs-extra" + ], + "excludePackagePatterns": [ + "^eslint-", + "^@typescript-eslint/", + "^@testing-library/" + ], + "commitMessageSuffix": "{{#unless 
groupName}} for gatsby-worker{{/unless}}", + "dependencyDashboardApproval": true + }, { "matchPaths": [ "packages/gatsby/package.json" From a1c78112f179c14db78c863c7950885bfa7c8272 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Fri, 25 Jun 2021 14:02:58 +0200 Subject: [PATCH 02/13] chore(gatsby): convert ssr and PQR workerpool to gatsby-worker --- packages/gatsby/package.json | 1 + packages/gatsby/src/commands/build-html.ts | 4 +- .../src/utils/worker/__tests__/config.ts | 10 +++-- .../src/utils/worker/__tests__/datastore.ts | 6 ++- .../src/utils/worker/__tests__/queries.ts | 16 ++++---- .../src/utils/worker/__tests__/reporter.ts | 2 +- .../src/utils/worker/__tests__/schema.ts | 10 ++--- .../src/utils/worker/__tests__/share-state.ts | 14 ++++--- .../test-helpers/create-test-worker.ts | 41 ++++++------------- .../__tests__/test-helpers/ts-register.js | 6 +++ .../test-helpers/wrapper-for-tests.js | 13 ------ packages/gatsby/src/utils/worker/pool.ts | 27 ++++++------ packages/gatsby/src/utils/worker/types.ts | 24 ----------- 13 files changed, 69 insertions(+), 105 deletions(-) create mode 100644 packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js delete mode 100644 packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js delete mode 100644 packages/gatsby/src/utils/worker/types.ts diff --git a/packages/gatsby/package.json b/packages/gatsby/package.json index beb18bd3955c7..91aa175f4303d 100644 --- a/packages/gatsby/package.json +++ b/packages/gatsby/package.json @@ -85,6 +85,7 @@ "gatsby-plugin-utils": "^1.9.0-next.1", "gatsby-react-router-scroll": "^4.9.0-next.0", "gatsby-telemetry": "^2.9.0-next.1", + "gatsby-worker": "0.0.0-next.0", "glob": "^7.1.6", "got": "8.3.2", "graphql": "^15.4.0", diff --git a/packages/gatsby/src/commands/build-html.ts b/packages/gatsby/src/commands/build-html.ts index 23eee1b430b2b..d69852f170e99 100644 --- a/packages/gatsby/src/commands/build-html.ts +++ 
b/packages/gatsby/src/commands/build-html.ts @@ -213,8 +213,8 @@ const renderHTMLQueue = async ( const renderHTML = stage === `build-html` - ? workerPool.renderHTMLProd - : workerPool.renderHTMLDev + ? workerPool.single.renderHTMLProd + : workerPool.single.renderHTMLDev const uniqueUnsafeBuiltinUsedStacks = new Set() diff --git a/packages/gatsby/src/utils/worker/__tests__/config.ts b/packages/gatsby/src/utils/worker/__tests__/config.ts index c21de65b8af35..e2b601481f7cc 100644 --- a/packages/gatsby/src/utils/worker/__tests__/config.ts +++ b/packages/gatsby/src/utils/worker/__tests__/config.ts @@ -21,11 +21,15 @@ it(`can load config and execute node API in worker`, async () => { const siteDirectory = path.join(__dirname, `fixtures`, `sample-site`) // plugin options for custom local plugin contains function (() => `foo`) - await worker.loadConfigAndPlugins({ siteDirectory }) + await Promise.all( + worker.all.loadConfigAndPlugins({ + siteDirectory, + }) + ) // plugin API execute function from plugin options and store result in `global` - await worker.runAPI(`createSchemaCustomization`) + await Promise.all(worker.all.runAPI(`createSchemaCustomization`)) // getting result stored in `global` - expect(await worker.getAPIRunResult()).toEqual(`foo`) + expect(await worker.single.getAPIRunResult()).toEqual(`foo`) }) diff --git a/packages/gatsby/src/utils/worker/__tests__/datastore.ts b/packages/gatsby/src/utils/worker/__tests__/datastore.ts index a1b168edc50e5..246733e376a89 100644 --- a/packages/gatsby/src/utils/worker/__tests__/datastore.ts +++ b/packages/gatsby/src/utils/worker/__tests__/datastore.ts @@ -26,7 +26,7 @@ itWhenLMDB(`worker can access node created in main process`, async () => { const testNodeId = `shared-node` expect(getDataStore().getNode(testNodeId)).toBeFalsy() - expect(await worker.getNodeFromWorker(testNodeId)).toBeFalsy() + expect(await worker.single.getNodeFromWorker(testNodeId)).toBeFalsy() const node = { id: testNodeId, @@ -39,7 +39,9 @@ 
itWhenLMDB(`worker can access node created in main process`, async () => { await getDataStore().ready() const nodeStoredInMainProcess = getDataStore().getNode(testNodeId) - const nodeStoredInWorkerProcess = await worker.getNodeFromWorker(testNodeId) + const nodeStoredInWorkerProcess = await worker.single.getNodeFromWorker( + testNodeId + ) expect(nodeStoredInWorkerProcess).toMatchInlineSnapshot(` Object { diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index 2ee929e797b71..d2455e50153b2 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -111,7 +111,7 @@ describeWhenLMDB(`worker (queries)`, () => { const siteDirectory = path.join(__dirname, `fixtures`, `sample-site`) await loadConfigAndPlugins({ siteDirectory }) - await worker.loadConfigAndPlugins({ siteDirectory }) + await Promise.all(worker.all.loadConfigAndPlugins({ siteDirectory })) await sourceNodesAndRemoveStaleNodes({ webhookBody: {} }) await getDataStore().ready() @@ -162,8 +162,8 @@ describeWhenLMDB(`worker (queries)`, () => { saveStateForWorkers([`components`, `staticQueryComponents`]) - await worker.buildSchema() - await worker.runQueries(queryIdsSmall) + await Promise.all(worker.all.buildSchema()) + await worker.single.runQueries(queryIdsSmall) }) afterAll(() => { @@ -178,7 +178,7 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute static queries`, async () => { if (!worker) fail(`worker not defined`) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() const staticQueryResult = await fs.readJson( `${stateFromWorker.program.directory}/public/page-data/sq/d/${dummyStaticQuery.hash}.json` @@ -195,7 +195,7 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries`, async () => { if (!worker) fail(`worker not defined`) - const stateFromWorker = await worker.getState() 
+ const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( `${stateFromWorker.program.directory}/.cache/json/_foo.json` @@ -210,7 +210,7 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries with context variables`, async () => { if (!worker) fail(`worker not defined`) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( `${stateFromWorker.program.directory}/.cache/json/_bar.json` @@ -227,11 +227,11 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should chunk work in runQueriesInWorkersQueue`, async () => { if (!worker) fail(`worker not defined`) - const spy = jest.spyOn(worker, `runQueries`) + const spy = jest.spyOn(worker.single, `runQueries`) // @ts-ignore - worker is defined await runQueriesInWorkersQueue(worker, queryIdsBig, 10) - const stateFromWorker = await worker.getState() + const stateFromWorker = await worker.single.getState() // Called the complete ABC so we can test _a const pageQueryResultA = await fs.readJson( diff --git a/packages/gatsby/src/utils/worker/__tests__/reporter.ts b/packages/gatsby/src/utils/worker/__tests__/reporter.ts index 1cd54bd4fe085..d13c542191f50 100644 --- a/packages/gatsby/src/utils/worker/__tests__/reporter.ts +++ b/packages/gatsby/src/utils/worker/__tests__/reporter.ts @@ -14,7 +14,7 @@ it(`worker can use reporter without crashing`, async () => { worker = createTestWorker() try { - const result = await worker.log(`log`) + const result = await worker.single.log(`log`) expect(result).toEqual(true) } catch (e) { expect(e).toBeFalsy() diff --git a/packages/gatsby/src/utils/worker/__tests__/schema.ts b/packages/gatsby/src/utils/worker/__tests__/schema.ts index 8d3c6c796c7dd..01392a2acb97d 100644 --- a/packages/gatsby/src/utils/worker/__tests__/schema.ts +++ b/packages/gatsby/src/utils/worker/__tests__/schema.ts @@ -49,15 +49,15 @@ describeWhenLMDB(`worker 
(schema)`, () => { const siteDirectory = path.join(__dirname, `fixtures`, `sample-site`) await loadConfigAndPlugins({ siteDirectory }) - await worker.loadConfigAndPlugins({ siteDirectory }) + await Promise.all(worker.all.loadConfigAndPlugins({ siteDirectory })) await sourceNodesAndRemoveStaleNodes({ webhookBody: {} }) await getDataStore().ready() await build({ parentSpan: {} }) saveStateForWorkers([`inferenceMetadata`]) - await worker.buildSchema() + await Promise.all(worker.all.buildSchema()) - stateFromWorker = await worker.getState() + stateFromWorker = await worker.single.getState() }) afterAll(() => { @@ -130,7 +130,7 @@ describeWhenLMDB(`worker (schema)`, () => { it(`should have resolverField from createResolvers`, async () => { // @ts-ignore - it exists - const { data } = await worker?.getRunQueryResult(` + const { data } = await worker?.single.getRunQueryResult(` { one: nodeTypeOne { number @@ -151,7 +151,7 @@ describeWhenLMDB(`worker (schema)`, () => { it(`should have fieldsOnGraphQL from setFieldsOnGraphQLNodeType`, async () => { // @ts-ignore - it exists - const { data } = await worker?.getRunQueryResult(` + const { data } = await worker?.single.getRunQueryResult(` { four: nodeTypeOne { fieldsOnGraphQL diff --git a/packages/gatsby/src/utils/worker/__tests__/share-state.ts b/packages/gatsby/src/utils/worker/__tests__/share-state.ts index 54fb6b38ffa71..42dc28c4bbe82 100644 --- a/packages/gatsby/src/utils/worker/__tests__/share-state.ts +++ b/packages/gatsby/src/utils/worker/__tests__/share-state.ts @@ -48,7 +48,7 @@ describe(`worker (share-state)`, () => { worker = createTestWorker() - const result = await worker.getPage(dummyPagePayload.path) + const result = await worker.single.getPage(dummyPagePayload.path) expect(result).toBe(null) }) @@ -208,10 +208,12 @@ describe(`worker (share-state)`, () => { saveStateForWorkers([`components`, `staticQueryComponents`]) - await worker.setQueries() + await Promise.all(worker.all.setQueries()) - const components = 
await worker.getComponent(dummyPagePayload.component) - const staticQueryComponents = await worker.getStaticQueryComponent( + const components = await worker.single.getComponent( + dummyPagePayload.component + ) + const staticQueryComponents = await worker.single.getStaticQueryComponent( staticQueryID ) @@ -256,9 +258,9 @@ describe(`worker (share-state)`, () => { saveStateForWorkers([`inferenceMetadata`]) - await worker.setInferenceMetadata() + await Promise.all(worker.all.setInferenceMetadata()) - const inf = await worker.getInferenceMetadata(`Test`) + const inf = await worker.single.getInferenceMetadata(`Test`) expect(inf).toMatchInlineSnapshot(` Object { diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts b/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts index 867b63ef9ba47..9da78204bedd2 100644 --- a/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts +++ b/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts @@ -1,36 +1,21 @@ -import Worker from "jest-worker" -import type { CreateWorkerPoolType } from "../../types" +import { WorkerPool } from "gatsby-worker" -export type GatsbyTestWorkerPool = CreateWorkerPoolType< +export type GatsbyTestWorkerPool = WorkerPool< typeof import("./child-for-tests") > export function createTestWorker(): GatsbyTestWorkerPool { - // all child processes of this worker pool would have JEST_WORKER_ID set to 1 - // but running jest tests would create processes with possibly other IDs - // this will let child processes use same database ID as parent process (one that executes test) - process.env.FORCE_TEST_DATABASE_ID = process.env.JEST_WORKER_ID - process.env.GATSBY_WORKER_POOL_WORKER = `true` - - // deleting process.env.JEST_WORKER_ID is a workaround for `@babel/register` used in `./wrapper-for-tests` - // creating new Worker without explicitly defining `exposedMethods` result in requiring the module - // we need a 
way to recognize when require module is executed in context of process running test - // versus process(es) spawned for worker. jest-worker actually doesn't have a way to pass custom env vars - // to worker process without overriding JEST_WORKER_ID (yikes), but it does reuse current process.env - // so we temporarily unset JEST_WORKER_ID, so that module loaded in the same process as this code is executed - // doesn't use `@babel/register` and we rely on `jest-worker` auto-assigning JEST_WORKER_ID for child processes - const tmpJestWorkerId = process.env.JEST_WORKER_ID - delete process.env.JEST_WORKER_ID - - const worker = new Worker(require.resolve(`./wrapper-for-tests`), { - numWorkers: 1, - forkOptions: { - silent: false, - }, - maxRetries: 1, - }) as GatsbyTestWorkerPool - delete process.env.GATSBY_WORKER_POOL_WORKER - process.env.JEST_WORKER_ID = tmpJestWorkerId + const worker = new WorkerPool( + require.resolve(`./child-for-tests`), + { + numWorkers: 1, + env: { + FORCE_TEST_DATABASE_ID: process.env.JEST_WORKER_ID, + GATSBY_WORKER_POOL_WORKER: `true`, + NODE_OPTIONS: `--require ${require.resolve(`./ts-register`)}`, + }, + } + ) as GatsbyTestWorkerPool return worker } diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js b/packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js new file mode 100644 index 0000000000000..26ff716b30bc2 --- /dev/null +++ b/packages/gatsby/src/utils/worker/__tests__/test-helpers/ts-register.js @@ -0,0 +1,6 @@ +// spawned process won't use jest config to support TS, so we need to add support ourselves +require(`@babel/register`)({ + extensions: [`.js`, `.ts`], + configFile: require.resolve(`../../../../../babel.config.js`), + ignore: [/node_modules/], +}) diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js b/packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js deleted file mode 100644 index 5dd264f17609b..0000000000000 --- 
a/packages/gatsby/src/utils/worker/__tests__/test-helpers/wrapper-for-tests.js +++ /dev/null @@ -1,13 +0,0 @@ -// this IS executed in process that spawn worker to analyze exports and create functions for them on worker pool object -// so we want to use `@babel/register` ONLY inside actual worker as otherwise it does mess with jest transformation and tools like `rewire` -// see `./create-test-worker.ts` file for explanation why this env var is used. -if (process.env.JEST_WORKER_ID) { - // spawned process won't use jest config to support TS, so we need to add support ourselves - require(`@babel/register`)({ - extensions: [`.js`, `.ts`], - configFile: require.resolve(`../../../../../babel.config.js`), - ignore: [/node_modules/], - }) -} - -module.exports = require(`./child-for-tests`) diff --git a/packages/gatsby/src/utils/worker/pool.ts b/packages/gatsby/src/utils/worker/pool.ts index 82823b279d092..de9cb6c03abd7 100644 --- a/packages/gatsby/src/utils/worker/pool.ts +++ b/packages/gatsby/src/utils/worker/pool.ts @@ -1,22 +1,23 @@ -import Worker from "jest-worker" +import { WorkerPool } from "gatsby-worker" import { chunk } from "lodash" import reporter from "gatsby-cli/lib/reporter" import { cpuCoreCount } from "gatsby-core-utils" -import type { CreateWorkerPoolType } from "./types" import { IGroupedQueryIds } from "../../services" -export type GatsbyWorkerPool = CreateWorkerPoolType +export type GatsbyWorkerPool = WorkerPool export const create = (): GatsbyWorkerPool => { - process.env.GATSBY_WORKER_POOL_WORKER = `true` - const worker = new Worker(require.resolve(`./child`), { - numWorkers: Math.max(1, cpuCoreCount() - 1), - forkOptions: { - silent: false, - }, - }) as GatsbyWorkerPool - delete process.env.GATSBY_WORKER_POOL_WORKER + const worker = new WorkerPool( + require.resolve(`./child`), + { + numWorkers: Math.max(1, cpuCoreCount() - 1), + env: { + GATSBY_WORKER_POOL_WORKER: `true`, + }, + } + ) + return worker } @@ -38,7 +39,7 @@ export async function 
runQueriesInWorkersQueue( for (const segment of staticQuerySegments) { promises.push( - pool + pool.single .runQueries({ pageQueryIds: [], staticQueryIds: segment }) .then(() => { activity.tick(segment.length) @@ -48,7 +49,7 @@ export async function runQueriesInWorkersQueue( for (const segment of pageQuerySegments) { promises.push( - pool + pool.single .runQueries({ pageQueryIds: segment, staticQueryIds: [] }) .then(() => { activity.tick(segment.length) diff --git a/packages/gatsby/src/utils/worker/types.ts b/packages/gatsby/src/utils/worker/types.ts deleted file mode 100644 index 29f1827eea9b1..0000000000000 --- a/packages/gatsby/src/utils/worker/types.ts +++ /dev/null @@ -1,24 +0,0 @@ -import type Worker from "jest-worker" - -type WrapReturnOfAFunctionInAPromise< - FunctionThatDoesNotReturnAPromise extends (...args: Array) => any -> = ( - ...a: Parameters -) => Promise> - -// jest-worker will make sync function async, so to keep proper types we need to adjust types so all functions -// on worker pool are async -type EnsureFunctionReturnsAPromise = MaybeFunction extends ( - ...args: Array -) => Promise - ? MaybeFunction - : MaybeFunction extends (...args: Array) => any - ? 
WrapReturnOfAFunctionInAPromise - : never - -export type CreateWorkerPoolType = Worker & - { - [FunctionName in keyof ExposedFunctions]: EnsureFunctionReturnsAPromise< - ExposedFunctions[FunctionName] - > - } From 605b723f66f029d6da76b200e1f3cd695d59ef00 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Fri, 25 Jun 2021 14:26:58 +0200 Subject: [PATCH 03/13] chore(gatsby): convert dev-ssr to gatsby-worker --- packages/gatsby/package.json | 1 - .../utils/dev-ssr/render-dev-html-child.js | 2 - .../src/utils/dev-ssr/render-dev-html.ts | 62 +++++++++++-------- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/packages/gatsby/package.json b/packages/gatsby/package.json index 91aa175f4303d..71b8c0a02683c 100644 --- a/packages/gatsby/package.json +++ b/packages/gatsby/package.json @@ -96,7 +96,6 @@ "invariant": "^2.2.4", "is-relative": "^1.0.0", "is-relative-url": "^3.0.0", - "jest-worker": "^24.9.0", "joi": "^17.2.1", "json-loader": "^0.5.7", "json-stringify-safe": "^5.0.1", diff --git a/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js b/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js index d9d36e65a7c9d..3b95056c02620 100644 --- a/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js +++ b/packages/gatsby/src/utils/dev-ssr/render-dev-html-child.js @@ -115,5 +115,3 @@ exports.renderHTML = ({ exports.deleteModuleCache = htmlComponentRendererPath => { delete require.cache[require.resolve(htmlComponentRendererPath)] } - -exports.warmup = () => `warmed` diff --git a/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts b/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts index 17e31cfc9ee67..53e182d430cd2 100644 --- a/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts +++ b/packages/gatsby/src/utils/dev-ssr/render-dev-html.ts @@ -1,4 +1,4 @@ -import JestWorker from "jest-worker" +import { WorkerPool } from "gatsby-worker" import fs from "fs-extra" import nodePath from "path" import report from "gatsby-cli/lib/reporter" @@ -11,30 
+11,47 @@ import { getDevSSRWebpack } from "../../commands/build-html" import { emitter, GatsbyReduxStore } from "../../redux" import { IGatsbyPage } from "../../redux/types" -const startWorker = (): JestWorker => { - const newWorker = new JestWorker(require.resolve(`./render-dev-html-child`), { - exposedMethods: [`renderHTML`, `deleteModuleCache`, `warmup`], - numWorkers: 1, - forkOptions: { - silent: false, +interface IErrorRenderMeta { + codeFrame: string + source: string + line: number + column: number + sourceMessage?: string + stack?: string +} + +// TODO: convert `render-dev-html-child.js` to TS and use `typeof import("./render-dev-html-child")` +// instead of defining interface here +interface IRenderDevHtmlChild { + renderHTML: (arg: { + path: string + componentPath: string + htmlComponentRendererPath: string + publicDir: string + isClientOnlyPage?: boolean + error?: IErrorRenderMeta + directory?: string + }) => Promise + deleteModuleCache: (htmlComponentRendererPath: string) => void +} + +const startWorker = (): WorkerPool => { + const newWorker = new WorkerPool( + require.resolve(`./render-dev-html-child`), + { + numWorkers: 1, env: { - ...process.env, NODE_ENV: isCI() ? `production` : `development`, forceColors: `true`, GATSBY_EXPERIMENTAL_DEV_SSR: `true`, }, - }, - }) - - // jest-worker is lazy with forking but we want to fork immediately so the user - // doesn't have to wait. 
- // @ts-ignore - newWorker.warmup() + } + ) return newWorker } -let worker +let worker: WorkerPool export const initDevWorkerPool = (): void => { worker = startWorker() } @@ -53,7 +70,7 @@ export const restartWorker = (htmlComponentRendererPath: string): void => { oldWorker.end() changeCount = 0 } else { - worker.deleteModuleCache(htmlComponentRendererPath) + worker.all.deleteModuleCache(htmlComponentRendererPath) } } @@ -136,14 +153,7 @@ interface IRenderDevHtmlProps { page: IGatsbyPage skipSsr?: boolean store: GatsbyReduxStore - error?: { - codeFrame: string - source: string - line: number - column: number - sourceMessage?: string - stack?: string - } + error?: IErrorRenderMeta htmlComponentRendererPath: string directory: string } @@ -243,7 +253,7 @@ export const renderDevHTML = ({ const publicDir = nodePath.join(directory, `public`) try { - const htmlString = await worker.renderHTML({ + const htmlString = await worker.single.renderHTML({ path, componentPath: pageObj.component, htmlComponentRendererPath, From 6aff32a85b9dfa4e617ff145b953df1a32f6f859 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Fri, 25 Jun 2021 14:27:20 +0200 Subject: [PATCH 04/13] test(gatsby-cli): one test at a time --- integration-tests/gatsby-cli/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/gatsby-cli/package.json b/integration-tests/gatsby-cli/package.json index 9abe5f310e7de..e73eb89c912ce 100644 --- a/integration-tests/gatsby-cli/package.json +++ b/integration-tests/gatsby-cli/package.json @@ -6,7 +6,7 @@ }, "license": "MIT", "scripts": { - "test": "jest" + "test": "jest -w 1" }, "devDependencies": { "babel-jest": "^24.0.0", From 870e71b6e0b3a52ddbc4a24a8806962f30d95e63 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 11:46:44 +0200 Subject: [PATCH 05/13] some code shuffling to avoid some error throwing when importing child (preperation for messaging API) --- packages/gatsby-worker/src/child.ts | 85 
++++++++++++++--------------- packages/gatsby-worker/src/index.ts | 17 ++++-- 2 files changed, 55 insertions(+), 47 deletions(-) diff --git a/packages/gatsby-worker/src/child.ts b/packages/gatsby-worker/src/child.ts index 9c96170c8e1da..06f9f8a3cfe64 100644 --- a/packages/gatsby-worker/src/child.ts +++ b/packages/gatsby-worker/src/child.ts @@ -8,59 +8,58 @@ import { } from "./types" import { isPromise } from "./utils" -let ensuredSendToMain: (msg: ChildMessageUnion) => void +let isWorker = false -if (!process.send) { - throw new Error(`I was told there would be parentPort :shrug:`) -} else { - ensuredSendToMain = process.send.bind(process) -} +if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { + isWorker = true + const ensuredSendToMain = process.send.bind(process) as ( + msg: ChildMessageUnion + ) => void -if (!process.env.GATSBY_WORKER_MODULE_PATH) { - throw new Error(`I was told there would be worker module path :shrug:`) -} + function onError(error: Error): void { + if (error == null) { + error = new Error(`"null" or "undefined" thrown`) + } -const child = require(process.env.GATSBY_WORKER_MODULE_PATH) + const msg: ChildMessageUnion = [ + ERROR, + error.constructor && error.constructor.name, + error.message, + error.stack, + error, + ] -function onError(error: Error): void { - if (error == null) { - error = new Error(`"null" or "undefined" thrown`) + ensuredSendToMain(msg) } - const msg: ChildMessageUnion = [ - ERROR, - error.constructor && error.constructor.name, - error.message, - error.stack, - error, - ] - - ensuredSendToMain(msg) -} + function onResult(result: unknown): void { + const msg: ChildMessageUnion = [RESULT, result] + ensuredSendToMain(msg) + } -function onResult(result: unknown): void { - const msg: ChildMessageUnion = [RESULT, result] - ensuredSendToMain(msg) -} + const child = require(process.env.GATSBY_WORKER_MODULE_PATH) -function messageHandler(msg: ParentMessageUnion): void { - if (msg[0] === EXECUTE) { - let result - try { - 
result = child[msg[1]].call(child, ...msg[2]) - } catch (e) { - onError(e) - return - } + function messageHandler(msg: ParentMessageUnion): void { + if (msg[0] === EXECUTE) { + let result + try { + result = child[msg[1]].call(child, ...msg[2]) + } catch (e) { + onError(e) + return + } - if (isPromise(result)) { - result.then(onResult, onError) - } else { - onResult(result) + if (isPromise(result)) { + result.then(onResult, onError) + } else { + onResult(result) + } + } else if (msg[0] === END) { + process.off(`message`, messageHandler) } - } else if (msg[0] === END) { - process.off(`message`, messageHandler) } + + process.on(`message`, messageHandler) } -process.on(`message`, messageHandler) +export { isWorker } diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index b24c95622ec72..184ced29e8d91 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -173,16 +173,23 @@ export class WorkerPool> { } worker.on(`message`, (msg: ChildMessageUnion) => { - if (!workerInfo.currentTask) { - throw new Error(`worker finished work but no idea what work :shrug:`) - } - if (msg[0] === RESULT) { + if (!workerInfo.currentTask) { + throw new Error( + `Invariant: gatsby-worker received execution result, but it wasn't expecting it.` + ) + } const task = workerInfo.currentTask workerInfo.currentTask = undefined this.checkForWork(workerInfo) task.resolve(msg[1]) } else if (msg[0] === ERROR) { + if (!workerInfo.currentTask) { + throw new Error( + `Invariant: gatsby-worker received execution rejection, but it wasn't expecting it.` + ) + } + let error = msg[4] if (error !== null && typeof error === `object`) { @@ -320,3 +327,5 @@ export class WorkerPool> { ) } } + +export * from "./child" From aeca3a615c7f01086d44a655f1f6d423d14930aa Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 11:48:54 +0200 Subject: [PATCH 06/13] drop commented out console.log ( from review comments ) --- 
packages/gatsby-worker/src/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index 184ced29e8d91..e2b106bcc3c53 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -166,7 +166,6 @@ export class WorkerPool> { } // remove worker from list of workers this.workers.splice(this.workers.indexOf(workerInfo), 1) - // console.log(`after exit`, this.workers) resolve({ code, signal }) }) }), From 2bf3facf8a49451750d139ce4ef0de12ef2fbb12 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 11:49:50 +0200 Subject: [PATCH 07/13] port debug|inspect cli flag handling from jest-worker --- packages/gatsby-worker/src/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index e2b106bcc3c53..a5eb9beced67f 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -151,6 +151,8 @@ export class WorkerPool> { GATSBY_WORKER_ID: workerId.toString(), GATSBY_WORKER_MODULE_PATH: workerPath, }, + // Suppress --debug / --inspect flags while preserving others (like --harmony). 
+ execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)), }) const workerInfo: IWorkerInfo = { From bd3c1f74f6b2ac2461b05283b20a5fb3b04921e7 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 12:00:38 +0200 Subject: [PATCH 08/13] drop skipLibCheck from tsconfig --- packages/gatsby-worker/tsconfig.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/gatsby-worker/tsconfig.json b/packages/gatsby-worker/tsconfig.json index c565028b9760a..366c3b052bed0 100644 --- a/packages/gatsby-worker/tsconfig.json +++ b/packages/gatsby-worker/tsconfig.json @@ -1,8 +1,5 @@ { "extends": "../../tsconfig.json", "include": ["src"], - "compilerOptions": { - "skipLibCheck": true - }, "exclude": ["**/__tests__/**/*"] } From ca748344cd6af59522b303f48e7e6c37f54b042d Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 12:22:20 +0200 Subject: [PATCH 09/13] address Vlad's comment --- .../gatsby-worker/src/__tests__/task-queue.ts | 37 +++++++++++++++++++ packages/gatsby-worker/src/task-queue.ts | 3 ++ 2 files changed, 40 insertions(+) diff --git a/packages/gatsby-worker/src/__tests__/task-queue.ts b/packages/gatsby-worker/src/__tests__/task-queue.ts index 66caa5048b1cd..fd8dc3d7e2d8d 100644 --- a/packages/gatsby-worker/src/__tests__/task-queue.ts +++ b/packages/gatsby-worker/src/__tests__/task-queue.ts @@ -198,4 +198,41 @@ describe(`Task Queue`, () => { ] `) }) + + describe(`Removed node is not referenced in .next or .prev`, () => { + let taskQueue: TaskQueue + beforeEach(() => { + taskQueue = new TaskQueue() + + taskQueue.enqueue(1) + taskQueue.enqueue(2) + taskQueue.enqueue(3) + }) + + it.each([ + [`removed from head`, 1], + [`removed from middle`, 2], + [`removed from tail`, 3], + ])(`%s`, (_label, itemToRemove) => { + for (const item of taskQueue) { + if (item.value === itemToRemove) { + taskQueue.remove(item) + } + } + + for (const item of taskQueue) { + if (item.value === itemToRemove) { + fail(`"${itemToRemove}" found as 
value`) + } + + if (item?.prev?.value === itemToRemove) { + fail(`"${itemToRemove}" found as value of previous node`) + } + + if (item?.next?.value === itemToRemove) { + fail(`"${itemToRemove}" found as value of next node`) + } + } + }) + }) }) diff --git a/packages/gatsby-worker/src/task-queue.ts b/packages/gatsby-worker/src/task-queue.ts index ee390148484c8..42945763e3952 100644 --- a/packages/gatsby-worker/src/task-queue.ts +++ b/packages/gatsby-worker/src/task-queue.ts @@ -41,6 +41,9 @@ export class TaskQueue { } else { if (taskNode === this.tail) { this.tail = taskNode.prev + } else { + // if node is not the tail then it will have .next + taskNode.next!.prev = taskNode.prev } // if node is not the head then it will have .prev taskNode.prev!.next = taskNode.next From 7ab0f18a6dd0785bb79b10eb8305b5255ee40980 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 12:50:04 +0200 Subject: [PATCH 10/13] add some jsdocs --- packages/gatsby-worker/src/child.ts | 3 +++ packages/gatsby-worker/src/index.ts | 12 ++++++++++++ packages/gatsby-worker/src/task-queue.ts | 12 ++++++++++++ 3 files changed, 27 insertions(+) diff --git a/packages/gatsby-worker/src/child.ts b/packages/gatsby-worker/src/child.ts index 06f9f8a3cfe64..3619108caa6b9 100644 --- a/packages/gatsby-worker/src/child.ts +++ b/packages/gatsby-worker/src/child.ts @@ -8,6 +8,9 @@ import { } from "./types" import { isPromise } from "./utils" +/** + * Used to check whether current context is executed in worker process + */ let isWorker = false if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index a5eb9beced67f..43d8d525ea072 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -85,6 +85,14 @@ interface IWorkerInfo { currentTask?: TaskInfo } +/** + * Worker pool is a class that allows you to queue function execution across multiple + * child processes, in order
to parallelize work. It accepts absolute path to worker module + * and will expose exported function of that module as properties on WorkerPool instance. + * + * Worker pool allows queueing execution of a function on all workers (via `.all` property) + * as well as distributing execution across workers (via `.single` property) + */ export class WorkerPool> { /** * Schedule task execution on all workers. Useful for setting up workers @@ -223,6 +231,10 @@ export class WorkerPool> { } } + /** + * Kills worker processes and rejects any ongoing or pending tasks. + * @returns Array of promises for each worker that will resolve once worker did exit. + */ end(): Array> { const results = this.workers.map(async workerInfo => { // tell worker to end gracefully diff --git a/packages/gatsby-worker/src/task-queue.ts b/packages/gatsby-worker/src/task-queue.ts index 42945763e3952..6d0c9f10975b0 100644 --- a/packages/gatsby-worker/src/task-queue.ts +++ b/packages/gatsby-worker/src/task-queue.ts @@ -3,6 +3,10 @@ interface ITaskQueueNode { next?: ITaskQueueNode prev?: ITaskQueueNode } + +/** + * Task queue implemented with doubly linked list + */ export class TaskQueue { private head?: ITaskQueueNode private tail?: ITaskQueueNode; @@ -15,6 +19,10 @@ export class TaskQueue { } } + /** + * Puts new task at the end of the list + * @param task Task to add to the queue + */ enqueue(task: ValueType): void { const newNode: ITaskQueueNode = { value: task, @@ -29,6 +37,10 @@ export class TaskQueue { this.tail = newNode } + /** + * Remove a task node from the queue + * @param taskNode Queue's node to remove + */ remove(taskNode: ITaskQueueNode): void { if (taskNode === this.head) { this.head = taskNode.next From 102007a2000684cc7c9dae8e6d9d10ee614d192e Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 12:53:40 +0200 Subject: [PATCH 11/13] mention why gatsby-worker only expose functions --- packages/gatsby-worker/src/index.ts | 5 ++++- 1 file changed, 4 insertions(+), 1
deletion(-) diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index 43d8d525ea072..f2bc96b2beef5 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -129,7 +129,10 @@ export class WorkerPool> { for (const exportName of exportNames) { if (typeof module[exportName] !== `function`) { - // we only expose functions + // We only expose functions. Exposing other types + // would require additional handling which doesn't seem + // worth supporting given that consumers can just access + // those via require/import instead of WorkerPool interface. continue } From 48d001a1b5d5ab66b0b0a1bcc3234d5386241c94 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 13:00:07 +0200 Subject: [PATCH 12/13] document isWorker export --- packages/gatsby-worker/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/gatsby-worker/README.md b/packages/gatsby-worker/README.md index 0a00d08823d54..aa8bb57c1b44f 100644 --- a/packages/gatsby-worker/README.md +++ b/packages/gatsby-worker/README.md @@ -86,6 +86,19 @@ const arrayOfPromises = workerPool.all.setupStep(`baz`) const arrayOfPromises = workerPool.end() ``` +### `isWorker` + +```ts +// Determine if current context is executed in worker context. Useful for conditional handling depending on context. +import { isWorker } from "gatsby-worker" + +if (isWorker) { + // this is executed in worker context +} else { + // this is NOT executed in worker context +} +``` + ## Usage with unit tests If you are working with source files that need transpilation, you will need to make it possible to load untranspiled modules in child processes. 
From 93bcbbfdecc74807bd51bb8841179141438ce158 Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 29 Jun 2021 13:26:03 +0200 Subject: [PATCH 13/13] add comments about usage of JEST_WORKER_ID which is used by tests, even tho we ourselves will be using gatsby-worker --- packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts | 3 +++ .../utils/worker/__tests__/test-helpers/create-test-worker.ts | 2 ++ 2 files changed, 5 insertions(+) diff --git a/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts b/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts index 6fd37fa77db4c..cd54843040fd7 100644 --- a/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts +++ b/packages/gatsby/src/datastore/lmdb/lmdb-datastore.ts @@ -9,6 +9,9 @@ import { emitter, replaceReducer } from "../../redux" const rootDbFile = process.env.NODE_ENV === `test` ? `test-datastore-${ + // FORCE_TEST_DATABASE_ID will be set if this gets executed in worker context + // when running jest tests. JEST_WORKER_ID will be set when this gets executed directly + // in test context (jest will use jest-worker internally). process.env.FORCE_TEST_DATABASE_ID ?? process.env.JEST_WORKER_ID }` : `datastore` diff --git a/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts b/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts index 9da78204bedd2..d0d86ecb7f3f4 100644 --- a/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts +++ b/packages/gatsby/src/utils/worker/__tests__/test-helpers/create-test-worker.ts @@ -10,6 +10,8 @@ export function createTestWorker(): GatsbyTestWorkerPool { { numWorkers: 1, env: { + // We are using JEST_WORKER_ID env so that worker use same test database as + // jest runner process FORCE_TEST_DATABASE_ID: process.env.JEST_WORKER_ID, GATSBY_WORKER_POOL_WORKER: `true`, NODE_OPTIONS: `--require ${require.resolve(`./ts-register`)}`,