Yaty / workestrator

Workestrator is a Node.js library to distribute tasks to child processes.
https://yaty.github.io/workestrator/
MIT License
3 stars 0 forks source link
cluster farm node nodejs pool typescript workers

Coverage Status Build Status Known Vulnerabilities

Workestrator

Greenkeeper badge

Workestrator is a library to distribute tasks to child processes. It is written in TypeScript and use ES2017 features. This project is highly inspired by node-worker-farm and add some new features. Typescript Documentation : https://yaty.github.io/workestrator/

npm install --save workestrator

Features

  1. Concurrency options
  2. Durability / Resilience : when a call fails it will be re-queued automatically (according to the farm options).
  3. Async/Await support out of the box
  4. Events
  5. Serializers : You can choose among several serializers according to the data types you send to the workers. You can also write your own :)
  6. Broadcasting

Usage

const workestrator = require("workestrator");

const farm = workestrator.create({
    module: "/absolute/path/to/the/worker/module.js",
});

try {
    await farm.run(1, 2, 3); // returns 6
    await farm.runMethod("foo", 1, 2, 3); // returns "bar:1:2:3"
} catch (err) {
    console.log("Oh ... it failed :(", err);
}

And the module is :

module.exports = function(a, b, c) {
    return a + b + c;
};

module.exports.foo = function(a, b, c) {
    return `bar:${a}:${b}:${c}`;
};

Examples

Basic

Running and broadcasting methods.

Approximating π

Doing it the slow (single-process) way...
π ≈ 3.141597687999999   (0.000005034410206050666 away from actual!)
took 12217 milliseconds

Doing it the fast (multi-process) way...
π ≈ 3.1415487919999996  (0.000043861589793525724 away from actual!)
took 3425 milliseconds

See the full example with code here !

API

Workestrator

workestrator.create(options)

Create a new farm.

const workestrator = require("workestrator");

const farm = workestrator.create({
    module: "/absolute/path/to/the/worker/module.js"
});
options

Options is an object, the default values are :

{
    fork: {
        args: process.argv,
        cwd: process.cwd(),
        env: process.env,
        execArgv: process.execArgv.filter((v) => !(/^--(debug|inspect)/).test(v)), // without debug and inspect
        execPath: process.execPath,
        silent: false,
    },
    killTimeout: 500,
    maxConcurrentCalls: Infinity,
    maxConcurrentCallsPerWorker: 10,
    maxIdleTime: Infinity,
    maxRetries: 3,
    numberOfWorkers: require("os").cpus().length,
    serializerPath: workestrator.serializers.JSON
    timeout: Infinity,
    ttl: Infinity,
}

workestrator.kill()

Kill all farms.

workestrator.kill();

Farm

farm.run(...args)

Run the default exported method in your module with arguments. Async. Returns what's returned by the default method.

try {
    const res = await farm.run(1, [], "");
    console.log("Result :", res);
} catch (err) {
    console.log("Oh no :'(", err);
}

farm.runMethod(method, ...args)

Run a specific method in your module. Async. Returns what's returned by the method.

try {
    // bar is a method exported with : module.exports.bar = function ...
    const res = await farm.runMethod("bar", 1, [], "");
    console.log("Result :", res);
} catch (err) {
    console.log("Oh no :'(", err);
}

farm.broadcast(...args)

Run the default exported method in every worker with arguments. Async. Returns an array with the first element being the succeeded calls and the second element the failures.

const [successes, failures] = await farm.broadcast(1, [], "");
console.log("Successes :", successes);
console.log("Failures :", failures);

farm.broadcastMethod(method, ...args)

Run a specific method in every worker. Async. Returns an array with the first element being the succeeded calls and the second element the failures.

// bar is a function exported with : module.exports.bar = function ...
const [successes, failures] = await farm.broadcastMethod("foo", 1, [], "");
console.log("Successes :", successes);
console.log("Failures :", failures);

farm.kill()

Kill the farm.

farm.kill();

Serializers

According to the data you send and receive to your workers you might need to use a different serializer. Data needs to be serialized in order to be sent to the worker process.

Two serializers are available :

  1. JSON (default, Node.js internal serializer), supported types :
    • boolean
    • number (without -0, NaN, and ±Infinity)
    • string
    • Array
    • Object
  2. CBOR, supported types :
    • boolean
    • number (including -0, NaN, and ±Infinity)
    • string
    • Array, Set (encoded as Array)
    • Object (including null), Map
    • undefined
    • Buffer
    • Date,
    • RegExp
    • url.URL (Legacy URL API)
    • bignumber

For performances you might want to check the benchmark.

You can also build your own serializer. You need to extends the Serializer class :

 const {Serializer} = require("workestrator");

class MySerializer extends Serializer {
    encode(data) {
        // do some encoding
        return encodedData;
    }

    decode(data) {
        // do some decoding
        return decodedData;
    }
}

module.exports = MySerializer;

Then set the farm options accordingly :

const farm = workestrator.create({
    module: "/absolute/path/to/the/worker/module.js",
    serializerPath: "/absolute/path/to/MySerializer", // you can use require.resolve to get the absolute path
});

JSON serializer path is in workestrator.serializers.JSON.

CBOR serializer path is in workestrator.serializers.CBOR.

Events

You can listen to events from a worker of from the farm directly. They both extends Node.js EventEmitter. All those events are already being used by Workestrator. You do not have to implement anything, it works out of the box. I wanted to expose events to have the possibility to add logging but I'm sure they are other use-cases :)

Farm

farm.on(event, callback)

online

When a new worker is created within the farm.

farm.on("online", (worker) => {
    // ...
});

message

When a worker is sending a message to the farm.

data looks like this :

{
    callId: number;
    res?: any;
    err?: Error;
    workerId: number;
}
farm.on("message", (worker, data) => {
    // ...
});

disconnect

When a worker disconnect. See Node.js documentation

farm.on("disconnect", (worker) => {
    // ...
});

error

The error event is emitted whenever:

See Node.js documentation

farm.on("error", (worker, err) => {
    // ...
});

close

When a stdio streams of a worker have been closed. See Node.js documentation

farm.on("close", (worker, code, signal) => {
    // ...
});

exit

See Node.js documentation

farm.on("exit", (worker, code, signal) => {
    // ...
});

ttl

When ttl is reached.

farm.on("ttl", (worker) => {
    // ...
});

idle

When maxIdleTime is reached.

farm.on("idle", (worker) => {
    // ...
});

Logging

You can enable logging by using an environment variable : DEBUG=workestrator:*

Why I created this ?

I was looking for a project to use TypeScript, so the idea was to reproduce node-worker-farm and add some new features. It actually went pretty smoothly and I'm happy with the result ;)

License

Workestrator is Copyright (c) 2018 Hugo Da Roit (@Yaty) and licensed under the MIT license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE file for more details.