Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an ES module version of workerpool.js #342

Open
josdejong opened this issue Apr 7, 2022 · 1 comment
Open

Create an ES module version of workerpool.js #342

josdejong opened this issue Apr 7, 2022 · 1 comment

Comments

@josdejong
Copy link
Owner

So you can use it via import instead of require and importScripts. Currently, the bundles provided in the dist folder is a CommonJS module.

Help would be welcome!

@brunoAmado
Copy link

brunoAmado commented May 19, 2022

Hi,

I have the same problem. Can you explain the "classic" way to resolve? I have Error: Error: Unknown method "runner" . I tryed the embeddedWorker way with :

import fs from "fs";
import webpack from "webpack";
import btoa from "btoa";
import path from "path";
import { fileURLToPath } from "url";
import workerpool from "workerpool";

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
//console.log('directory-name 👉️', __dirname);

const WORKER_FILE = "./lib/worker-pool/workers/thread-functions.mjs";
const WORKER_BUNDLE_FILE = "lib/worker-pool/thread-functions.bundle.mjs";
const WORKER_EMBEDDED_FILE =
  "lib/worker-pool/workers/thread-functions.embedded.mjs";

create().then();

async function create() {
  try {
    console.log("\nWorker file " + WORKER_FILE);

    await createWorkerBundle(WORKER_FILE, WORKER_BUNDLE_FILE);
    console.log("\nCreated worker bundle " + WORKER_BUNDLE_FILE);

    createEmbeddedWorkerFromBundle(WORKER_BUNDLE_FILE, WORKER_EMBEDDED_FILE);
    console.log("\nCreated embedded worker " + WORKER_EMBEDDED_FILE);
  } catch (err) {
    console.error(err);
  }
}

/**
 * Generate a bundle of the worker code using Webpack
 * @param {string} workerFile        For example 'worker.js'
 * @param {string} workerBundleFile  For example 'dist/worker.bundle.js'
 */
function createWorkerBundle(workerFile, workerBundleFile) {
  return new Promise(function (resolve, reject) {
    let config = {
      // target: 'node', // makes the worker working for node.js
      entry: workerFile,
      output: {
        filename: workerBundleFile,
        //path: __dirname
      },
      mode: "production",
    };

    webpack(config).run(function (err, stats) {
      if (err) {
        console.log(err);
      }

      process.stdout.write(stats.toString() + "\n");

      if (stats.hasErrors()) {
        reject(
          new Error("Webpack errors:\n" + stats.toJson().errors.join("\n"))
        );
      }

      resolve();
    });
  });
}

/**
 * Create an embedded version of the worker code: a data url.
 * @param {string} workerBundleFile     For example 'dist/worker.bundle.js'
 * @param {string} workerEmbeddedFile   For example 'dist/worker.embedded.js'
 */
function createEmbeddedWorkerFromBundle(workerBundleFile, workerEmbeddedFile) {
  let workerScript = String(
    fs.readFileSync(path.join(__dirname, "../../dist/" + workerBundleFile))
  );

  let workerDataUrl =
    "data:application/javascript;base64, " + btoa(workerScript);
  fs.writeFileSync(
    workerEmbeddedFile,
    'export default "' + workerDataUrl + '";'
  );
}

worker "thread-functions.mjs" :

import workerpool from "workerpool";

importScripts("workerpool");


async function runner(runFunction, classEntities) {

  //const poolProxy = WorkerCon.getPoolProxy()
  //const forks = WorkerCon.getForks()
  console.log("Runner" + process.env.UNIQUE_WORKER_ID);
  console.log(
    `Proxy - Worker Threads Enabled - Min Workers: ${workerpool.pool.minWorkers} - Max Workers: ${workerpool.pool.maxWorkers} - Worker Type: ${workerpool.pool.workerType}`
  );
  let forkId = parseInt(process.env.UNIQUE_WORKER_ID.replace("worker_id_", ""));

  console.log("Runner" + forkId);
  //console.log('Runner forks' + forks)

  const entities = classEntities.chunksIndexToStream.filter(
    (_, index) => index % workerpool.pool.maxWorkers === forkId
  );
  console.log(entities.toString());
  let jsonTempStringifiedParsed = JSON.parse(runFunction);
  for (const entity of entities) {
    console.log(entity.toString());
    classEntities.setData(await jsonTempStringifiedParsed(entity));
  }
  return classEntities;
}

// CREATE WORKERS
workerpool.worker({
  runner: runner,
});

controller.mjs

import workerpool, { pool } from "workerpool";

const numCPU = workerpool.cpus;
const forks = Math.ceil(numCPU / 2) <= 2 ? 2 : Math.ceil(numCPU / 2);
let counter = 0;
let terminatedWorkers = [];

export default function workerController(
  workerUrl,
  workerFunction,
  workerData
) {
  console.log("Number of CPU:" + numCPU);
  console.log("Number of forks:" + forks);
  console.log("DIR________WORKER");
  //console.log(workerUrl);
  let pool = workerpool.pool(workerUrl);
  /*, {
    minWorkers: "max",
    maxWorkers: forks,
    onCreateWorker: (opts) => {
      return {
        ...opts,
        forkOpts: {
          ...opts.forkOpts,
          env: {
            UNIQUE_WORKER_ID: `worker_id_${counter++}`,
          },
        },
      };
    },
    onTerminateWorker: (opts) => {
      terminatedWorkers.push(opts.forkOpts.env.UNIQUE_WORKER_ID);
    },
  });

   */
  console.log(
    `Constructor - Worker Threads Enabled - Min Workers: ${pool.minWorkers} - Max Workers: ${pool.maxWorkers} - Worker Type: ${pool.workerType}`
  );



  let jsonTempStringified = JSON.stringify(workerFunction);


  pool
    .exec("runner", [jsonTempStringified, workerData])
    .then(function (result) {
      log("Result: " + result);
      pool.terminate().then();
    })
    .catch(function (err) {
      log("Error: " + err);
    });

  console.log(
    `Constructor after proxy - Worker Threads Enabled - Min Workers: ${pool.minWorkers} - Max Workers: ${pool.maxWorkers} - Worker Type: ${pool.workerType}`
  );
  return pool;
}

Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants