josdejong / workerpool

Offload tasks to a pool of workers on node.js and in the browser
Apache License 2.0

workerpool

workerpool offers an easy way to create a pool of workers, both for dynamically offloading computations and for managing a pool of dedicated workers. It implements the thread pool pattern: there is a pool of workers to execute tasks, and new tasks are put in a queue. A worker executes one task at a time and, once finished, picks a new task from the queue. Workers can be accessed via a natural, promise-based proxy, as if they were available directly in the main application.
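The queue-and-workers pattern described above can be sketched in a few lines of plain JavaScript. This is an illustration only, not how workerpool is implemented: `TinyPool`, `slowSquare`, and the other names are made up for this example, and the "workers" here are just slots for async functions rather than real processes or threads.

```javascript
// Illustrative only: a queue plus a fixed number of "workers" (async slots).
// TinyPool is a hypothetical name; real workerpool workers are separate
// processes, threads, or Web Workers.
class TinyPool {
  constructor(maxWorkers) {
    this.maxWorkers = maxWorkers;
    this.busy = 0; // number of workers currently executing a task
    this.queue = []; // tasks waiting for a free worker
  }

  // Queue a task (a function returning a value or a Promise).
  exec(task) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this._next();
    });
  }

  // Hand the next queued task to a free worker, if there is one.
  _next() {
    if (this.busy >= this.maxWorkers || this.queue.length === 0) return;
    const { task, resolve, reject } = this.queue.shift();
    this.busy++;
    Promise.resolve()
      .then(task)
      .then(resolve, reject)
      .then(() => {
        this.busy--;
        this._next(); // the worker picks up a new task once finished
      });
  }
}

// Usage: five tasks, at most two running at the same time.
const tinyPool = new TinyPool(2);
let running = 0;
let maxRunning = 0;

function slowSquare(n) {
  return function () {
    running++;
    maxRunning = Math.max(maxRunning, running);
    return new Promise((resolve) => setTimeout(resolve, 20)).then(() => {
      running--;
      return n * n;
    });
  };
}

const squares = Promise.all(
  [1, 2, 3, 4, 5].map((n) => tinyPool.exec(slowSquare(n)))
);
squares.then((results) => console.log(results, 'max concurrent:', maxRunning));
```

With two slots, the third task only starts once one of the first two has finished, which is the behaviour the queue is there to provide.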

workerpool runs on Node.js and in the browser.

Features

Why

JavaScript is based upon a single event loop which handles one event at a time. Jeremy Epstein explains this clearly:

In Node.js everything runs in parallel, except your code. What this means is that all I/O code that you write in Node.js is non-blocking, while (conversely) all non-I/O code that you write in Node.js is blocking.

This means that CPU-heavy tasks will block other tasks from being executed. In a browser environment, the browser will not react to user events like a mouse click while executing a CPU-intensive task (the browser "hangs"). On a node.js server, the server will not respond to any new request while executing a single, heavy request.

For front-end processes, this is not a desirable situation. Therefore, CPU-intensive tasks should be offloaded from the main event loop onto dedicated workers. In a browser environment, Web Workers can be used. In node.js, child processes and worker_threads are available. An application should be split into separate, decoupled parts which can run independently of each other in parallel. Effectively, this results in an architecture which achieves concurrency by means of isolated processes and message passing.
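As a rough illustration of offloading via message passing, the sketch below uses Node's built-in worker_threads module directly, without workerpool. The helper name `fibonacciInThread` and the inline worker source are assumptions made for this example.

```javascript
const { Worker } = require('worker_threads');

// Source code of the worker, evaluated in a separate thread.
const workerSource = `
  const { parentPort, workerData } = require('worker_threads');

  function fibonacci(n) {
    return n < 2 ? n : fibonacci(n - 2) + fibonacci(n - 1);
  }

  // Send the result back to the main thread as a message.
  parentPort.postMessage(fibonacci(workerData));
`;

// Hypothetical helper: run fibonacci(n) in a fresh worker thread.
function fibonacciInThread(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

fibonacciInThread(10).then((result) => {
  console.log('fibonacci(10) =', result);
});
```

Every call starts a new thread here. Reusing a fixed set of workers and queueing tasks for them is the part that a pool adds on top.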

Install

Install via npm:

npm install workerpool

Load

To load workerpool in a node.js application (in both the main application and the workers):

const workerpool = require('workerpool');

To load workerpool in the browser:

<script src="workerpool.js"></script>

To load workerpool in a web worker in the browser:

importScripts('workerpool.js');

Setting up the workerpool with React or webpack5 requires additional configuration steps, as outlined in the webpack5 section.

Use

Offload functions dynamically

In the following example there is a function add, which is offloaded dynamically to a worker to be executed for a given set of arguments.

myApp.js

const workerpool = require('workerpool');
const pool = workerpool.pool();

function add(a, b) {
  return a + b;
}

pool
  .exec(add, [3, 4])
  .then(function (result) {
    console.log('result', result); // outputs 7
  })
  .catch(function (err) {
    console.error(err);
  })
  .then(function () {
    pool.terminate(); // terminate all workers when done
  });

Note that both function and arguments must be static and stringifiable, as they need to be sent to the worker in a serialized form. In case of large functions or function arguments, the overhead of sending the data to the worker can be significant.
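To see why the function has to be static, consider what happens when a function is turned into a string and rebuilt elsewhere. The sketch below imitates that round trip in a single process. `runSerialized` is a hypothetical helper written for this example, not part of workerpool, and the real transport may differ in detail.

```javascript
// Simplified stand-in for "send the function to a worker as a string".
// runSerialized is a hypothetical helper, not part of workerpool.
function runSerialized(fn, args) {
  const source = String(fn); // what would travel to the worker
  const rebuilt = new Function(
    'return (' + source + ').apply(null, arguments);'
  );
  return rebuilt.apply(null, args);
}

function add(a, b) {
  return a + b;
}

function makeAddOffset() {
  const offset = 10; // captured by the closure below, lost when stringified
  return function addOffset(a) {
    return a + offset;
  };
}

console.log(runSerialized(add, [3, 4])); // 7

try {
  runSerialized(makeAddOffset(), [1]);
} catch (err) {
  console.log(err.name); // ReferenceError: offset is not defined
}
```

A function that relies only on its arguments survives the round trip. A function that reads variables from an enclosing scope does not, because only its source text is sent.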

Dedicated workers

A dedicated worker can be created in a separate script, and then used via a worker pool.

myWorker.js

const workerpool = require('workerpool');

// a deliberately inefficient implementation of the fibonacci sequence
function fibonacci(n) {
  if (n < 2) return n;
  return fibonacci(n - 2) + fibonacci(n - 1);
}

// create a worker and register public functions
workerpool.worker({
  fibonacci: fibonacci,
});

This worker can be used by a worker pool:

myApp.js

const workerpool = require('workerpool');

// create a worker pool using an external worker script
const pool = workerpool.pool(__dirname + '/myWorker.js');

// run registered functions on the worker via exec
pool
  .exec('fibonacci', [10])
  .then(function (result) {
    console.log('Result: ' + result); // outputs 55
  })
  .catch(function (err) {
    console.error(err);
  })
  .then(function () {
    pool.terminate(); // terminate all workers when done
  });

// or run registered functions on the worker via a proxy:
pool
  .proxy()
  .then(function (worker) {
    return worker.fibonacci(10);
  })
  .then(function (result) {
    console.log('Result: ' + result); // outputs 55
  })
  .catch(function (err) {
    console.error(err);
  })
  .then(function () {
    pool.terminate(); // terminate all workers when done
  });

Workers can also initialize asynchronously:

myAsyncWorker.js

define(['workerpool/dist/workerpool'], function (workerpool) {
  // a deliberately inefficient implementation of the fibonacci sequence
  function fibonacci(n) {
    if (n < 2) return n;
    return fibonacci(n - 2) + fibonacci(n - 1);
  }

  // create a worker and register public functions
  workerpool.worker({
    fibonacci: fibonacci,
  });
});

Examples

Examples are available in the examples directory:

https://github.com/josdejong/workerpool/tree/master/examples

API

The API of workerpool consists of two parts: a function workerpool.pool to create a worker pool, and a function workerpool.worker to create a worker.

pool

A workerpool can be created using the function workerpool.pool:

workerpool.pool([script: string] [, options: Object]) : Pool

When a script argument is provided, the provided script will be started as a dedicated worker. When no script argument is provided, a default worker is started which can be used to offload functions dynamically via Pool.exec. Note that on node.js, script must be an absolute file path like __dirname + '/myWorker.js'. In a browser environment, script can also be a data URL like 'data:application/javascript;base64,...'. This allows embedding the bundled code of a worker in your main application. See examples/embeddedWorker for a demo.
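A data URL of the form mentioned above can be built from a string of worker source code. The following is a minimal sketch. `toWorkerDataUrl` and `bundledWorkerSource` are hypothetical names, and the placeholder string stands in for real bundled worker code.

```javascript
// Hypothetical helper: wrap worker source code in a base64 data URL.
// btoa only accepts Latin-1 characters, so this assumes ASCII-only source.
function toWorkerDataUrl(source) {
  return 'data:application/javascript;base64,' + btoa(source);
}

// Placeholder for the bundled worker code (normally produced by a bundler).
const bundledWorkerSource = '/* bundled worker code goes here */';
const workerUrl = toWorkerDataUrl(bundledWorkerSource);

console.log(workerUrl.slice(0, 40));
// In a browser, the URL could then be passed as the script argument:
// const pool = workerpool.pool(workerUrl);
```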

The following options are available:

Important note on 'workerType': when sending and receiving primitive data types (plain JSON) from and to a worker, the different worker types ('web', 'process', 'thread') can be used interchangeably. However, when using more advanced data types like buffers, the API and returned results can vary. In these cases, it is best not to use the 'auto' setting but have a fixed 'workerType' and good unit testing in place.

A worker pool contains the following functions:

The function Pool.exec and the proxy functions all return a Promise. The promise has the following functions available:

Example usage:

const workerpool = require('workerpool');

function add(a, b) {
  return a + b;
}

const pool1 = workerpool.pool();

// offload a function to a worker
pool1
  .exec(add, [2, 4])
  .then(function (result) {
    console.log(result); // will output 6
  })
  .catch(function (err) {
    console.error(err);
  });

// create a dedicated worker
const pool2 = workerpool.pool(__dirname + '/myWorker.js');

// assuming myWorker.js contains a function 'fibonacci'
pool2
  .exec('fibonacci', [10])
  .then(function (result) {
    console.log(result); // will output 55
  })
  .catch(function (err) {
    console.error(err);
  });

// send a transferable object to the worker
// assuming myWorker.js contains a function 'sum' that adds up the array elements
const toTransfer = new Uint8Array(3).map((_v, i) => i); // [0, 1, 2]
pool2
  .exec('sum', [toTransfer], { transfer: [toTransfer.buffer] })
  .then(function (result) {
    console.log(result); // will output 3
  })
  .catch(function (err) {
    console.error(err);
  });

// create a proxy to myWorker.js
pool2
  .proxy()
  .then(function (myWorker) {
    return myWorker.fibonacci(10);
  })
  .then(function (result) {
    console.log(result); // will output 55
  })
  .catch(function (err) {
    console.error(err);
  });

// create a pool with a specified maximum number of workers
const pool3 = workerpool.pool({ maxWorkers: 7 });

worker

A worker is constructed as:

workerpool.worker([methods: Object<String, Function>] [, options: Object]) : void

Argument methods is optional and can be an object with functions available in the worker. Registered functions will be available via the worker pool.

The following options are available:

Example usage:

// file myWorker.js
const workerpool = require('workerpool');

function add(a, b) {
  return a + b;
}

function multiply(a, b) {
  return a * b;
}

// create a worker and register functions
workerpool.worker({
  add: add,
  multiply: multiply,
});

Asynchronous results can be handled by returning a Promise from a function in the worker:

// file myWorker.js
const workerpool = require('workerpool');

function timeout(delay) {
  return new Promise(function (resolve, reject) {
    setTimeout(resolve, delay);
  });
}

// create a worker and register functions
workerpool.worker({
  timeout: timeout,
});

Transferable objects can be sent back to the pool using the Transfer helper class:

// file myWorker.js
const workerpool = require('workerpool');

function array(size) {
  var array = new Uint8Array(size).map((_v, i) => i);
  return new workerpool.Transfer(array, [array.buffer]);
}

// create a worker and register functions
workerpool.worker({
  array: array,
});
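Transferring hands the underlying buffer over to the receiver rather than copying it, so the sender can no longer use it afterwards. The sketch below shows that effect with the standard structuredClone function (available in Node.js 17+ and modern browsers) rather than with workerpool itself.

```javascript
const original = new Uint8Array(4).map((_v, i) => i); // [0, 1, 2, 3]

// structuredClone with a transfer list moves the buffer instead of copying it.
const moved = structuredClone(original, { transfer: [original.buffer] });

console.log(moved.byteLength); // 4
console.log(original.byteLength); // 0: the source buffer is now detached
```

The same applies when a worker returns a transferred array: after the transfer, the worker should not keep using the array it sent.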

Events

You can send data back from workers to the pool while the task is being executed using the workerEmit function:

workerEmit(payload: any) : unknown

This function only works inside a worker and during a task.

Example:

// file myWorker.js
const workerpool = require('workerpool');

function eventExample(delay) {
  workerpool.workerEmit({
    status: 'in_progress',
  });

  workerpool.workerEmit({
    status: 'complete',
  });

  return true;
}

// create a worker and register functions
workerpool.worker({
  eventExample: eventExample,
});

To receive those events, you can use the on option of the pool exec method:

pool.exec('eventExample', [], {
  on: function (payload) {
    if (payload.status === 'in_progress') {
      console.log('In progress...');
    } else if (payload.status === 'complete') {
      console.log('Done!');
    }
  },
});

Worker API

Workers have access to a worker API which contains the following methods:

Worker termination may be recoverable through abort listeners which are registered through worker.addAbortListener. If all registered listeners resolve then the worker will not be terminated, allowing for worker reuse in some cases.

NOTE: For operations to successfully clean up, a worker implementation should be async. If the worker thread is blocked, then the worker will be killed.

function asyncTimeout() {
  var me = this;
  return new Promise(function (resolve) {
    let timeout = setTimeout(() => {
      resolve();
    }, 5000);

    // Register a listener which will resolve before the timeout
    // above triggers.
    me.worker.addAbortListener(async function () {
      clearTimeout(timeout);
      resolve();
    });
  });
}

// create a worker and register public functions
workerpool.worker(
  {
    asyncTimeout: asyncTimeout,
  },
  {
    abortListenerTimeout: 1000
  }
);
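To check what such a listener does without starting a pool, the task function can be called with a stand-in for `this`. In the sketch below, `createFakeContext` and its `abort` helper are hypothetical test scaffolding, not workerpool's implementation. The task function follows the shape of asyncTimeout above, with a configurable delay and a resolved value of 'aborted' added so the outcome is visible.

```javascript
// Same shape as asyncTimeout above, with the delay made configurable.
function asyncTimeout(delay) {
  const me = this;
  return new Promise(function (resolve) {
    const timeout = setTimeout(resolve, delay);
    me.worker.addAbortListener(async function () {
      clearTimeout(timeout); // clean up so nothing keeps running
      resolve('aborted');
    });
  });
}

// Hypothetical stand-in for the `this` that is available inside a task.
function createFakeContext() {
  const listeners = [];
  return {
    worker: {
      addAbortListener: function (listener) {
        listeners.push(listener);
      },
    },
    // Test-only helper: run all registered listeners, as an abort would.
    abort: function () {
      return Promise.all(listeners.map((listener) => listener()));
    },
  };
}

const context = createFakeContext();
const started = Date.now();
const task = asyncTimeout.call(context, 5000);

const outcome = context
  .abort()
  .then(() => task)
  .then((value) => ({ value: value, elapsed: Date.now() - started }));

outcome.then((result) => {
  console.log(result.value, 'after', result.elapsed, 'ms');
});
```

Because the listener clears the timer and resolves the promise, the task settles right away instead of after five seconds.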

Events may also be emitted from the worker API through worker.emit:

// file myWorker.js
const workerpool = require('workerpool');

function eventExample(delay) {
  this.worker.emit({
    status: "in_progress",
  });
  workerpool.workerEmit({
    status: 'complete',
  });

  return true;
}

// create a worker and register functions
workerpool.worker({
  eventExample: eventExample,
});

Utilities

The following properties are available for convenience:

Roadmap

Related libraries

Build

First clone the project from github:

git clone https://github.com/josdejong/workerpool.git
cd workerpool

Install the project dependencies:

npm install

Then, the project can be built by executing the build script via npm:

npm run build

This will build the library workerpool.js and workerpool.min.js from the source files and put them in the folder dist.

Test

To execute tests for the library, install the project dependencies once:

npm install

Then, the tests can be executed:

npm test

To test code coverage of the tests:

npm run coverage

To see the coverage results, open the generated report in your browser:

./coverage/index.html

Publish

License

Copyright (C) 2014-2024 Jos de Jong <wjosdejong@gmail.com>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.