josdejong / workerpool

Offload tasks to a pool of workers on node.js and in the browser
Apache License 2.0
2.09k stars 146 forks source link

Sharing complex objects across worker threads? #323

Open mayurk opened 3 years ago

mayurk commented 3 years ago

Is there a way to share DB pool object across the worker threads. Any attempts to send such object to the worker thread from the main thread via the pool.exec(...) call gives a DataCloneError since that object has a function reference. Also even though the worker threads are part of the same process ( i might be wrong here ) every time we "require" a module in the worker it gets a fresh copy instead getting a same copy from cache. This means we cannot use this module where DB connectivity is required.

shlomiah commented 3 years ago

+1 I try to use variables that was required in the main thread and don't know how to inject them into the worker pool.

josdejong commented 3 years ago

@mayurk worker threads are really isolated from each other, just search some more explanation about it on the internet if you want to know more.

@shlomiah I'm not sure if your remark is about the same topic. Can you explain your use case?

jfoclpf commented 2 years ago

@josdejong there's no way to use some sort of SharedArrayBuffer?

I need to share (not clone, not transfer, just share) a big heavy JS object created at the parent thread, and share it amongst children threads.

jfoclpf commented 2 years ago

@josdejong I know this does not work, but something like this:

const workerpool = require('workerpool');
const pool = workerpool.pool();

const sharedObj = {a: 1, b: 2}

function add(a, b) {
  return a + b + sharedObj.a + sharedObj.b;
}

for (let i=0; i < 10; i++) {
  pool.exec(add, [i, i+1])
    .then(function (result) {
      console.log('result', result); // outputs 7
    })
    .catch(function (err) {
      console.error(err);
    })
    .then(function () {
      pool.terminate(); // terminate all workers when done
    });
}
josdejong commented 2 years ago

Sounds like you would like to use a SharedWorker. The workerpool library doesn't support that though.

And on a side note: options to transfer an object without copying are discussed in #3.

jfoclpf commented 2 years ago

@josdejong do you recommend me any npm library for nodeJs?

josdejong commented 2 years ago

I don't know, it depends on your use case. I'm sure you can find something or can just use native API's.

normancapule commented 1 year ago

@josdejong there's no way to use some sort of SharedArrayBuffer?

I need to share (not clone, not transfer, just share) a big heavy JS object created at the parent thread, and share it amongst children threads.

In my case I had to pass an image buffer from main node process to the worker processes. I used base64 encoding to share the buffer object

Mae6e commented 10 months ago

@josdejong there's no way to use some sort of SharedArrayBuffer? I need to share (not clone, not transfer, just share) a big heavy JS object created at the parent thread, and share it amongst children threads.

In my case I had to pass an image buffer from main node process to the worker processes. I used base64 encoding to share the buffer object

hi. can you write a sample for that here?

pcace commented 3 months ago

In my case I had to pass an image buffer from main node process to the worker processes. I used base64 encoding to share the buffer object

i`d be interested too! Any help on this would be great!

Wakatem commented 2 months ago

This is an example of the approach @normancapule mentioned

Main Thread

const { Worker } = require('worker_threads');

class TestClass {
    constructor(data) {
        this.data = data;
    }

    increment() {
        this.data.value += 1;
    }

    // Method to serialize the object to Base64
    toBase64() {
        const jsonString = JSON.stringify(this);
        return Buffer.from(jsonString).toString('base64');
    }

    // Static method to deserialize a Base64 string back to an object
    static fromBase64(base64String) {
        const jsonString = Buffer.from(base64String, 'base64').toString('utf-8');
        const jsonObject = JSON.parse(jsonString);
        return new TestClass(jsonObject.data);
    }
}

// Define the outer object containing an inner object
var outerObject = new TestClass({ value: 0 });

// Create a new worker
const worker = new Worker('./worker.js');

// Listen for messages from the worker
worker.on('message', (message) => {
    // Deserialize the outer object from Base64
    const updatedOuterObject = TestClass.fromBase64(message);
    originalMapFacade = ModifiedMapFacade;
    console.log('Received message from worker:', updatedOuterObject);
    console.log('Updated inner object value:', updatedOuterObject.data.value); // Should be 1 after worker increments it
});

// Serialize the outer object to Base64 and send it to the worker
worker.postMessage(outerObject.toBase64());

Worker

const { parentPort } = require('worker_threads');

// Define the class in the worker
class TestClass {
    constructor(data) {
        this.data = data;
    }

    increment() {
        this.data.value += 1;
    }

    // Static method to deserialize a Base64 string back to an object
    static fromBase64(base64String) {
        const jsonString = Buffer.from(base64String, 'base64').toString('utf-8');
        const jsonObject = JSON.parse(jsonString);
        return new TestClass(jsonObject.data);
    }

    // Method to serialize the object to Base64
    toBase64() {
        const jsonString = JSON.stringify(this);
        return Buffer.from(jsonString).toString('base64');
    }
}

// Listen for messages from the main thread
parentPort.on('message', (message) => {
    // Deserialize the outer object from Base64
    const outerObject = TestClass.fromBase64(message);

    // Perform some work (e.g., incrementing the value inside the inner object)
    outerObject.increment();

    // Send the updated outer object back to the main thread
    parentPort.postMessage(outerObject.toBase64());
});