josdejong / workerpool

Offload tasks to a pool of workers on node.js and in the browser
Apache License 2.0

Send event / message to running Worker? #370

Closed: flipswitchingmonkey closed this issue 1 year ago

flipswitchingmonkey commented 1 year ago

The backchannel from the worker to the main thread via emit works fine, but are there ports set up the other way around (sending a message into the worker)?

Reason: I need to send updated data to a running process inside a worker.

flipswitchingmonkey commented 1 year ago

I guess it was the second part of this PR that was never implemented? :( https://github.com/josdejong/workerpool/pull/210

josdejong commented 1 year ago

Thanks for your suggestion, Michael. Yes, there was a start on something like this in #210, but we still have to think through a good API for it.

Help would be welcome!

flipswitchingmonkey commented 1 year ago

Hm. Just spitballing here... how about a new callback, similar to the on() event callback? Something like:

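let executionId: number | undefined;
pool.exec('eventExample', [], {
  // proposed option: called once the task has started on a worker
  onExecutionStart: (id) => {
    executionId = id;
  },
  on: function (payload) {
    // ...
  },
});

...

// compare against undefined: an execution id of 0 would be falsy
if (executionId !== undefined) {
  // proposed method: look up the worker running this execution
  pool.getWorkerInstance(executionId).emit('Take this!', {data: {...}});
}
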
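To make the id-based idea concrete, here is a minimal TypeScript sketch of the bookkeeping a pool would need for it. `ExecutionRegistry` and its `start`/`finish`/`emit` methods are hypothetical names, not workerpool API, and `send` stands in for whatever actually posts a message to the worker running a given execution.

```typescript
// Hypothetical sketch: not part of workerpool's API.
// `Send` stands in for whatever posts a message to the worker running a task.
type Send = (event: string, payload: unknown) => void;

class ExecutionRegistry {
  private senders = new Map<number, Send>();
  private nextId = 0;

  // The pool would call this when it dispatches a task to a worker,
  // then hand the returned id to the caller (e.g. via onExecutionStart).
  start(send: Send): number {
    const id = this.nextId++;
    this.senders.set(id, send);
    return id;
  }

  // The pool would call this when the task settles.
  finish(id: number): void {
    this.senders.delete(id);
  }

  // Returns false when the id is unknown or the task has already finished.
  emit(id: number, event: string, payload: unknown): boolean {
    const send = this.senders.get(id);
    if (send === undefined) return false;
    send(event, payload);
    return true;
  }
}
```

Ids start at 0 in this sketch, so a truthiness check such as `if (executionId)` would skip the first execution; comparing against `undefined` avoids that. The extra lookup step is also the indirection the next reply points out.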
josdejong commented 1 year ago

Yes, that could work, though it is an indirect solution that goes through an identifier.

How about extending the returned Promise-like instance:

const handler = pool.exec('fibonacci', [10], {
  on: function (payload) { ... }
})

// the returned promise-like `handler` is actually an extended version of Promise,
// with methods .then(callback), .catch(callback), .cancel(), .timeout(delay), and .emit(payload)
handler.emit('Take this!')

const result = await handler

To prevent confusion, we should perhaps rename Promise to ExecutionHandler or something similar.
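A minimal sketch of such an `ExecutionHandler`, assuming the pool can supply a `send` function that posts to the worker running the task (a hypothetical hook; the thread does not say workerpool exposes one). It implements `PromiseLike`, so `await handler` works, and adds `emit`; `.cancel()` and `.timeout()` are left out for brevity.

```typescript
// Hypothetical: `send` would post a message to the worker running this task.
type Send = (payload: unknown) => void;

class ExecutionHandler<T> implements PromiseLike<T> {
  private readonly promise: Promise<T>;
  private readonly send: Send;

  constructor(promise: Promise<T>, send: Send) {
    this.promise = promise;
    this.send = send;
  }

  // Delegating then/catch makes the handler awaitable like a Promise.
  then<R1 = T, R2 = never>(
    onFulfilled?: ((value: T) => R1 | PromiseLike<R1>) | null,
    onRejected?: ((reason: unknown) => R2 | PromiseLike<R2>) | null,
  ): Promise<R1 | R2> {
    return this.promise.then(onFulfilled, onRejected);
  }

  catch<R = never>(
    onRejected?: ((reason: unknown) => R | PromiseLike<R>) | null,
  ): Promise<T | R> {
    return this.promise.catch(onRejected);
  }

  // The new method: push a message into the running task.
  emit(payload: unknown): void {
    this.send(payload);
  }
}
```

Because the handler only delegates to the wrapped promise, existing `.then()`/`await` call sites would keep working; the rename to `ExecutionHandler` just makes it clearer that the object is more than a plain Promise.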

flipswitchingmonkey commented 1 year ago

That'd be much nicer of course!

Also, it would be nice if messages were queued up in case the worker has not started yet. Alternatively, the promise could expose the current state of the task (queued, running, finished, ...) so that messages don't get lost.

Adding this would make the pool so much more useful for controlling long running tasks!

josdejong commented 1 year ago

Yes, it would be nice to have two-way messaging between the main process and the worker.

I'm not sure it is possible to queue messages upfront: beforehand you do not know which worker a task will be assigned to. I think you can only send something to the worker once the task has started running on it?

Could you share your use case, so we can better understand what API would cater for it?

flipswitchingmonkey commented 1 year ago

It'd be enough if the promise exposed the state, so that a message could be sent only while the task is running:

if (handler.status() === 'running') handler.emit('Take this!');
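One way a handler could track that state is a small status field that the pool updates as the task moves through its lifecycle. This is a minimal sketch: the status names are the ones suggested earlier in the thread, while `StatusHandler`, `markRunning` and `markFinished` are hypothetical hooks the pool would call, and `send` again stands in for posting to the worker.

```typescript
type Status = "queued" | "running" | "finished";
type Send = (payload: unknown) => void;

// Hypothetical sketch of a handler that exposes the task's lifecycle state.
class StatusHandler {
  private state: Status = "queued";
  private send: Send | undefined;

  status(): Status {
    return this.state;
  }

  // Called by the pool once a worker has picked up the task.
  markRunning(send: Send): void {
    this.state = "running";
    this.send = send;
  }

  // Called by the pool when the task resolves, rejects, or is cancelled.
  markFinished(): void {
    this.state = "finished";
    this.send = undefined;
  }

  // Throws instead of silently dropping a message when nothing is running.
  emit(payload: unknown): void {
    if (this.state !== "running" || this.send === undefined) {
      throw new Error(`cannot emit while task is ${this.state}`);
    }
    this.send(payload);
  }
}
```

Throwing on a non-running task makes lost messages visible, at the cost of forcing every caller to check `status()` first.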

The use case here is to run a long running task, which in itself is a workflow of multiple steps. These could take a while, so on the one hand we need to be able to timeout and cancel the task (which ideally requires some extra steps for a clean shutdown, other than just killing the task), and on the other hand we'd like to trigger actions on the running task as well (like adding a new step while it is running).
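On the worker side, a workflow like this could consume incoming messages between steps. This is a minimal, transport-agnostic sketch: `WorkflowRunner`, the `addStep`/`stop` message shapes, and the `onMessage` entry point are all hypothetical; in a real worker, `onMessage` would be wired to whatever channel the pool ends up providing.

```typescript
// Hypothetical message shapes sent from the main thread into the worker.
type Message =
  | { type: "addStep"; name: string }
  | { type: "stop" };

class WorkflowRunner {
  private steps: string[];
  private stopRequested = false;
  readonly completed: string[] = [];

  constructor(steps: string[]) {
    this.steps = [...steps];
  }

  // Would be wired to the incoming message channel; only records intent.
  onMessage(msg: Message): void {
    if (msg.type === "addStep") this.steps.push(msg.name);
    else this.stopRequested = true;
  }

  // Runs one step at a time so messages can be handled between steps.
  // Returns false once the workflow is done or a stop was requested.
  runNext(): boolean {
    if (this.stopRequested || this.steps.length === 0) {
      this.cleanUp();
      return false;
    }
    const step = this.steps.shift()!;
    this.completed.push(step); // a real step would do actual work here
    return true;
  }

  private cleanUp(): void {
    // Placeholder for the clean-shutdown work (closing files,
    // flushing partial results, ...) instead of just killing the task.
  }
}
```

In a real worker, messages are delivered through the event loop, so the steps would need to yield between iterations (for example by awaiting each step) for `onMessage` to run while the workflow is in progress.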

It's all rather convoluted, tbh, but in any case it requires a channel back into the worker :)

josdejong commented 1 year ago

Thanks. Makes sense.

I think handler.emit could queue up any messages sent before the task has started, making it optional to check whether the status is running before sending something.
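A minimal sketch of that queueing behaviour, assuming the pool calls a hypothetical `attach(send)` hook once the task has been assigned to a worker. Messages emitted before that point are buffered in the handler rather than at a worker, which sidesteps the problem of not knowing upfront which worker will get the task.

```typescript
type Send = (payload: unknown) => void;

// Hypothetical sketch: buffers emits until the task has a worker.
class QueueingEmitter {
  private pending: unknown[] = [];
  private send: Send | undefined;

  emit(payload: unknown): void {
    if (this.send) {
      this.send(payload);
    } else {
      this.pending.push(payload); // task not started yet: keep for later
    }
  }

  // Called by the pool when the task starts on a worker; flushes the
  // buffered messages in the order they were emitted.
  attach(send: Send): void {
    this.send = send;
    for (const payload of this.pending) send(payload);
    this.pending = [];
  }
}
```

With this design the caller never has to look at the status before emitting; the trade-off is that messages for a task that is cancelled before it starts are simply discarded along with the handler.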

flipswitchingmonkey commented 1 year ago

Sounds like a plan! Let me know if I can help out somehow.

josdejong commented 1 year ago

I'm not planning on implementing this myself, so if it is important to you, you or someone else will have to jump in.

flipswitchingmonkey commented 1 year ago

I've created a PR with a relatively simple change that lets the parent send messages to the worker instances in parallel with the current execution.