Skip to content

Commit

Permalink
Make shared workers available in worker threads only
Browse files Browse the repository at this point in the history
Now that AVA runs test files in worker threads, it makes sense for _shared_ workers to only be available in that execution mode. This makes for more efficient communication and will enable us to transfer byte arrays, message ports and even share memory.

Separately this commit fixes how shared worker errors are reported.

Co-authored-by: Mark Wubben <mark@novemberborn.net>
  • Loading branch information
dnlup and novemberborn committed Apr 6, 2021
1 parent eef1a55 commit bdf2cf0
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 195 deletions.
60 changes: 9 additions & 51 deletions lib/fork.js
Expand Up @@ -6,49 +6,6 @@ const {controlFlow} = require('./ipc-flow-control');

const WORKER_PATH = require.resolve('./worker/base.js');

class SharedWorkerChannel extends Emittery {
constructor({channelId, filename, initialData}, sendToFork) {
super();

this.id = channelId;
this.filename = filename;
this.initialData = initialData;
this.sendToFork = sendToFork;
}

signalReady() {
this.sendToFork({
type: 'shared-worker-ready',
channelId: this.id
});
}

signalError() {
this.sendToFork({
type: 'shared-worker-error',
channelId: this.id
});
}

emitMessage({messageId, replyTo, serializedData}) {
this.emit('message', {
messageId,
replyTo,
serializedData
});
}

forwardMessageToFork({messageId, replyTo, serializedData}) {
this.sendToFork({
type: 'shared-worker-message',
channelId: this.id,
messageId,
replyTo,
serializedData
});
}
}

let forkCounter = 0;

const createWorker = (options, execArgv) => {
Expand Down Expand Up @@ -97,7 +54,6 @@ const createWorker = (options, execArgv) => {
module.exports = (file, options, execArgv = process.execArgv) => {
// TODO: this can be changed to use `threadId` when using worker_threads
const forkId = `fork/${++forkCounter}`;
const sharedWorkerChannels = new Map();

let finished = false;

Expand Down Expand Up @@ -146,17 +102,19 @@ module.exports = (file, options, execArgv = process.execArgv) => {
case 'ready-for-options':
send({type: 'options', options});
break;

case 'shared-worker-connect': {
const channel = new SharedWorkerChannel(message.ava, send);
sharedWorkerChannels.set(channel.id, channel);
emitter.emit('connectSharedWorker', channel);
const {channelId, filename, initialData, port} = message.ava;
emitter.emit('connectSharedWorker', {
filename,
initialData,
port,
signalError() {
send({type: 'shared-worker-error', channelId});
}
});
break;
}

case 'shared-worker-message':
sharedWorkerChannels.get(message.ava.channelId).emitMessage(message.ava);
break;
case 'ping':
send({type: 'pong'});
break;
Expand Down
50 changes: 29 additions & 21 deletions lib/plugin-support/shared-worker-loader.js
@@ -1,20 +1,28 @@
const {EventEmitter, on} = require('events');
const v8 = require('v8');
const {workerData, parentPort} = require('worker_threads');
const pkg = require('../../package.json');

// Used to forward messages received over the `parentPort`. Every subscription
// adds a listener, so do not enforce any maximums.
// Used to forward messages received over the `parentPort` and any direct ports
// to test workers. Every subscription adds a listener, so do not enforce any
// maximums.
const events = new EventEmitter().setMaxListeners(0);
const emitMessage = message => {
// Wait for a turn of the event loop, to allow new subscriptions to be
// set up in response to the previous message.
setImmediate(() => events.emit('message', message));
};

// Map of active test workers, used in receiveMessages() to get a reference to
// the TestWorker instance, and relevant release functions.
const activeTestWorkers = new Map();

const internalMessagePort = Symbol('Internal MessagePort');

class TestWorker {
constructor(id, file) {
constructor(id, file, port) {
this.id = id;
this.file = file;
this[internalMessagePort] = port;
}

teardown(fn) {
Expand Down Expand Up @@ -47,10 +55,10 @@ class TestWorker {
}

class ReceivedMessage {
constructor(testWorker, id, serializedData) {
constructor(testWorker, id, data) {
this.testWorker = testWorker;
this.id = id;
this.data = v8.deserialize(new Uint8Array(serializedData));
this.data = data;
}

reply(data) {
Expand Down Expand Up @@ -98,7 +106,7 @@ async function * receiveMessages(fromTestWorker, replyTo) {

let received = messageCache.get(message);
if (received === undefined) {
received = new ReceivedMessage(active.instance, message.messageId, message.serializedData);
received = new ReceivedMessage(active.instance, message.messageId, message.data);
messageCache.set(message, received);
}

Expand All @@ -112,11 +120,10 @@ const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`;

function publishMessage(testWorker, data, replyTo) {
const id = nextMessageId();
parentPort.postMessage({
testWorker[internalMessagePort].postMessage({
type: 'message',
messageId: id,
testWorkerId: testWorker.id,
serializedData: [...v8.serialize(data)],
data,
replyTo
});

Expand All @@ -130,11 +137,13 @@ function publishMessage(testWorker, data, replyTo) {

function broadcastMessage(data) {
const id = nextMessageId();
parentPort.postMessage({
type: 'broadcast',
messageId: id,
serializedData: [...v8.serialize(data)]
});
for (const trackedWorker of activeTestWorkers.values()) {
trackedWorker.instance[internalMessagePort].postMessage({
type: 'message',
messageId: id,
data
});
}

return {
id,
Expand Down Expand Up @@ -184,12 +193,13 @@ loadFactory(workerData.filename).then(factory => {

parentPort.on('message', async message => {
if (message.type === 'register-test-worker') {
const {id, file} = message;
const instance = new TestWorker(id, file);
const {id, file, port} = message;
const instance = new TestWorker(id, file, port);

activeTestWorkers.set(id, {instance, teardownFns: new Set()});

produceTestWorker(instance);
port.on('message', message => emitMessage({testWorkerId: id, ...message}));
}

if (message.type === 'deregister-test-worker') {
Expand All @@ -207,11 +217,9 @@ loadFactory(workerData.filename).then(factory => {
type: 'deregistered-test-worker',
id
});
}

// Wait for a turn of the event loop, to allow new subscriptions to be
// set up in response to the previous message.
setImmediate(() => events.emit('message', message));
emitMessage(message);
}
});

return {
Expand Down
37 changes: 11 additions & 26 deletions lib/plugin-support/shared-workers.js
@@ -1,6 +1,5 @@
const events = require('events');
const {Worker} = require('worker_threads');

const serializeError = require('../serialize-error');

const LOADER = require.resolve('./shared-worker-loader');
Expand All @@ -16,11 +15,12 @@ const waitForAvailable = async worker => {
}
};

function launchWorker({filename, initialData}) {
function launchWorker(filename, initialData) {
if (launchedWorkers.has(filename)) {
return launchedWorkers.get(filename);
}

// TODO: remove the custom id and use the built-in thread id.
const id = `shared-worker/${++sharedWorkerCounter}`;
const worker = new Worker(LOADER, {
// Ensure the worker crashes for unhandled rejections, rather than allowing undefined behavior.
Expand Down Expand Up @@ -63,25 +63,10 @@ async function observeWorkerProcess(fork, runStatus) {
}
});

fork.onConnectSharedWorker(async channel => {
const launched = launchWorker(channel);

const handleChannelMessage = ({messageId, replyTo, serializedData}) => {
launched.worker.postMessage({
type: 'message',
testWorkerId: fork.forkId,
messageId,
replyTo,
serializedData
});
};
fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => {
const launched = launchWorker(filename, initialData);

const handleWorkerMessage = async message => {
if (message.type === 'broadcast' || (message.type === 'message' && message.testWorkerId === fork.forkId)) {
const {messageId, replyTo, serializedData} = message;
channel.forwardMessageToFork({messageId, replyTo, serializedData});
}

if (message.type === 'deregistered-test-worker' && message.id === fork.forkId) {
launched.worker.off('message', handleWorkerMessage);

Expand All @@ -96,31 +81,31 @@ async function observeWorkerProcess(fork, runStatus) {
signalDeregistered();
launched.worker.off('message', handleWorkerMessage);
runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)});
channel.signalError();
signalError();
});

try {
await launched.statePromises.available;

registrationCount++;

port.postMessage({type: 'ready'});

launched.worker.postMessage({
type: 'register-test-worker',
id: fork.forkId,
file: fork.file
});
file: fork.file,
port
}, [port]);

fork.promise.finally(() => {
launched.worker.postMessage({
type: 'deregister-test-worker',
id: fork.forkId
});

channel.off('message', handleChannelMessage);
});

launched.worker.on('message', handleWorkerMessage);
channel.on('message', handleChannelMessage);
channel.signalReady();
} catch {
return;
} finally {
Expand Down
2 changes: 1 addition & 1 deletion lib/reporters/default.js
Expand Up @@ -772,7 +772,7 @@ class Reporter {
for (const evt of this.sharedWorkerErrors) {
this.lineWriter.writeLine(colors.error(`${figures.cross} Error in shared worker`));
this.lineWriter.writeLine();
this.writeErr(evt.err);
this.writeErr(evt);
if (evt !== last || writeTrailingLines) {
this.lineWriter.writeLine();
this.lineWriter.writeLine();
Expand Down

0 comments on commit bdf2cf0

Please sign in to comment.