Skip to content

Commit

Permalink
PriorityQueue implementation for jest-worker (#10921)
Browse files Browse the repository at this point in the history
  • Loading branch information
Micha Reiser committed Dec 26, 2020
1 parent e84f682 commit 0ab070c
Show file tree
Hide file tree
Showing 11 changed files with 583 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,7 @@
- `[jest-runner]` [**BREAKING**] Run transforms over `testRunnner` ([#8823](https://github.com/facebook/jest/pull/8823))
- `[jest-runtime, jest-transform]` share `cacheFS` between runtime and transformer ([#10901](https://github.com/facebook/jest/pull/10901))
- `[jest-transform]` Pass config options defined in Jest's config to transformer's `process` and `getCacheKey` functions ([#10926](https://github.com/facebook/jest/pull/10926))
- `[jest-worker]` Add support for custom task queues and adds a `PriorityQueue` implementation. ([#10921](https://github.com/facebook/jest/pull/10921))

### Fixes

Expand Down
7 changes: 7 additions & 0 deletions packages/jest-worker/README.md
Expand Up @@ -91,6 +91,13 @@ Provide a custom worker pool to be used for spawning child processes. By default

`jest-worker` will automatically detect if `worker_threads` are available, but will not use them unless passed `enableWorkerThreads: true`.

### `taskQueue`: TaskQueue` (optional)

The task queue defines in which order tasks (method calls) are processed by the workers. `jest-worker` ships with a `FifoQueue` and `PriorityQueue`:

- `FifoQueue` (default): Processes the method calls (tasks) in the call order.
- `PriorityQueue`: Processes the method calls by a computed priority in natural ordering (lower priorities first). Tasks with the same priority are processed in any order (FIFO not guaranteed). The constructor accepts a single argument, the function that is passed the name of the called function and the arguments and returns a numerical value for the priority: `new require('jest-worker').PriorityQueue((method, filename) => filename.length)`.

## JestWorker

### Methods
Expand Down
67 changes: 23 additions & 44 deletions packages/jest-worker/src/Farm.ts
Expand Up @@ -7,6 +7,7 @@

/* eslint-disable local/ban-types-eventually */

import FifoQueue from './FifoQueue';
import {
CHILD_MESSAGE_CALL,
ChildMessage,
Expand All @@ -16,36 +17,34 @@ import {
OnStart,
PromiseWithCustomMessage,
QueueChildMessage,
QueueItem,
TaskQueue,
WorkerInterface,
} from './types';

export default class Farm {
private _computeWorkerKey: FarmOptions['computeWorkerKey'];
private _cacheKeys: Record<string, WorkerInterface>;
private _callback: Function;
private _last: Array<QueueItem>;
private _locks: Array<boolean>;
private _numOfWorkers: number;
private _offset: number;
private _queue: Array<QueueItem | null>;
private _taskQueue: TaskQueue;

constructor(
numOfWorkers: number,
callback: Function,
computeWorkerKey?: FarmOptions['computeWorkerKey'],
options: {
computeWorkerKey?: FarmOptions['computeWorkerKey'];
taskQueue?: TaskQueue;
} = {},
) {
this._cacheKeys = Object.create(null);
this._callback = callback;
this._last = [];
this._locks = [];
this._numOfWorkers = numOfWorkers;
this._offset = 0;
this._queue = [];

if (computeWorkerKey) {
this._computeWorkerKey = computeWorkerKey;
}
this._computeWorkerKey = options.computeWorkerKey;
this._taskQueue = options.taskQueue ?? new FifoQueue();
}

doWork(
Expand Down Expand Up @@ -96,7 +95,8 @@ export default class Farm {
const task = {onCustomMessage, onEnd, onStart, request};

if (worker) {
this._enqueue(task, worker.getWorkerId());
this._taskQueue.enqueue(task, worker.getWorkerId());
this._process(worker.getWorkerId());
} else {
this._push(task);
}
Expand All @@ -108,29 +108,21 @@ export default class Farm {
return promise;
}

private _getNextTask(workerId: number): QueueChildMessage | null {
let queueHead = this._queue[workerId];

while (queueHead && queueHead.task.request[1]) {
queueHead = queueHead.next || null;
}

this._queue[workerId] = queueHead;

return queueHead && queueHead.task;
}

private _process(workerId: number): Farm {
if (this._isLocked(workerId)) {
return this;
}

const task = this._getNextTask(workerId);
const task = this._taskQueue.dequeue(workerId);

if (!task) {
return this;
}

if (task.request[1]) {
throw new Error('Queue implementation returned processed task');
}

const onEnd = (error: Error | null, result: unknown) => {
task.onEnd(error, result);

Expand All @@ -152,28 +144,15 @@ export default class Farm {
return this;
}

private _enqueue(task: QueueChildMessage, workerId: number): Farm {
const item = {next: null, task};

if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = item;
} else {
this._queue[workerId] = item;
}

this._last[workerId] = item;
this._process(workerId);

return this;
}

private _push(task: QueueChildMessage): Farm {
this._taskQueue.enqueue(task);

for (let i = 0; i < this._numOfWorkers; i++) {
this._enqueue(task, (this._offset + i) % this._numOfWorkers);
this._process((this._offset + i) % this._numOfWorkers);

if (task.request[1]) {
break;
}
}

this._offset++;
Expand Down
115 changes: 115 additions & 0 deletions packages/jest-worker/src/FifoQueue.ts
@@ -0,0 +1,115 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

import type {QueueChildMessage, TaskQueue} from './types';

type WorkerQueueValue = {
task: QueueChildMessage;

/**
* The task that was at the top of the shared queue at the time this
* worker specific task was enqueued. Required to maintain FIFO ordering
* across queues. The worker specific task should only be dequeued if the
* previous shared task is null or has been processed.
*/
previousSharedTask: QueueChildMessage | null;
};

/**
* First-in, First-out task queue that manages a dedicated pool
* for each worker as well as a shared queue. The FIFO ordering is guaranteed
* across the worker specific and shared queue.
*/
export default class FifoQueue implements TaskQueue {
private _workerQueues: Array<
InternalQueue<WorkerQueueValue> | undefined
> = [];
private _sharedQueue = new InternalQueue<QueueChildMessage>();

enqueue(task: QueueChildMessage, workerId?: number): void {
if (workerId == null) {
this._sharedQueue.enqueue(task);
return;
}

let workerQueue = this._workerQueues[workerId];
if (workerQueue == null) {
workerQueue = this._workerQueues[
workerId
] = new InternalQueue<WorkerQueueValue>();
}

const sharedTop = this._sharedQueue.peekLast();
const item = {previousSharedTask: sharedTop, task};

workerQueue.enqueue(item);
}

dequeue(workerId: number): QueueChildMessage | null {
const workerTop = this._workerQueues[workerId]?.peek();
const sharedTaskIsProcessed =
workerTop?.previousSharedTask?.request[1] ?? true;

// Process the top task from the shared queue if
// - there's no task in the worker specific queue or
// - if the non-worker-specific task after which this worker specifif task
// hasn been queued wasn't processed yet
if (workerTop != null && sharedTaskIsProcessed) {
return this._workerQueues[workerId]?.dequeue()?.task ?? null;
}

return this._sharedQueue.dequeue();
}
}

type QueueItem<TValue> = {
value: TValue;
next: QueueItem<TValue> | null;
};

/**
* FIFO queue for a single worker / shared queue.
*/
class InternalQueue<TValue> {
private _head: QueueItem<TValue> | null = null;
private _last: QueueItem<TValue> | null = null;

enqueue(value: TValue): void {
const item = {next: null, value};

if (this._last == null) {
this._head = item;
} else {
this._last.next = item;
}

this._last = item;
}

dequeue(): TValue | null {
if (this._head == null) {
return null;
}

const item = this._head;
this._head = item.next;

if (this._head == null) {
this._last = null;
}

return item.value;
}

peek(): TValue | null {
return this._head?.value ?? null;
}

peekLast(): TValue | null {
return this._last?.value ?? null;
}
}

0 comments on commit 0ab070c

Please sign in to comment.