Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PriorityQueue implementation for jest-worker #10921

Merged
merged 7 commits into from Dec 26, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/jest-worker/README.md
Expand Up @@ -91,6 +91,14 @@ Provide a custom worker pool to be used for spawning child processes. By default

`jest-worker` will automatically detect if `worker_threads` are available, but will not use them unless passed `enableWorkerThreads: true`.


### `taskQueue`: TaskQueue` (optional)

The task queue defines in which order tasks (method calls) are processed by the workers. `jest-worker` ships with a `FifoQueue` and `PriorityQueue`:

* `FifoQueue` (default): Processes the method calls (tasks) in the call order.
* `PriorityQueue`: Processes the method calls by a computed priority in natural ordering (lower priorities first). Tasks with the same priority are processed in any order (FIFO not guaranteed). The constructor accepts a single argument, the function that is passed the name of the called function and the arguments and returns a numerical value for the priority: `new require('jest-worker').PriorityQueue((method, filename) => filename.length)`.

## JestWorker

### Methods
Expand Down
67 changes: 23 additions & 44 deletions packages/jest-worker/src/Farm.ts
Expand Up @@ -7,6 +7,7 @@

/* eslint-disable local/ban-types-eventually */

import FifoQueue from './FifoQueue';
import {
CHILD_MESSAGE_CALL,
ChildMessage,
Expand All @@ -16,36 +17,34 @@ import {
OnStart,
PromiseWithCustomMessage,
QueueChildMessage,
QueueItem,
TaskQueue,
WorkerInterface,
} from './types';

export default class Farm {
private _computeWorkerKey: FarmOptions['computeWorkerKey'];
private _cacheKeys: Record<string, WorkerInterface>;
private _callback: Function;
private _last: Array<QueueItem>;
private _locks: Array<boolean>;
private _numOfWorkers: number;
private _offset: number;
private _queue: Array<QueueItem | null>;
private _taskQueue: TaskQueue;

constructor(
numOfWorkers: number,
callback: Function,
computeWorkerKey?: FarmOptions['computeWorkerKey'],
options: {
computeWorkerKey?: FarmOptions['computeWorkerKey'];
taskQueue?: TaskQueue;
} = {},
) {
this._cacheKeys = Object.create(null);
this._callback = callback;
this._last = [];
this._locks = [];
this._numOfWorkers = numOfWorkers;
this._offset = 0;
this._queue = [];

if (computeWorkerKey) {
this._computeWorkerKey = computeWorkerKey;
}
this._computeWorkerKey = options.computeWorkerKey;
this._taskQueue = options.taskQueue ?? new FifoQueue();
}

doWork(
Expand Down Expand Up @@ -96,7 +95,8 @@ export default class Farm {
const task = {onCustomMessage, onEnd, onStart, request};

if (worker) {
this._enqueue(task, worker.getWorkerId());
this._taskQueue.enqueue(task, worker.getWorkerId());
this._process(worker.getWorkerId());
} else {
this._push(task);
}
Expand All @@ -108,29 +108,21 @@ export default class Farm {
return promise;
}

private _getNextTask(workerId: number): QueueChildMessage | null {
let queueHead = this._queue[workerId];

while (queueHead && queueHead.task.request[1]) {
queueHead = queueHead.next || null;
}

this._queue[workerId] = queueHead;

return queueHead && queueHead.task;
}

private _process(workerId: number): Farm {
if (this._isLocked(workerId)) {
return this;
}

const task = this._getNextTask(workerId);
const task = this._taskQueue.dequeue(workerId);

if (!task) {
return this;
}

if (task.request[1]) {
throw new Error('Queue implementation returned processed task');
}

const onEnd = (error: Error | null, result: unknown) => {
task.onEnd(error, result);

Expand All @@ -152,28 +144,15 @@ export default class Farm {
return this;
}

private _enqueue(task: QueueChildMessage, workerId: number): Farm {
const item = {next: null, task};

if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = item;
} else {
this._queue[workerId] = item;
}

this._last[workerId] = item;
this._process(workerId);

return this;
}

private _push(task: QueueChildMessage): Farm {
this._taskQueue.enqueue(task);

for (let i = 0; i < this._numOfWorkers; i++) {
this._enqueue(task, (this._offset + i) % this._numOfWorkers);
this._process((this._offset + i) % this._numOfWorkers);

if (task.request[1]) {
break;
}
}

this._offset++;
Expand Down
108 changes: 108 additions & 0 deletions packages/jest-worker/src/FifoQueue.ts
@@ -0,0 +1,108 @@
import type {QueueChildMessage, TaskQueue} from './types';

type WorkerQueueValue = {
task: QueueChildMessage;

/**
* The task that was at the top of the shared queue at the time this
* worker specific task was enqueued. Required to maintain FIFO ordering
* across queues. The worker specific task should only be dequeued if the
* previous shared task is null or has been processed.
*/
previousSharedTask: QueueChildMessage | null;
};

/**
* First-in, First-out task queue that manages a dedicated pool
* for each worker as well as a shared queue. The FIFO ordering is guaranteed
* across the worker specific and shared queue.
*/
export default class FifoQueue implements TaskQueue {
private _workerQueues: Array<
InternalQueue<WorkerQueueValue> | undefined
> = [];
private _sharedQueue = new InternalQueue<QueueChildMessage>();

enqueue(task: QueueChildMessage, workerId?: number): void {
if (workerId == null) {
this._sharedQueue.enqueue(task);
return;
}

let workerQueue = this._workerQueues[workerId];
if (workerQueue == null) {
workerQueue = this._workerQueues[
workerId
] = new InternalQueue<WorkerQueueValue>();
}

const sharedTop = this._sharedQueue.peekLast();
const item = {previousSharedTask: sharedTop, task};

workerQueue.enqueue(item);
}

dequeue(workerId: number): QueueChildMessage | null {
const workerTop = this._workerQueues[workerId]?.peek();
const sharedTaskIsProcessed =
workerTop?.previousSharedTask?.request[1] ?? true;

// Process the top task from the shared queue if
// - there's no task in the worker specific queue or
// - if the non-worker-specific task after which this worker specifif task
// hasn been queued wasn't processed yet
if (workerTop != null && sharedTaskIsProcessed) {
return this._workerQueues[workerId]?.dequeue()?.task ?? null;
}

return this._sharedQueue.dequeue();
}
}

type QueueItem<TValue> = {
value: TValue;
next: QueueItem<TValue> | null;
};

/**
* FIFO queue for a single worker / shared queue.
*/
class InternalQueue<TValue> {
private _head: QueueItem<TValue> | null = null;
private _last: QueueItem<TValue> | null = null;

enqueue(value: TValue): void {
const item = {next: null, value};

if (this._last == null) {
this._head = item;
} else {
this._last.next = item;
}

this._last = item;
}

dequeue(): TValue | null {
if (this._head == null) {
return null;
}

const item = this._head;
this._head = item.next;

if (this._head == null) {
this._last = null;
}

return item.value;
}

peek(): TValue | null {
return this._head?.value ?? null;
}

peekLast(): TValue | null {
return this._last?.value ?? null;
}
}