From 0ab070c48946057209d65b00f5a7f6de5c5b7a8f Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Sat, 26 Dec 2020 13:32:04 +0000 Subject: [PATCH] PriorityQueue implementation for jest-worker (#10921) --- CHANGELOG.md | 1 + packages/jest-worker/README.md | 7 + packages/jest-worker/src/Farm.ts | 67 +++---- packages/jest-worker/src/FifoQueue.ts | 115 ++++++++++++ packages/jest-worker/src/PriorityQueue.ts | 170 ++++++++++++++++++ .../src/__performance_tests__/test.js | 17 +- .../jest-worker/src/__tests__/Farm.test.js | 14 +- .../src/__tests__/FifoQueue.test.js | 94 ++++++++++ .../src/__tests__/PriorityQueue.test.js | 128 +++++++++++++ packages/jest-worker/src/index.ts | 10 +- packages/jest-worker/src/types.ts | 25 ++- 11 files changed, 583 insertions(+), 65 deletions(-) create mode 100644 packages/jest-worker/src/FifoQueue.ts create mode 100644 packages/jest-worker/src/PriorityQueue.ts create mode 100644 packages/jest-worker/src/__tests__/FifoQueue.test.js create mode 100644 packages/jest-worker/src/__tests__/PriorityQueue.test.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 73f7428cb4c8..e78bd56e2053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - `[jest-runner]` [**BREAKING**] Run transforms over `testRunnner` ([#8823](https://github.com/facebook/jest/pull/8823)) - `[jest-runtime, jest-transform]` share `cacheFS` between runtime and transformer ([#10901](https://github.com/facebook/jest/pull/10901)) - `[jest-transform]` Pass config options defined in Jest's config to transformer's `process` and `getCacheKey` functions ([#10926](https://github.com/facebook/jest/pull/10926)) +- `[jest-worker]` Add support for custom task queues and adds a `PriorityQueue` implementation. ([#10921](https://github.com/facebook/jest/pull/10921)) ### Fixes diff --git a/packages/jest-worker/README.md b/packages/jest-worker/README.md index 38b7dd60b89b..085a8c4797df 100644 --- a/packages/jest-worker/README.md +++ b/packages/jest-worker/README.md @@ -91,6 +91,13 @@ Provide a custom worker pool to be used for spawning child processes. By default `jest-worker` will automatically detect if `worker_threads` are available, but will not use them unless passed `enableWorkerThreads: true`. +### `taskQueue`: TaskQueue` (optional) + +The task queue defines in which order tasks (method calls) are processed by the workers. `jest-worker` ships with a `FifoQueue` and `PriorityQueue`: + +- `FifoQueue` (default): Processes the method calls (tasks) in the call order. +- `PriorityQueue`: Processes the method calls by a computed priority in natural ordering (lower priorities first). Tasks with the same priority are processed in any order (FIFO not guaranteed). The constructor accepts a single argument, the function that is passed the name of the called function and the arguments and returns a numerical value for the priority: `new require('jest-worker').PriorityQueue((method, filename) => filename.length)`. + ## JestWorker ### Methods diff --git a/packages/jest-worker/src/Farm.ts b/packages/jest-worker/src/Farm.ts index 2c3950c7a30f..302b1cba2f6c 100644 --- a/packages/jest-worker/src/Farm.ts +++ b/packages/jest-worker/src/Farm.ts @@ -7,6 +7,7 @@ /* eslint-disable local/ban-types-eventually */ +import FifoQueue from './FifoQueue'; import { CHILD_MESSAGE_CALL, ChildMessage, @@ -16,7 +17,7 @@ import { OnStart, PromiseWithCustomMessage, QueueChildMessage, - QueueItem, + TaskQueue, WorkerInterface, } from './types'; @@ -24,28 +25,26 @@ export default class Farm { private _computeWorkerKey: FarmOptions['computeWorkerKey']; private _cacheKeys: Record; private _callback: Function; - private _last: Array; private _locks: Array; private _numOfWorkers: number; private _offset: number; - private _queue: Array; + private _taskQueue: TaskQueue; constructor( numOfWorkers: number, callback: Function, - computeWorkerKey?: FarmOptions['computeWorkerKey'], + options: { + computeWorkerKey?: FarmOptions['computeWorkerKey']; + taskQueue?: TaskQueue; + } = {}, ) { this._cacheKeys = Object.create(null); this._callback = callback; - this._last = []; this._locks = []; this._numOfWorkers = numOfWorkers; this._offset = 0; - this._queue = []; - - if (computeWorkerKey) { - this._computeWorkerKey = computeWorkerKey; - } + this._computeWorkerKey = options.computeWorkerKey; + this._taskQueue = options.taskQueue ?? new FifoQueue(); } doWork( @@ -96,7 +95,8 @@ export default class Farm { const task = {onCustomMessage, onEnd, onStart, request}; if (worker) { - this._enqueue(task, worker.getWorkerId()); + this._taskQueue.enqueue(task, worker.getWorkerId()); + this._process(worker.getWorkerId()); } else { this._push(task); } @@ -108,29 +108,21 @@ export default class Farm { return promise; } - private _getNextTask(workerId: number): QueueChildMessage | null { - let queueHead = this._queue[workerId]; - - while (queueHead && queueHead.task.request[1]) { - queueHead = queueHead.next || null; - } - - this._queue[workerId] = queueHead; - - return queueHead && queueHead.task; - } - private _process(workerId: number): Farm { if (this._isLocked(workerId)) { return this; } - const task = this._getNextTask(workerId); + const task = this._taskQueue.dequeue(workerId); if (!task) { return this; } + if (task.request[1]) { + throw new Error('Queue implementation returned processed task'); + } + const onEnd = (error: Error | null, result: unknown) => { task.onEnd(error, result); @@ -152,28 +144,15 @@ export default class Farm { return this; } - private _enqueue(task: QueueChildMessage, workerId: number): Farm { - const item = {next: null, task}; - - if (task.request[1]) { - return this; - } - - if (this._queue[workerId]) { - this._last[workerId].next = item; - } else { - this._queue[workerId] = item; - } - - this._last[workerId] = item; - this._process(workerId); - - return this; - } - private _push(task: QueueChildMessage): Farm { + this._taskQueue.enqueue(task); + for (let i = 0; i < this._numOfWorkers; i++) { - this._enqueue(task, (this._offset + i) % this._numOfWorkers); + this._process((this._offset + i) % this._numOfWorkers); + + if (task.request[1]) { + break; + } } this._offset++; diff --git a/packages/jest-worker/src/FifoQueue.ts b/packages/jest-worker/src/FifoQueue.ts new file mode 100644 index 000000000000..ff9ec1edce73 --- /dev/null +++ b/packages/jest-worker/src/FifoQueue.ts @@ -0,0 +1,115 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import type {QueueChildMessage, TaskQueue} from './types'; + +type WorkerQueueValue = { + task: QueueChildMessage; + + /** + * The task that was at the top of the shared queue at the time this + * worker specific task was enqueued. Required to maintain FIFO ordering + * across queues. The worker specific task should only be dequeued if the + * previous shared task is null or has been processed. + */ + previousSharedTask: QueueChildMessage | null; +}; + +/** + * First-in, First-out task queue that manages a dedicated pool + * for each worker as well as a shared queue. The FIFO ordering is guaranteed + * across the worker specific and shared queue. + */ +export default class FifoQueue implements TaskQueue { + private _workerQueues: Array< + InternalQueue | undefined + > = []; + private _sharedQueue = new InternalQueue(); + + enqueue(task: QueueChildMessage, workerId?: number): void { + if (workerId == null) { + this._sharedQueue.enqueue(task); + return; + } + + let workerQueue = this._workerQueues[workerId]; + if (workerQueue == null) { + workerQueue = this._workerQueues[ + workerId + ] = new InternalQueue(); + } + + const sharedTop = this._sharedQueue.peekLast(); + const item = {previousSharedTask: sharedTop, task}; + + workerQueue.enqueue(item); + } + + dequeue(workerId: number): QueueChildMessage | null { + const workerTop = this._workerQueues[workerId]?.peek(); + const sharedTaskIsProcessed = + workerTop?.previousSharedTask?.request[1] ?? true; + + // Process the top task from the shared queue if + // - there's no task in the worker specific queue or + // - if the non-worker-specific task after which this worker specifif task + // hasn been queued wasn't processed yet + if (workerTop != null && sharedTaskIsProcessed) { + return this._workerQueues[workerId]?.dequeue()?.task ?? null; + } + + return this._sharedQueue.dequeue(); + } +} + +type QueueItem = { + value: TValue; + next: QueueItem | null; +}; + +/** + * FIFO queue for a single worker / shared queue. + */ +class InternalQueue { + private _head: QueueItem | null = null; + private _last: QueueItem | null = null; + + enqueue(value: TValue): void { + const item = {next: null, value}; + + if (this._last == null) { + this._head = item; + } else { + this._last.next = item; + } + + this._last = item; + } + + dequeue(): TValue | null { + if (this._head == null) { + return null; + } + + const item = this._head; + this._head = item.next; + + if (this._head == null) { + this._last = null; + } + + return item.value; + } + + peek(): TValue | null { + return this._head?.value ?? null; + } + + peekLast(): TValue | null { + return this._last?.value ?? null; + } +} diff --git a/packages/jest-worker/src/PriorityQueue.ts b/packages/jest-worker/src/PriorityQueue.ts new file mode 100644 index 000000000000..c00097cc97f2 --- /dev/null +++ b/packages/jest-worker/src/PriorityQueue.ts @@ -0,0 +1,170 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import type {QueueChildMessage, TaskQueue} from './types'; + +export type ComputeTaskPriorityCallback = ( + method: string, + ...args: Array +) => number; + +type QueueItem = { + task: QueueChildMessage; + priority: number; +}; + +/** + * Priority queue that processes tasks in natural ordering (lower priority first) + * accoridng to the priority computed by the function passed in the constructor. + * + * FIFO ordering isn't guaranteed for tasks with the same priority. + * + * Worker specific tasks with the same priority as a non-worker specific task + * are always processed first. + */ +export default class PriorityQueue implements TaskQueue { + private _queue: Array> = []; + private _sharedQueue = new MinHeap(); + + constructor(private _computePriority: ComputeTaskPriorityCallback) {} + + enqueue(task: QueueChildMessage, workerId?: number): void { + if (workerId == null) { + this._enqueue(task, this._sharedQueue); + } else { + const queue = this._getWorkerQueue(workerId); + this._enqueue(task, queue); + } + } + + _enqueue(task: QueueChildMessage, queue: MinHeap): void { + const item = { + priority: this._computePriority(task.request[2], ...task.request[3]), + task, + }; + + queue.add(item); + } + + dequeue(workerId: number): QueueChildMessage | null { + const workerQueue = this._getWorkerQueue(workerId); + + const workerTop = workerQueue.peek(); + const sharedTop = this._sharedQueue.peek(); + + // use the task from the worker queue if there's no task in the shared queue + // or if the priority of the worker queue is smaller or equal to the + // priority of the top task in the shared queue. The tasks of the + // worker specific queue are preferred because no other worker can pick this + // specific task up. + if ( + sharedTop == null || + (workerTop != null && workerTop.priority <= sharedTop.priority) + ) { + return workerQueue.poll()?.task ?? null; + } + + return this._sharedQueue.poll()!.task; + } + + _getWorkerQueue(workerId: number): MinHeap { + let queue = this._queue[workerId]; + if (queue == null) { + queue = this._queue[workerId] = new MinHeap(); + } + + return queue; + } +} + +type HeapItem = { + priority: number; +}; + +class MinHeap { + private _heap: Array = []; + + peek(): TItem | null { + return this._heap[0] ?? null; + } + + add(item: TItem): void { + const nodes = this._heap; + nodes.push(item); + + if (nodes.length === 1) { + return; + } + + let currentIndex = nodes.length - 1; + + // Bubble up the added node as long as the parent is bigger + while (currentIndex > 0) { + const parentIndex = Math.floor((currentIndex + 1) / 2) - 1; + const parent = nodes[parentIndex]!; + + if (parent.priority <= item.priority) { + break; + } + + nodes[currentIndex] = parent; + nodes[parentIndex] = item; + + currentIndex = parentIndex; + } + } + + poll(): TItem | null { + const nodes = this._heap; + const result = nodes[0]; + + const lastElement = nodes.pop(); + + // heap was empty or removed the last element + if (result == null || nodes.length === 0) { + return result ?? null; + } + + let index = 0; + nodes[0] = lastElement ?? null; + const element = nodes[0]!; + + while (true) { + let swapIndex = null; + const rightChildIndex = (index + 1) * 2; + const leftChildIndex = rightChildIndex - 1; + const rightChild = nodes[rightChildIndex]; + const leftChild = nodes[leftChildIndex]; + + // if the left child is smaller, swap with the left + if (leftChild != null && leftChild.priority < element.priority) { + swapIndex = leftChildIndex; + } + + // If the right child is smaller or the right child is smaller than the left + // then swap with the right child + if ( + rightChild != null && + rightChild.priority < + (swapIndex == null ? element : leftChild!).priority + ) { + swapIndex = rightChildIndex; + } + + if (swapIndex == null) { + break; + } + + nodes[index] = nodes[swapIndex]; + nodes[swapIndex] = element; + + index = swapIndex; + } + + return result; + } +} diff --git a/packages/jest-worker/src/__performance_tests__/test.js b/packages/jest-worker/src/__performance_tests__/test.js index 53dbc688b76f..10e2571426fe 100644 --- a/packages/jest-worker/src/__performance_tests__/test.js +++ b/packages/jest-worker/src/__performance_tests__/test.js @@ -8,6 +8,7 @@ 'use strict'; const assert = require('assert'); +const {performance} = require('perf_hooks'); // eslint-disable-next-line import/no-extraneous-dependencies const workerFarm = require('worker-farm'); const JestWorker = require('../../build').Worker; @@ -24,13 +25,13 @@ const threads = 6; function testWorkerFarm() { return new Promise(async resolve => { - const startTime = Date.now(); + const startTime = performance.now(); let count = 0; async function countToFinish() { if (++count === calls) { workerFarm.end(api); - const endTime = Date.now(); + const endTime = performance.now(); // Let all workers go down. await sleep(2000); @@ -55,7 +56,7 @@ function testWorkerFarm() { // Let all workers come up. await sleep(2000); - const startProcess = Date.now(); + const startProcess = performance.now(); for (let i = 0; i < calls; i++) { const promisified = new Promise((resolve, reject) => { @@ -75,13 +76,13 @@ function testWorkerFarm() { function testJestWorker() { return new Promise(async resolve => { - const startTime = Date.now(); + const startTime = performance.now(); let count = 0; async function countToFinish() { if (++count === calls) { farm.end(); - const endTime = Date.now(); + const endTime = performance.now(); // Let all workers go down. await sleep(2000); @@ -96,7 +97,7 @@ function testJestWorker() { const farm = new JestWorker(require.resolve('./workers/jest_worker'), { exposedMethods: [method], forkOptions: {execArgv: []}, - workers: threads, + numWorkers: threads, }); farm.getStdout().pipe(process.stdout); @@ -105,7 +106,7 @@ function testJestWorker() { // Let all workers come up. await sleep(2000); - const startProcess = Date.now(); + const startProcess = performance.now(); for (let i = 0; i < calls; i++) { const promisified = farm[method](); @@ -125,7 +126,7 @@ function profileEnd(x) { async function main() { if (!global.gc) { - console.log('GC not present'); + console.warn('GC not present, start with node --expose-gc'); } const wFResults = []; diff --git a/packages/jest-worker/src/__tests__/Farm.test.js b/packages/jest-worker/src/__tests__/Farm.test.js index fcdaef7c54cd..b8aacfe0242b 100644 --- a/packages/jest-worker/src/__tests__/Farm.test.js +++ b/packages/jest-worker/src/__tests__/Farm.test.js @@ -105,7 +105,7 @@ describe('Farm', () => { it('handles null computeWorkerKey, sending to first worker', async () => { const computeWorkerKey = jest.fn(() => null); - const farm = new Farm(4, callback, computeWorkerKey); + const farm = new Farm(4, callback, {computeWorkerKey}); const p0 = farm.doWork('foo', 42); workerReply(0); @@ -132,7 +132,7 @@ describe('Farm', () => { .mockReturnValueOnce('two') .mockReturnValueOnce('one'); - const farm = new Farm(4, callback, computeWorkerKey); + const farm = new Farm(4, callback, {computeWorkerKey}); const p0 = farm.doWork('foo', 42); workerReply(0); @@ -208,7 +208,9 @@ describe('Farm', () => { }); it('checks that once a sticked task finishes, next time is sent to that worker', async () => { - const farm = new Farm(4, callback, () => '1234567890abcdef'); + const farm = new Farm(4, callback, { + computeWorkerKey: () => '1234567890abcdef', + }); // Worker 1 successfully replies with "17" as a result. const p0 = farm.doWork('car', 'plane'); @@ -248,7 +250,9 @@ describe('Farm', () => { }); it('checks that even before a sticked task finishes, next time is sent to that worker', async () => { - const farm = new Farm(4, callback, () => '1234567890abcdef'); + const farm = new Farm(4, callback, { + computeWorkerKey: () => '1234567890abcdef', + }); // Note that the worker is sending a start response synchronously. const p0 = farm.doWork('car', 'plane'); @@ -310,7 +314,7 @@ describe('Farm', () => { // Push onto the second queue; potentially wiping the earlier job. .mockReturnValueOnce(1); - const farm = new Farm(2, callback, hash); + const farm = new Farm(2, callback, {computeWorkerKey: hash}); // First and second jobs get resolved, so that their hash is sticked to // the right worker: worker assignment is performed when workers reply, not diff --git a/packages/jest-worker/src/__tests__/FifoQueue.test.js b/packages/jest-worker/src/__tests__/FifoQueue.test.js new file mode 100644 index 000000000000..b23d732c4590 --- /dev/null +++ b/packages/jest-worker/src/__tests__/FifoQueue.test.js @@ -0,0 +1,94 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import FifoQueue from '../FifoQueue'; +import {CHILD_MESSAGE_CALL} from '../types'; + +it('returns the shared tasks in FIFO ordering', () => { + const queue = new FifoQueue(); + + const task1 = createQueueChildMessage(); + const task2 = createQueueChildMessage(); + const task3 = createQueueChildMessage(); + + queue.enqueue(task1); + queue.enqueue(task2); + queue.enqueue(task3); + + expect(queue.dequeue(1)).toBe(task1); + expect(queue.dequeue(2)).toBe(task2); + expect(queue.dequeue(3)).toBe(task3); + + expect(queue.dequeue(1)).toBeNull(); + expect(queue.dequeue(2)).toBeNull(); + expect(queue.dequeue(3)).toBeNull(); +}); + +it('returns the worker specific tasks in FIFO ordering', () => { + const queue = new FifoQueue(); + + const task1 = createQueueChildMessage(); + const task2 = createQueueChildMessage(); + const task3 = createQueueChildMessage(); + + queue.enqueue(task1, 1); + queue.enqueue(task2, 1); + queue.enqueue(task3, 1); + + expect(queue.dequeue(1)).toBe(task1); + expect(queue.dequeue(1)).toBe(task2); + expect(queue.dequeue(1)).toBe(task3); + + expect(queue.dequeue(1)).toBeNull(); +}); + +it('maintains global FIFO ordering between worker specific and shared tasks', () => { + const queue = new FifoQueue(); + + const sharedTask1 = createQueueChildMessage({name: 'sharedTask1'}); + const sharedTask2 = createQueueChildMessage({name: 'sharedTask2'}); + const sharedTask3 = createQueueChildMessage({name: 'sharedTask3'}); + const worker1Task1 = createQueueChildMessage({name: 'worker1Task1'}); + const worker1Task2 = createQueueChildMessage({name: 'worker1Task2'}); + const worker2Task2 = createQueueChildMessage({name: 'worker2Task1'}); + + queue.enqueue(worker1Task1, 1); + queue.enqueue(sharedTask1); + queue.enqueue(sharedTask2); + queue.enqueue(worker1Task2, 1); + queue.enqueue(worker2Task2, 2); + queue.enqueue(sharedTask3); + + expect(queue.dequeue(1)).toBe(worker1Task1); + expect(queue.dequeue(2)).toBe(sharedTask1); + sharedTask1.request[1] = true; + + expect(queue.dequeue(1)).toBe(sharedTask2); + sharedTask2.request[1] = true; + + expect(queue.dequeue(1)).toBe(worker1Task2); + expect(queue.dequeue(1)).toBe(sharedTask3); + sharedTask3.request[1] = true; + + expect(queue.dequeue(2)).toBe(worker2Task2); + + expect(queue.dequeue(1)).toBeNull(); + expect(queue.dequeue(2)).toBeNull(); +}); + +function createQueueChildMessage(...args) { + const request = [CHILD_MESSAGE_CALL, false, 'test', args]; + + return { + onCustomMessage: () => {}, + onEnd: () => {}, + onStart: () => {}, + request, + }; +} diff --git a/packages/jest-worker/src/__tests__/PriorityQueue.test.js b/packages/jest-worker/src/__tests__/PriorityQueue.test.js new file mode 100644 index 000000000000..fb40400c82eb --- /dev/null +++ b/packages/jest-worker/src/__tests__/PriorityQueue.test.js @@ -0,0 +1,128 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import PriorityQueue from '../PriorityQueue'; +import {CHILD_MESSAGE_CALL} from '../types'; + +it('returns the tasks in order', () => { + const computePriority = (_method, task) => task.priority; + const queue = new PriorityQueue(computePriority); + const priorities = [10, 3, 4, 8, 2, 9, 7, 1, 2, 6, 5]; + + for (const priority of priorities) { + queue.enqueue(createQueueChildMessage({priority})); + } + + priorities.sort((a, b) => a - b); + for (const priority of priorities) { + expect(queue.dequeue(0)).toEqual( + expect.objectContaining({ + request: [CHILD_MESSAGE_CALL, false, 'test', [{priority}]], + }), + ); + } + + expect(queue.dequeue(0)).toBeNull(); +}); + +it('returns the task with the lowest priority value if inserted in reversed order', () => { + const last = createQueueChildMessage({priority: 3}); + const mid = createQueueChildMessage({priority: 2}); + const first = createQueueChildMessage({priority: 1}); + + const computePriority = (_method, task) => task.priority; + const queue = new PriorityQueue(computePriority); + + queue.enqueue(last, 1); + queue.enqueue(first, 1); + queue.enqueue(mid, 1); + + expect(queue.dequeue(1)).toBe(first); + expect(queue.dequeue(1)).toBe(mid); + expect(queue.dequeue(1)).toBe(last); + expect(queue.dequeue(1)).toBeNull(); +}); + +it('returns the task with the lowest priority value if inserted in correct order', () => { + const first = createQueueChildMessage({priority: 1}); + const mid = createQueueChildMessage({priority: 2}); + const last = createQueueChildMessage({priority: 3}); + + const computePriority = (_method, task) => task.priority; + const queue = new PriorityQueue(computePriority); + + queue.enqueue(last, 1); + queue.enqueue(first, 1); + queue.enqueue(mid, 1); + + expect(queue.dequeue(1)).toBe(first); + expect(queue.dequeue(1)).toBe(mid); + expect(queue.dequeue(1)).toBe(last); + expect(queue.dequeue(1)).toBeNull(); +}); + +it('uses different queues for each worker', () => { + const task1Worker1 = createQueueChildMessage({priority: 1}); + const task2Worker1 = createQueueChildMessage({priority: 3}); + const task1Worker2 = createQueueChildMessage({priority: 1}); + const task2Worker2 = createQueueChildMessage({priority: 3}); + + const computePriority = (_method, task) => task.priority; + const queue = new PriorityQueue(computePriority); + + queue.enqueue(task2Worker1, 1); + queue.enqueue(task1Worker1, 1); + queue.enqueue(task2Worker2, 2); + queue.enqueue(task1Worker2, 2); + + expect(queue.dequeue(1)).toBe(task1Worker1); + expect(queue.dequeue(1)).toBe(task2Worker1); + expect(queue.dequeue(2)).toBe(task1Worker2); + expect(queue.dequeue(2)).toBe(task2Worker2); + expect(queue.dequeue(1)).toBeNull(); +}); + +it('process task in the global and shared queue in order', () => { + const computePriority = (_method, task) => task.priority; + const queue = new PriorityQueue(computePriority); + + const sharedTask1 = createQueueChildMessage({priority: 1}); + const sharedTask2 = createQueueChildMessage({priority: 3}); + queue.enqueue(sharedTask1); + queue.enqueue(sharedTask2); + + const worker1Task1 = createQueueChildMessage({priority: 0}); + const worker1Task2 = createQueueChildMessage({priority: 2}); + queue.enqueue(worker1Task1, 1); + queue.enqueue(worker1Task2, 1); + + const worker2Task1 = createQueueChildMessage({priority: 3}); + queue.enqueue(worker2Task1, 2); + + expect(queue.dequeue(1)).toBe(worker1Task1); + expect(queue.dequeue(1)).toBe(sharedTask1); + expect(queue.dequeue(1)).toBe(worker1Task2); + + expect(queue.dequeue(2)).toBe(worker2Task1); + expect(queue.dequeue(2)).toBe(sharedTask2); + + expect(queue.dequeue(1)).toBeNull(); + expect(queue.dequeue(2)).toBeNull(); +}); + +function createQueueChildMessage(...args) { + const request = [CHILD_MESSAGE_CALL, false, 'test', args]; + + return { + onCustomMessage: () => {}, + onEnd: () => {}, + onStart: () => {}, + request, + }; +} diff --git a/packages/jest-worker/src/index.ts b/packages/jest-worker/src/index.ts index 19ee73b156cc..fc243894fd66 100644 --- a/packages/jest-worker/src/index.ts +++ b/packages/jest-worker/src/index.ts @@ -14,9 +14,12 @@ import type { FarmOptions, PoolExitResult, PromiseWithCustomMessage, + TaskQueue, WorkerPoolInterface, WorkerPoolOptions, } from './types'; +export {default as PriorityQueue} from './PriorityQueue'; +export {default as FifoQueue} from './FifoQueue'; export {default as messageParent} from './workers/messageParent'; function getExposedMethods( @@ -99,7 +102,10 @@ export class Worker { this._farm = new Farm( workerPoolOptions.numWorkers, this._workerPool.send.bind(this._workerPool), - this._options.computeWorkerKey, + { + computeWorkerKey: this._options.computeWorkerKey, + taskQueue: this._options.taskQueue, + }, ); this._bindExposedWorkerMethods(workerPath, this._options); @@ -152,4 +158,4 @@ export class Worker { } } -export type {PromiseWithCustomMessage}; +export type {PromiseWithCustomMessage, TaskQueue}; diff --git a/packages/jest-worker/src/types.ts b/packages/jest-worker/src/types.ts index 9afa7f92f3c5..2859ead71f5c 100644 --- a/packages/jest-worker/src/types.ts +++ b/packages/jest-worker/src/types.ts @@ -75,6 +75,23 @@ export interface PromiseWithCustomMessage extends Promise { export type {ForkOptions}; +export interface TaskQueue { + /** + * Enqueues the task in the queue for the specified worker or adds it to the + * queue shared by all workers + * @param task the task to queue + * @param workerId the id of the worker that should process this task or undefined + * if there's no preference. + */ + enqueue(task: QueueChildMessage, workerId?: number): void; + + /** + * Dequeues the next item from the queue for the speified worker + * @param workerId the id of the worker for which the next task should be retrieved + */ + dequeue(workerId: number): QueueChildMessage | null; +} + export type FarmOptions = { computeWorkerKey?: (method: string, ...args: Array) => string | null; exposedMethods?: ReadonlyArray; @@ -83,6 +100,7 @@ export type FarmOptions = { setupArgs?: Array; maxRetries?: number; numWorkers?: number; + taskQueue?: TaskQueue; WorkerPool?: ( workerPath: string, options?: WorkerPoolOptions, @@ -176,13 +194,8 @@ export type OnEnd = (err: Error | null, result: unknown) => void; export type OnCustomMessage = (message: Array | unknown) => void; export type QueueChildMessage = { - request: ChildMessage; + request: ChildMessageCall; onStart: OnStart; onEnd: OnEnd; onCustomMessage: OnCustomMessage; }; - -export type QueueItem = { - task: QueueChildMessage; - next: QueueItem | null; -};