josdejong / workerpool

Offload tasks to a pool of workers on node.js and in the browser
Apache License 2.0

Feat/abort listener #448

Open joshLong145 opened 2 months ago

joshLong145 commented 2 months ago

Adds an abortListener to workers, which allows cleanup of async tasks to run when a task timeout or cancellation occurs, so that the worker can be reused.

connects PR #441

joshLong145 commented 2 months ago

@josdejong After some testing of offloaded functions, I think there are issues with referencing variables that are defined outside of the offloaded function's scope:

var workerpool = require('../');
var pool = workerpool.pool();

function asyncTimeout() {
  return new Promise(function (resolve) {
    let timeout = setTimeout(function () {
        resolve();
    }, 5000);

    workerpool.addAbortListener(async function () {
        await new Promise((res, rej) => {
          setTimeout(res, 1000);
        });
        clearTimeout(timeout);
        resolve();
    });
  });
};

pool.exec(asyncTimeout, [])

The above will error with "workerpool is not defined". It does not seem like we can rely on an instance of workerpool being available in the global scope when using offloaded functions. I think the only way to support it would be to pass the registration handler to the function.
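That behaviour follows from how offloaded functions are executed: the function is stringified, sent to the worker, and rebuilt with new Function, so it only sees the worker's global scope rather than the scope it was defined in. A minimal standalone illustration of the effect (not workerpool code; run as a Node.js script):

// The module-scoped variable below is *not* visible to a function that is
// rebuilt from its source text.
var workerpool = { addAbortListener: function () {} };

function offloaded() {
  return workerpool.addAbortListener; // resolved against the executing scope
}

offloaded(); // works: the closure still sees `workerpool`

var rebuilt = new Function('return (' + offloaded + ').apply(this, arguments);');
rebuilt(); // ReferenceError: workerpool is not defined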

UPDATE:

After playing around with scopes on the function wrapper in worker.methods.run, I was able to bind addAbortListener to the function itself so it may be accessed with this.addAbortListener:

worker.methods.run = function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(this, arguments);');
  // expose the abort listener registration on the task function itself
  f.addAbortListener = function (listener) {
    worker.abortListeners.push(listener);
  };

  return f.apply(f, args);
};

If we pass f as the this value to apply instead of null, we can then extend the f object to provide the addAbortListener context. This is hacky, but it does allow for a unique this context within an offloaded function, while the global scope is still reachable through globalThis. Below is an example of an offloaded function that uses the modified run implementation above:

var workerpool = require('../');
var pool = workerpool.pool();

function asyncTimeout() {
  var me = this;
  return new Promise(function (resolve) {
    let timeout = setTimeout(function () {
        resolve();
    }, 5000);
    console.log(me.addAbortListener, globalThis);
    me.addAbortListener(async function () {
        console.log("adasd", clearTimeout);
        clearTimeout(timeout);
        resolve();
    });
  });
};

pool.exec(asyncTimeout, [])
josdejong commented 2 months ago

It sounds like a good idea to attach this.addAbortListener to the function itself rather than a "magic" global variable workerpool.addAbortListener 👍. Can you make sure that it is possible to use it as follows?

const addAbortListener = this.addAbortListener
// ...
addAbortListener(...)

EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right?

joshLong145 commented 2 months ago

EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right?

Yes, this makes sense to me. I can make the updates and implement the tests/examples now that we have this worked out.

Since we can now extend the function created for the worker task, we can expose a worker API on it. So for example:

this.worker.addEventListener
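A rough sketch of what that could look like, building on the modified run above (the `worker` namespace on the task function and its contents are hypothetical):

worker.methods.run = function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(this, arguments);');

  // Hypothetical shape: group worker-facing helpers under a single `worker`
  // namespace on the task function instead of attaching them one by one.
  f.worker = {
    addAbortListener: function (listener) {
      worker.abortListeners.push(listener);
    }
  };

  return f.apply(f, args);
};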
joshLong145 commented 2 months ago

@josdejong

A question on how errors are processed: cancel and timeout produce unique error types, and an error received over the worker RPC bridge causes terminate to be invoked on the WorkerHandler instance (see exec on WorkerHandler for where terminateAndNotify is called). I do not think the current implementation is sufficient for keeping WorkerHandler instances preserved when attempting to reuse workers, since the handler will be deleted from the pool and a new worker will be created regardless of whether the worker is actually cleaned up. I have added some unit tests which show that the onCreateWorker handler gets invoked multiple times when it should only have created a single worker for the lifetime of the pool.

I think for this new feature to work as intended, the message protocol does need extending to account for cleanup, so that handlers can be notified that the worker can survive the exception and does not need to be torn down.
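A rough sketch of what the extended protocol could carry (hypothetical field and method names, not the final protocol): the WorkerHandler asks the worker to run its abort listeners and waits for an explicit reply before deciding whether the worker can be kept alive or must be terminated.

function buildAbortRequest(taskId) {
  // WorkerHandler -> Worker: run abort listeners for this task
  return { id: taskId, method: '__abort__' };
}

function buildAbortReply(taskId, error) {
  // Worker -> WorkerHandler: outcome of running the abort listeners
  return {
    id: taskId,
    result: error ? null : 'abort-successful',
    error: error ? { name: error.name, message: error.message } : null
  };
}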

joshLong145 commented 1 month ago

Thanks for the updates! I added a few inline comments.

About the WorkerHandler terminating the worker when receiving a cancel or timeout error: that is a very good point. Do you have ideas on how to solve this? Indeed both the worker side and the WorkerHandler side both need to know whether there is an abort listener in place or not, and adjust their behavior accordingly.

@josdejong Sorry for the delayed response.

After giving it some thought, I see a possible flow for communication which will allow for proper tracking of handlers based on whether a worker can be reused after the onAbort handlers are triggered:

sequenceDiagram
    participant Pool
    participant WorkerHandler
    participant Worker
    Pool->>WorkerHandler: TimeoutError/CancellationError occurs. Move the task with its resolver to a `tracking` queue and send a message to the worker to run cleanup with the task id
    WorkerHandler->>Worker: Worker receives the message and executes the abort handlers
    Worker->>WorkerHandler: Send the result of the abort handler execution back to the worker handler with the task id
    WorkerHandler->>Pool: Look up the task id in `tracking` and, if present, resolve or reject the resolver promise based on the data sent in the message from the worker. Clean up the task context

With the above model, the resolver Promise created when exec is first called on the WorkerHandler will either resolve or reject based on the result of the message sent back from the onAbort listener execution, which is looked up in a tracking queue. The pool now has a concept of tasks which need to be tracked for potential future cleanup. Since cleanup is a parallel operation that requires resource tracking, a second queue seems like the most obvious way of managing the resource.
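A minimal sketch of the `tracking` idea (hypothetical names, not the PR code): tasks whose resolver is waiting on the outcome of the worker's abort listeners, keyed by task id.

var tracking = new Map(); // taskId -> { resolver, timeoutId }

function trackAbort(taskId, resolver, timeoutMs) {
  // give the abort listeners a bounded amount of time to report back
  var timeoutId = setTimeout(function () {
    tracking.delete(taskId);
    resolver.reject(new Error('abort listeners did not finish in time'));
  }, timeoutMs);
  tracking.set(taskId, { resolver: resolver, timeoutId: timeoutId });
}

function onAbortResult(message) {
  var entry = tracking.get(message.id);
  if (!entry) return; // not a tracked task
  clearTimeout(entry.timeoutId);
  tracking.delete(message.id);
  if (message.error) {
    entry.resolver.reject(message.error);   // listener failed: terminate the worker
  } else {
    entry.resolver.resolve(message.result); // cleanup succeeded: worker can be reused
  }
}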

The other idea, although much more involved, is to rewrite how items are processed on the producer side: instead of items being processed in a single promise chain with a recursive call, we could use something like p-queue to handle assigning tasks to workers and managing WorkerHandlers when tasks cause a worker to be terminated.
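A minimal sketch of that alternative, assuming the p-queue package (written as ESM, since recent p-queue versions are ESM-only); the worker-acquisition helpers are hypothetical stand-ins for however the pool would hand out WorkerHandler instances:

import PQueue from 'p-queue';

// Fixed concurrency stands in for the pool's maxWorkers.
const queue = new PQueue({ concurrency: 4 });

function submitTask(method, params) {
  return queue.add(async () => {
    const handler = await acquireWorkerHandler(); // hypothetical: get or create a WorkerHandler
    try {
      return await handler.exec(method, params);
    } finally {
      releaseWorkerHandler(handler); // hypothetical: return it to the pool, or replace it if terminated
    }
  });
}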

josdejong commented 1 month ago

Ow nice I didn't know that you can draw a sequenceDiagram straight in GitHub issues 😎.

I'm not sure whether a queue is really needed since a WorkerHandler and a Worker only process a single task at a time, but at least both need to get the information on whether to abort or not.

I think we could do something like this (it's close to your diagram I think):

  1. In case of a timeout error or termination request, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
    1. The Worker responds with "abort successful". After that the WorkerHandler can terminate the Worker.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.
  2. In case of a cancel error, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
    1. The Worker responds with "abort successful". Then we're done.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.

What do you think?
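As a rough sketch of the WorkerHandler side of this flow (hypothetical helper names, not actual workerpool code): race the worker's abort reply against a limited timeout, and only keep the worker alive when it explicitly reports a successful abort for a cancellation.

function handleAbort(sendAbortMessage, waitForAbortReply, terminate, options) {
  sendAbortMessage(); // ask the worker to run its abort listeners

  var noResponse = new Promise(function (resolve) {
    setTimeout(function () {
      resolve({ status: 'no-response' }); // worker blocked by heavy CPU, etc.
    }, options.timeoutMs);
  });

  return Promise.race([waitForAbortReply(), noResponse]).then(function (reply) {
    var keepAlive = options.isCancellation && reply.status === 'abort-successful';
    if (keepAlive) {
      return; // cancel case 2.1: the worker can be reused
    }
    return terminate(); // all other cases: terminate the worker
  });
}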

joshLong145 commented 1 month ago


Your outline makes sense and aligns with what the diagram outlines, but with better definitions of the possible execution results. I think you have mapped out most of the remaining details. However, I think we might want an explicit case for when there are abort callbacks but they have thrown an error. While this will result in the same outcome as if the worker was stuck in some CPU-bound operation, we should handle it explicitly.

josdejong commented 1 month ago

Thanks. That makes sense indeed, the abort handler can throw an error too.

joshLong145 commented 3 weeks ago

Hey @josdejong

I think I have made good progress and was able to implement the feature set we have discussed above, where:

1. We are able to prevent the termination of a worker in the pool if the abortListeners resolve and no errors occur within the scope of a listener.
2. We can time out the execution of abortListeners so that if they run too long / never leave scope we can short circuit and terminate the worker.
3. If there is an error within a listener we can handle the rejection and terminate the worker.
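A rough sketch of the worker-side behaviour in points 2 and 3 above (hypothetical names, not the PR code): run all registered abort listeners, but reject if any of them throws or if they exceed a time budget, so the handler knows whether the worker can be kept alive or has to be terminated.

function runAbortListeners(listeners, timeoutMs) {
  var execution = Promise.all(
    listeners.map(function (listener) {
      // Promise.resolve().then(...) also catches synchronous throws
      return Promise.resolve().then(listener);
    })
  );

  var timeout = new Promise(function (resolve, reject) {
    setTimeout(function () {
      reject(new Error('abort listeners exceeded ' + timeoutMs + 'ms'));
    }, timeoutMs);
  });

  return Promise.race([execution, timeout]);
}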

I have added tests for both the timeout and cancel behaviors to confirm both behave as desired.

I still have to update the docs / examples but I think it is ready for review.

One thing I am noticing is that when I run the tests, the after all hook in Pool.test.js sometimes fails to fulfill and prevents the test run from exiting. When running the tests individually I am not able to get the hook to hang. Was wondering if you had any idea what could be the culprit? Below is the final test output:

  1) "after all" hook in "{root}"

  130 passing (17s)
  2 pending
  1 failing

  1) "after all" hook in "{root}":
     Error: Timeout of 10000ms exceeded. For async tests and hooks, ensure "done()" is called; if returning a Promise, ensure it resolves.
      at listOnTimeout (node:internal/timers:569:17)
      at process.processTimers (node:internal/timers:512:7)
josdejong commented 3 weeks ago

Thanks for the updates, that sounds good. I'll look into it after the holidays (sorry for the delay).