breejs / bree

Bree is a Node.js and JavaScript job task scheduler with worker threads, cron, Date, and human syntax. Built for @ladjs, @forwardemail, @spamscanner, @cabinjs.
https://jobscheduler.net
MIT License

how to share data between worker threads #129

Closed. ocylib closed this issue 2 years ago

ocylib commented 2 years ago

I'd like to do something like this: worker thread 1 (T1) runs every 30 minutes and updates some data, and worker thread 2 (T2) runs every 10 seconds and does something based on the data from T1.
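For concreteness, the two jobs described above might be declared as below. This is a sketch that assumes Bree's `jobs` option and its human-readable interval strings such as '30m' and '10s'. The job names 'job1' (T1) and 'job2' (T2) are hypothetical and simply mirror the file names used later in this thread.

```javascript
// Hypothetical job definitions for the T1/T2 scenario, in the shape that
// Bree's `jobs` option accepts. By default Bree looks up each job by name
// in its jobs directory (e.g. jobs/job1.js).
const jobs = [
  // T1: refreshes the shared data every 30 minutes
  { name: 'job1', interval: '30m' },
  // T2: runs every 10 seconds and uses T1's data
  { name: 'job2', interval: '10s' },
];

// With Bree installed, this array would be passed as: new Bree({ jobs })
console.log(jobs.map((job) => `${job.name} every ${job.interval}`).join(', '));
```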

What I tried

Section one: in the worker file of T1, I use parentPort.postMessage to post the data to the main thread and handle it in the main file with workerMessageHandler. Finally, I post the data back to every worker thread as follows:

// Forward `data` to every worker that Bree is currently running
const ws = bree.workers;
for (const key in ws) {
  if (Object.prototype.hasOwnProperty.call(ws, key)) {
    const w = ws[key as keyof object] as Worker;
    w.postMessage(data);
  }
}

It works, but not reliably: the worker threads sometimes get the data from the main thread and sometimes don't.
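One possible explanation for the "sometimes" result, not confirmed in this thread: the loop only reaches workers that exist at that moment, and T2 is a short job that Bree starts again every 10 seconds, so data broadcast between two runs of T2 never reaches it. A minimal sketch, assuming `bree.workers` is a plain object keyed by job name as in the snippet above, is to cache the latest payload in the main thread and also hand it to each newly created worker. `broadcast` and `sendLatestTo` are hypothetical helper names.

```javascript
// Hypothetical main-thread helpers. `latestData` caches the last payload
// received from T1 so that workers created later (for example each
// 10-second run of T2) can still be given a copy.
let latestData;

// Send `data` to every worker that is running right now.
// Assumes `workers` is a plain object keyed by job name.
function broadcast(workers, data) {
  latestData = data;
  let sent = 0;
  for (const worker of Object.values(workers)) {
    if (worker) {
      worker.postMessage(data);
      sent += 1;
    }
  }
  return sent;
}

// Give a newly created worker the most recent payload, if there is one.
function sendLatestTo(worker) {
  if (latestData === undefined) return false;
  worker.postMessage(latestData);
  return true;
}
```

In the main file, `broadcast(bree.workers, data)` would be called wherever T1's message is handled (for example in the workerMessageHandler mentioned above), and `sendLatestTo(bree.workers[name])` from a `bree.on('worker created', (name) => ...)` listener.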

Section two: share process.env between the main thread and all worker threads. I tried to set a custom worker option like this:

import { SHARE_ENV } from 'worker_threads';
{
  ...
  worker: { workerData: { __filename: job.path }, env: SHARE_ENV },
}
...

but I was told that SHARE_ENV can't be cloned, so it failed. Thanks for any help!
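The error is consistent with how SHARE_ENV is defined: it is a Symbol, and the structured clone algorithm that Node.js uses for postMessage and workerData cannot copy symbols. Where Bree clones the worker options is not shown in this thread, so this sketch only demonstrates the limitation itself. It assumes Node.js 17 or later for the global `structuredClone`, and `isCloneable` is a hypothetical helper.

```javascript
const { SHARE_ENV } = require('worker_threads');

// Hypothetical helper: true if `value` survives the structured clone
// algorithm (the same algorithm postMessage and workerData rely on).
function isCloneable(value) {
  try {
    structuredClone(value); // global in Node.js 17+
    return true;
  } catch {
    return false;
  }
}

console.log(typeof SHARE_ENV);                      // symbol
console.log(isCloneable(SHARE_ENV));                // false
console.log(isCloneable({ env: SHARE_ENV }));       // false
console.log(isCloneable({ workerData: { a: 1 } })); // true

// Passing SHARE_ENV straight to the Worker constructor is the documented
// use, since the constructor recognises it for its `env` option:
//   new Worker(file, { env: SHARE_ENV })
```

Even where a shared environment works, process.env only stores strings, so any structured data would still have to be serialized (for example with JSON.stringify) before being put there.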

shadowgate15 commented 2 years ago

You could try using MessageChannel, and then send the ports via postMessage after the workers are created. That would let you set up a direct channel between the two jobs.

ocylib commented 2 years ago

@shadowgate15 Thanks, MessageChannel works with native Node.js worker threads:

file: index.js
-------------------------------------------------
const { Worker, MessageChannel } = require('worker_threads');

// One channel; each worker gets one end of it
const { port1, port2 } = new MessageChannel();
const w = new Worker('./jobs/job1.js');
const w2 = new Worker('./jobs/job2.js');
// Ports go in the transfer list so they are moved, not cloned
w.postMessage({ p: port1 }, [port1]);
w2.postMessage({ p: port2 }, [port2]);

file: ./jobs/job1.js
------------------------------------------------
const { parentPort } = require('worker_threads');

if (parentPort) {
  // The first message from the main thread carries this job's port
  parentPort.on('message', (msg) => {
    const { p } = msg;
    // Send a timestamp to job2 every 500 ms
    setInterval(() => {
      p.postMessage(`time:${Date.now()}`);
    }, 500);
    // Log whatever job2 sends back
    p.on('message', (v) => {
      console.log(v);
    });
  });
}

file: ./jobs/job2.js
------------------------------------------------
const { parentPort } = require('worker_threads');

if (parentPort) {
  parentPort.on('message', (msg) => {
    const { p } = msg;
    // Log messages arriving directly from job1
    p.on('message', (v) => {
      console.log('on message in job2', v);
    });
    // Reply to job1 once per second
    setInterval(() => {
      p.postMessage('message from job2');
    }, 1000);
  });
}

However, it didn't work when I tried to use this with Bree. How and when should I post the ports to the job's worker thread? Could you show me an example, please?

shadowgate15 commented 2 years ago

Inside index.js, pass each port once its worker has been created and is online:

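const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
// Placeholder job names; use the names from your Bree config
const targetWorkerOne = 'job1';
const targetWorkerTwo = 'job2';

bree.on('worker created', (jobName) => {
  // A MessagePort must be listed in the transfer list, or postMessage throws
  if (jobName === targetWorkerOne) {
    bree.workers[jobName].postMessage({ p: port1 }, [port1]);
  } else if (jobName === targetWorkerTwo) {
    bree.workers[jobName].postMessage({ p: port2 }, [port2]);
  }
});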

This should send the MessagePort on to the worker.

ocylib commented 2 years ago

@shadowgate15 Thanks, it works. What you described is exactly what I had done before; the reason it did not work was that my job file contained code like this:

parentPort.postMessage('done')

It made the worker thread close before the parentPort.on('message', ...) handler ran, so the worker never received the port from the main thread.

Now I use a promise to wait for parentPort's 'message' event; when I get the port from the main thread, I save it and use it.
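The promise-based approach described above can be sketched as a job file like the following. It assumes the main thread sends the port as `{ p: port }`, as in the snippets in this thread. `waitForPort` is a hypothetical helper, and ignoring messages without a `p` field is a design choice so that any other message from the main thread does not resolve the promise by mistake.

```javascript
const { parentPort } = require('worker_threads');

// Hypothetical helper: resolve with the MessagePort that the main thread
// sends as `{ p: port }`. Messages without a `p` field are ignored.
function waitForPort(port) {
  return new Promise((resolve) => {
    const onMessage = (msg) => {
      if (msg && msg.p) {
        port.off('message', onMessage); // stop listening once we have it
        resolve(msg.p);
      }
    };
    port.on('message', onMessage);
  });
}

async function main() {
  // Wait for the port before doing anything that could end the job
  const p = await waitForPort(parentPort);
  p.postMessage(`time:${Date.now()}`);
  // ... the job's real work goes here ...

  // Report completion last: the worker is closed once 'done' is received
  parentPort.postMessage('done');
}

// Only run when this file is loaded as a worker thread
if (parentPort) main();
```

The essential change is ordering: `parentPort.postMessage('done')` moves to the end, after the port has arrived and the work is finished.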