chjj / bthreads

worker threads for javascript

bthreads

A worker_threads wrapper for node.js. Provides transparent fallback for pre-11.7.0 node.js (via child_process) as well as browser web workers. Browserifiable, webpack-able.

Usage

const threads = require('bthreads');

if (threads.isMainThread) {
  const worker = new threads.Worker(__filename, {
    workerData: 'foo'
  });

  worker.on('message', console.log);
  worker.on('error', console.error);

  worker.on('exit', (code) => {
    if (code !== 0)
      console.error(`Worker stopped with exit code ${code}.`);
  });
} else {
  threads.parentPort.postMessage(threads.workerData + 'bar');
}

Output (with node@<11.7.0):

$ node --experimental-worker threads.js
foobar
$ node threads.js
foobar

Backends

bthreads has 4 backends (worker_threads, child_process, web_workers, and polyfill) and a few layers of fallback.

The current backend is exposed as threads.backend. Note that the current backend can be set with the BTHREADS_BACKEND environment variable.
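
As a rough illustration of how an explicit override and layered fallback might fit together, here is a minimal sketch. It is not bthreads' actual selection code: the `pickBackend` function and the capability flags are hypothetical, and it assumes that `BTHREADS_BACKEND` takes the backend names used in this README.

```javascript
// Hypothetical sketch of backend selection; not bthreads' real logic.
// `caps` describes what the current environment supports.
const BACKENDS = ['worker_threads', 'child_process', 'web_workers', 'polyfill'];

function pickBackend(env, caps) {
  const forced = env.BTHREADS_BACKEND;

  // An explicit override wins, provided it names a known backend
  // (assumed value format: the backend names used in this README).
  if (forced && BACKENDS.includes(forced))
    return forced;

  // Otherwise fall through the layers in order of preference.
  if (caps.workerThreads)
    return 'worker_threads';

  if (caps.childProcess)
    return 'child_process';

  if (caps.webWorkers)
    return 'web_workers';

  return 'polyfill';
}

console.log(pickBackend({}, { workerThreads: false, childProcess: true }));
```

In a real program the value actually chosen is the one reported by threads.backend.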

Explicit Entry Points

require('bthreads') will automatically pick the backend depending on what is stable, but in some cases that may not be what you want. Because of this, there are also more explicit entry points:

Caveats

Some caveats for the child_process backend:

Caveats for the web_workers backend:

Caveats for the polyfill backend:

Caveats for all of the above:

Finally, caveats for the worker_threads backend:

High-level API

The low-level node.js API is not very useful on its own. bthreads optionally provides an API similar to bsock.

Example (for brevity, the async wrapper is not included below):

const threads = require('bthreads');

if (threads.isMainThread) {
  const thread = new threads.Thread(__filename);

  thread.bind('event', (x, y) => {
    console.log(x + y);
  });

  console.log(await thread.call('job', ['hello']));
} else {
  const {parent} = threads;

  parent.hook('job', async (arg) => {
    return arg + ' world';
  });

  parent.fire('event', ['foo', 'bar']);
}

Output:

foobar
hello world

Creating a thread pool

You may find yourself wanting to parallelize the same worker jobs. The high-level API offers a thread pool object (threads.Pool) which will automatically load balance and scale to the number of CPU cores.

if (threads.isMainThread) {
  const pool = new threads.Pool(__filename);

  const results = await Promise.all([
    pool.call('job1'), // Runs on thread 1.
    pool.call('job2'), // Runs on thread 2.
    pool.call('job3')  // Runs on thread 3.
  ]);

  console.log(results);
} else {
  const {parent} = threads;

  Buffer.poolSize = 1; // Make buffers easily transferable.

  parent.hook('job1', async () => {
    const buf = Buffer.from('job1 result');
    return [buf, [buf.buffer]]; // Transfer the array buffer.
  });

  parent.hook('job2', async () => {
    return 'job2 result';
  });

  parent.hook('job3', async () => {
    return 'job3 result';
  });
}
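
To make the load-balancing idea concrete, here is a minimal round-robin sketch. It is an assumption about how a pool could spread jobs across threads, not a description of threads.Pool internals. `RoundRobin` and its `next` method are hypothetical names, and real worker threads are replaced by plain slot indices so the snippet runs on its own.

```javascript
const os = require('os');

// Hypothetical helper: hands out worker slots in rotation.
class RoundRobin {
  constructor(size = os.cpus().length) {
    // Guard against environments that report zero cores.
    this.size = Math.max(1, size);
    this.cursor = 0;
  }

  // Return the slot for the next job, wrapping after the last slot.
  next() {
    const slot = this.cursor;
    this.cursor = (this.cursor + 1) % this.size;
    return slot;
  }
}

const rr = new RoundRobin(3);
const slots = ['job1', 'job2', 'job3', 'job4'].map(() => rr.next());

console.log(slots); // [ 0, 1, 2, 0 ]
```

With three slots, the fourth job wraps back to the first slot, which matches the "thread 1, thread 2, thread 3" comments in the example above.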

Writing code for node and the browser

One of the remarkable features of bthreads is that it allows for static analysis when bundling. The threads.Pool and threads.Thread objects resolve their filename argument as if it were a require() from the calling file.

const thread = new threads.Thread('./worker.js');

The above line will resolve to ${__dirname}/worker.js in node.js and ${window.location}/worker.js in the browser. In node.js, it is not relative to the current working directory! We accomplish this through various forms of sorcery.
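
The difference between resolving against the calling file and resolving against the working directory can be seen with node's built-in path module. This only illustrates the two behaviours and is not the mechanism bthreads uses. The `/srv/app/lib` and `/home/user` paths are made up.

```javascript
const path = require('path');

// Made-up locations for illustration.
const callerDir = '/srv/app/lib'; // stands in for __dirname of the calling file
const cwd = '/home/user';         // stands in for process.cwd()

// Resolution relative to the calling file (what the text describes).
const relativeToCaller = path.posix.resolve(callerDir, './worker.js');

// Resolution relative to the working directory (what does NOT happen).
const relativeToCwd = path.posix.resolve(cwd, './worker.js');

console.log(relativeToCaller); // /srv/app/lib/worker.js
console.log(relativeToCwd);    // /home/user/worker.js
```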

Why does this matter? Because it allows for browserify and/or webpack to do static analysis on your code and ship your code (including workers) as a single bundled file! Of course, this would require an extra browserify or webpack plugin which adds some more initialization code for choosing the proper entry point.

How this works behind the scenes (for plugin implementers)

Statically analyzing the line above, the compiler should replace './worker.js' with 'bthreads-worker@[id]'. When initializing the code, bthreads should be implicitly required. bthreads will set an environment variable called process.env.BTHREADS_WORKER_INLINE which contains the [id] you generated previously, allowing you to determine which function to run inside the worker thread.

In other words, when the compiler comes across:

const thread = new threads.Thread('./worker.js');

./worker.js should be included in the bundle and mapped to an ID (in our case, we include it in the bundle with an ID of 1).

Our line becomes:

const thread = new threads.Thread('bthreads-worker@1');

The bundle's main entry point should include some initialization code like:

requireBthreads();

if (process.env.BTHREADS_WORKER_INLINE)
  requireWorker(process.env.BTHREADS_WORKER_INLINE);
else
  requireMain();
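
For plugin implementers, the rewriting step might look roughly like the following. This is a deliberately naive sketch: a real plugin would most likely work on the AST rather than with a regular expression, and `rewriteWorkers` and its return shape are hypothetical.

```javascript
// Naive, hypothetical rewrite pass. Matches `new threads.Thread('...')`
// and `new threads.Pool('...')` with a string-literal first argument.
function rewriteWorkers(source) {
  const ids = new Map(); // worker path -> numeric id
  const pattern = /(new\s+threads\.(?:Thread|Pool)\(\s*)(['"])([^'"]+)\2/g;

  const code = source.replace(pattern, (match, head, quote, file) => {
    // Reuse the id if the same worker file appears more than once.
    if (!ids.has(file))
      ids.set(file, ids.size + 1);

    return `${head}${quote}bthreads-worker@${ids.get(file)}${quote}`;
  });

  return { code, ids };
}

const input = "const thread = new threads.Thread('./worker.js');";
const { code, ids } = rewriteWorkers(input);

console.log(code); // const thread = new threads.Thread('bthreads-worker@1');
```

The returned `ids` map tells the bundler which worker file belongs to which ID, so the initialization code can pick the right function to run.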

importScripts

In the browser, bthreads exposes a more useful version of importScripts called threads.require.

const threads = require('bthreads');
const _ = threads.require('https://unpkg.com/underscore/underscore.js');

This should work for any library exposed as UMD or CommonJS. Note that threads.require behaves more like require in that it caches modules by URL.
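
The caching behaviour can be pictured with a small sketch. This is not the implementation of threads.require: `makeCachedRequire` and the `load` callback are hypothetical, and the loader is stubbed so that nothing is fetched.

```javascript
// Hypothetical: wrap a loader so each URL is loaded at most once.
function makeCachedRequire(load) {
  const cache = new Map(); // URL -> module exports

  return function cachedRequire(url) {
    if (!cache.has(url))
      cache.set(url, load(url));

    return cache.get(url);
  };
}

// Stub loader that counts how often it is invoked.
let loads = 0;
const fakeRequire = makeCachedRequire((url) => {
  loads += 1;
  return { url };
});

const a = fakeRequire('https://example.com/lib.js');
const b = fakeRequire('https://example.com/lib.js');

console.log(a === b, loads); // true 1
```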

More about eval'd browser code

Note that if you are eval'ing some code inside a script you plan to bundle with browserify or webpack, require may get unintentionally transformed or overridden. This generally happens when you are calling toString on a defined function.

const threads = require('bthreads');

function myWorker() {
  const threads = require('bthreads');

  threads.parentPort.postMessage('foo');
}

const code = `(${myWorker})();`;
const worker = new threads.Worker(code, { eval: true });

The solution is to access module.require instead of require.

const threads = require('bthreads');

function myWorker() {
  const threads = module.require('bthreads');

  threads.parentPort.postMessage('foo');
}

const code = `(${myWorker})();`;
const worker = new threads.Worker(code, { eval: true });
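
To check what is actually handed to the worker, it can help to inspect the generated string. The sketch below only builds the code string and does not start a worker. It relies on standard JavaScript behaviour only: interpolating a function into a template literal calls toString on it.

```javascript
function myWorker() {
  const threads = module.require('bthreads');

  threads.parentPort.postMessage('foo');
}

// Interpolating a function into a template literal calls its toString(),
// which yields the function's source text.
const code = `(${myWorker})();`;

console.log(code.startsWith('(function myWorker()')); // true
console.log(code.includes("module.require('bthreads')")); // true
```

Running the same check on bundled output is one way to confirm whether the bundler has rewritten the call inside the function body.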

API

Socket Class (abstract, extends EventEmitter)

Thread Class (extends Socket)

Port Class (extends Socket)

Channel Class

Pool Class (extends EventEmitter)

Thread, Pool, and Worker Options

The options object accepted by the Thread, Pool, and Worker classes is nearly identical to the worker_threads worker options with some differences:

Worker Data

In the browser, workerData is serialized as JSON instead of structured data. To force usage of the structured clone algorithm, it's possible to require ./lib/encoding (note that this will increase your code size greatly).

const threads = require('bthreads');
const encoding = require('bthreads/encoding');
const thread = new threads.Thread('./worker.js', {
  workerData: encoding.stringify({ foo: 'bar' })
});
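
The practical difference between JSON and structured cloning shows up with values such as Date and Map. The sketch below uses node's built-in v8 module, whose serializer is compatible with the structured clone algorithm, as a stand-in. It does not use the bthreads encoding module, and the sample object is made up.

```javascript
const v8 = require('v8');

const data = { when: new Date(0), tags: new Map([['a', 1]]) };

// JSON turns the Date into a string and the Map into an empty object.
const viaJson = JSON.parse(JSON.stringify(data));

// Structured-clone-style serialization keeps both types intact.
const viaClone = v8.deserialize(v8.serialize(data));

console.log(typeof viaJson.when, viaJson.tags);
console.log(viaClone.when instanceof Date, viaClone.tags.get('a'));
```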

Contribution and License Agreement

If you contribute code to this project, you are implicitly allowing your code to be distributed under the MIT license. You are also implicitly verifying that all code is your original work. </legalese>

License

See LICENSE for more info.