josdejong / workerpool

Offload tasks to a pool of workers on node.js and in the browser
Apache License 2.0
2.04k stars 148 forks

Feat: Add abort signal support for worker execution #441

Closed joshLong145 closed 2 months ago

joshLong145 commented 4 months ago

Adds support for abort signals, via an instance of AbortController for each task execution context. The functionality is enabled through the useAbortSignal flag on the execOptions object provided to the pool's exec method.

Example:

// file: myWorker.js
var workerpool = require('../../');

function abort(signal) {
    return new Promise(function (_resolve, reject) {
      signal.addEventListener("abort", () => {
          workerpool.workerEmit({
            state: signal.aborted ? 'term': 'active',
            message: signal.reason
          });
      });
      reject("rejecting");
    });
}

workerpool.worker({
    abort: abort
});
var workerpool = require('../../');

var pool = workerpool.pool(__dirname + '/myWorker.js');

pool.exec('abort', [], {
  on: (payload) => {
    console.log("received event: ", payload);
  },

  useAbortSignal: true
})
.catch(function (err) {
  console.log("received error: ", err);
});

Next Steps

To get the most value out of abort signaling, the Promise.timeout wrapper created at task execution time should be extended so that it can reach the worker instance executing the task and explicitly abort the signal created when the task began its execution. That would allow a task that times out to be cleaned up from the main thread context.
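The bridging described above could look roughly like the following sketch. This is a hypothetical illustration using a plain AbortController, not the actual workerpool internals: a timer races the task, and if the timer fires first, the controller is aborted so the task can clean up.

```javascript
// Hypothetical sketch: bridge a task timeout to an abort signal.
// `runTask` receives the signal and is expected to react to 'abort'.
function execWithTimeout(runTask, ms) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(new Error('timeout')), ms);
  // clear the pending timer once the task settles, either way
  return runTask(controller.signal).finally(() => clearTimeout(timer));
}
```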

If the functionality within this PR is merged the above outline can be turned into an issue and mentioned here for tracking.

josdejong commented 4 months ago

Thanks for your PR Josh, it looks well taken care of 👌 .

Can you explain what the use cases are for this new abort signal API? I suppose it is, for example, to cancel running fetch actions, or to close a DB connection? I'm trying to understand its capabilities and limitations. Like: I suppose it can be used in both the browser and node.js? And I suppose it will not work when the task is running a synchronous operation, only an async one?

Before this feature is ready to use it indeed has to work with both the .cancel() and .timeout() methods of the Promise implementation of workerpool.

A real world example would help me understand how this is to be used, can you give an example?

On a side note: I expect to reply only after the weekend, I'm away for a couple of days.

joshLong145 commented 4 months ago

A real world example would help me understand how this is to be used, can you give an example?

Sure! The main use case, assuming the current implementation, is to allow cleanup of async operations that have failure cases and need cleanup logic, which might itself need to be async, expressed in a concise way.

const workerpool = require('workerpool');

async function queryRunner(signal, queryString) {
    const db = new DbController({}); // some database client
    await db.connect();
    const query = db.query(queryString);
    signal.addEventListener('abort', async () => {
      query.cancel(); // cancel the query operation
    });

    // Start the query and block until resolution.
    // If this promise rejects, the abort listener above will handle the cleanup.
    return query.execute();
}

workerpool.worker({
  queryRunner: queryRunner
});

There is also some benefit with the current implementation when considering timer-based operations that are still pending when a promise rejects.

const workerpool = require('workerpool');

// note: the worker function is named `cancellableTimeout` here; naming it
// `setTimeout` would shadow the global setTimeout it calls internally
async function cancellableTimeout(signal, ms) {
  return await new Promise((resolve) => {
    const timer = setTimeout(() => {
      resolve();
    }, ms);
    signal.addEventListener('abort', () => {
      clearTimeout(timer);
    });

    // some implementation which might cause the promise to reject
  });
}

workerpool.worker({
  cancellableTimeout: cancellableTimeout
});

Before this feature is ready to use it indeed has to work with both the .cancel() and .timeout() methods of the Promise implementation of workerpool.

The majority of the value comes from integration with the .cancel() and .timeout() methods. However, the current implementation already adds some value: cleanup can now occur through the EventTarget behind the signal, allowing async cleanup within the worker on error cases without having to explicitly catch and handle every possible error case.
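As a minimal illustration of that single-cleanup-point idea (plain AbortController, independent of workerpool internals):

```javascript
// One abort listener acts as a single cleanup point, instead of
// catching every possible error case individually.
const controller = new AbortController();
let cleanedUp = false;

controller.signal.addEventListener('abort', () => {
  cleanedUp = true; // e.g. close a db connection, clear timers
});

// whichever error path fires, the same abort call runs the cleanup
controller.abort();
```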

josdejong commented 4 months ago

Thanks for your explanation, I'll have a look at it asap.

joshLong145 commented 4 months ago

This use case definitely makes sense, thanks for the explanation.

I made a few inline comments, but most importantly I would like to think through the API: how to pass the signal to the function?

It feels a bit weird to prepend the arguments with an extra argument signal. How about attaching the signal to the method, so you can use it inside the function like this.signal.addEventListener(...) ? Or introduce a global worker.onAbort, or so?

I would like to see if we can get rid of the need for useAbortSignal. Suppose that we go for using this.signal or a global worker.onAbort, then there is no need for the useAbortSignal. The workerpool can automatically create an AbortController and attach it to every registered method. Only thing is that we need to make sure the added listeners are removed again at the end of the method call. If that's not possible we can probably create a new AbortController right before every call, and ideally create it lazily. We should look into any performance impact then though. We could also think through whether we want to use AbortController or instead create a custom callback like this.onAbort or a global worker.onAbort. In that case we can optimize it ourselves, like: create it only once, keep a list with callbacks in an array, and simply clear this list right before every method invocation. What do you think?

I think this makes sense. I was originally going to implement this by injecting the signal on the function being invoked in the worker, which would allow usage like the example below:

const workerpool = require('workerpool');

// named `cancellableTimeout` to avoid shadowing the global setTimeout
async function cancellableTimeout(ms) {
  return await new Promise((resolve) => {
    const timer = setTimeout(() => {
      resolve();
    }, ms);
    this.signal.addEventListener('abort', () => {
      clearTimeout(timer);
    });

    // some implementation which might cause the promise to reject
  });
}

workerpool.worker({
  cancellableTimeout: cancellableTimeout
});

The reason I ended up adding it as an option was to make it more obvious that the signals are in scope. However, I now agree that your suggestion is the better pattern, since modifying the function signature the end user has defined is not ideal; it is better to simply inject the signal into the function object itself. That also allows the flag to be removed, since the feature will no longer break backwards compatibility. I am choosing per-function signal injection over a global signal because unique signals per worker function allow for more granular error contexts, whereas a single global one could end up rather bloated. A global would also be harder to keep track of, since the current system keeps a global map of function name -> function but does not seem to know which file context each function came from, so that is currently ambiguous.
I will refactor to this pattern instead of passing the signal as a function parameter.
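A rough sketch of that injection idea (illustrative names, not the final implementation): a fresh signal is assigned onto the function object right before invocation, so the task can reach it via this.signal when called with the function as its context.

```javascript
// Hypothetical: attach a fresh signal to the registered function
// before each invocation.
function invokeWithSignal(method, args) {
  const controller = new AbortController();
  method.signal = controller.signal;
  // calling with the function itself as context makes this.signal work
  const result = method.apply(method, args);
  return { result, controller };
}

// example task reading the injected signal
function isAborted() {
  return this.signal.aborted;
}
```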

joshLong145 commented 4 months ago

@josdejong

I've updated the PR to no longer pass the signal as a parameter to the worker function, and instead assign it to the worker function, which allows access within the worker execution scope. I've also added a new event, onAbort, which will be invoked if a task has its aborted flag set to true.

I realized I was not using the on event properly, as the task context is cleaned up once the task queue has seen that the task resulted in an error. The PR description and title have been updated to reflect the change in direction. Let me know what you think; with the changes I've made I am rather close to having cancel and setTimeout triggering the task abort.

joshLong145 commented 4 months ago

The downside of the current implementation is that if functions are not defined in a script context loaded into the worker at time of creation, we are not currently able to inject a signal into the worker scope, since those functions execute with global as their this context. It currently seems rather awkward to force the signal into the worker's global, as cleanup would have to occur there too, since signals are meant to be used for only a single abort trigger. See commit: 14656f3ad734aba893b0d352972a3438b478afaf

I think we should weigh whether having signaling support when functions are executed through the run context is worth possibly going back to the original implementation, which passed the signal as a parameter, since a parameter would work with the global execution context.

joshLong145 commented 4 months ago

Thanks a lot, I like where this is heading!

The downside of the current implementation is that if functions are not defined in a script context which is loaded into the worker on time of creation

I'm not entirely sure if I understand what you mean. Do you mean that the signals do not work for "offloaded functions"?

I think so, yes. Below is the implementation of worker.methods.run:

/**
 * Execute a function with provided arguments
 * @param {String} fn     Stringified function
 * @param {Array} [args]  Function arguments
 * @returns {*}
 */
worker.methods.run = function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(null, arguments);');
  return f.apply(f, args);
};

The above implementation encapsulates the execution of an offloaded function. Since the first apply runs in the scope of the Function constructor, it is rather difficult to change the this context to anything but globalThis. The original implementation using parameters does work with this implementation, however.
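For completeness, a hedged sketch of what the parameter-based variant for offloaded functions could look like; this is a hypothetical variant, not the actual worker.methods.run:

```javascript
// Hypothetical run variant: since the stringified function executes
// with globalThis as its context, the signal is prepended as the
// first explicit argument instead of being attached to `this`.
function runWithSignal(fn, args, signal) {
  var f = new Function('return (' + fn + ').apply(null, arguments);');
  return f.apply(f, [signal].concat(args || []));
}
```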

josdejong commented 3 months ago

Thanks for the updates!

Some thoughts: the old behavior was that when a running task is cancelled or times out, the worker running that task is terminated. The new behavior is that before termination, an abort event is delivered to the worker so it can neatly close database connections or open requests for example.

With this in mind:

  1. I would expect this new event listener for an abort signal to be something that is purely on the worker side and not on the pool side (the main thread). So I would not expect a new option onAbort at the Pool side? I think the Pool side should not know at all whether the worker task has abort logic or not?
  2. I would not expect this new behavior to have any impact on Promise. I would like to keep Promise.js unchanged, it feels like we're attaching logic that should not be there.

Some thoughts about the implementation:

  1. in the WorkerHandler I would expect extra logic before the return me.terminateAndNotify(true) (or as part of it), that sends a new type of message "ABORT" to the worker (you called it "TIMEOUT" right now). And then, in worker.js, I would expect some code to listen for this "ABORT" and trigger the abort callback (if any).

  2. To prevent having to do bookkeeping with a worker.controllers[request.id], it may be an idea to utilize the fact that we only execute a single task at a time in a worker, so we do not need to keep track of multiple abort controllers.

  3. With (2) in mind, attaching the signal to the functions can give issues with not using the right context, breaking usage of this.signal. It may work out very well to create a global callback listener on worker instead? It could even work as primitive as the following:

    function myWorkerFunction() {
        return new Promise(function (_resolve, reject) {
          worker.onAbort = () => {
            console.log("abort event triggered");
          };
          reject("rejecting");
        });
    } 
    
    workerpool.worker({
        myWorkerFunction
    });

    then, in the worker.js code, all we have to do is (a) when a task is cancelled or timed out, check if worker.onAbort is defined and if so, invoke it. And after the task is finished, clear worker.onAbort (if defined).

  4. To neatly wait until the client's abort code (like shutting down a db connection) is finished, I think we should let it return a promise and wait until it resolves. Only after that should we terminate the worker. Even better: if we know the task has neatly aborted, there is no need to terminate the worker? I guess we should only skip termination when the abort promise returns some specific flag denoting that the worker does not need to be terminated.
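Steps (3) and (4) above could be sketched as follows. The `worker` object here is a stand-in for the real worker.js internals, and the names are illustrative:

```javascript
// Stand-in for the worker-side state; not the real worker.js object.
const worker = { onAbort: undefined };

// On cancel/timeout: invoke the optional onAbort callback, await it
// so async cleanup (e.g. closing a db connection) can finish, then
// clear it so the next task starts fresh.
async function handleAbort() {
  if (typeof worker.onAbort === 'function') {
    try {
      await worker.onAbort();
    } finally {
      worker.onAbort = undefined;
    }
  }
}
```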

josdejong commented 3 months ago

@joshLong145 I see you commented yesterday but removed your comment again. Any thoughts on this?

joshLong145 commented 3 months ago

@joshLong145 I see you commented yesterday but removed your comment again. Any thoughts on this?

I took your advice, removed all my logic for managing controllers, and went with a single controller being invoked from a message sent by the handler's processing of the timeout or cancel exception. The problem I am seeing is being able to modify this worker object you use in your example, so that we invoke our own callback instead of using a controller instance managed by the worker. In my testing I have been unable to modify the global state of any variable and have it reflected on the worker within the scope of the message handler, once we receive the message from the timeout in the worker context. It seems that the only globals within the worker are what is standard on globalThis for a worker instance, so I do not fully understand how a global function defined within a worker's scope would work.

From my testing, the only pattern that works for all library features is something passed to the worker explicitly as a parameter. I think we can still keep only one instance of the AbortController instead of managing one per request, but I don't know how to get around the parameter passing at this time, given the findings above.

josdejong commented 3 months ago

I was looking into this in more depth, and noticed that the onTerminate option is really close to what we're looking for here with onAbort. So much so, in fact, that I think there may be no need for additional features.

Here a working example demonstrating how to close a db connection on termination (after a cancel or timeout):

// main.js
const workerpool = require("workerpool");

const pool = workerpool.pool(__dirname + "/workers/cancelWorker.js", {
  workerType: "thread"
})

const main = async () => {
  await pool
    .exec("runDbQuery", [2, 3])
    // .timeout(2000) // the timeout will trigger a termination, which neatly closes the connection beforehand
    .then((result) => console.log('runDbQuery result', result))
    .catch((err)  => console.error(err))

  await pool.terminate()
};

main();

With the following worker:

// cancelWorker.js
const workerpool = require("workerpool");

// note: we could also keep an array with multiple connections,
// but we know that a worker only executes one function at a time,
// so there is no need for that.
let dbConnection = null

async function openConnection() {
  // we mimic an open db connection by a simple number
  const newConnection = Math.round(Math.random() * 1000)

  console.log(`Opening db connection ${newConnection}`)

  return newConnection
}

async function closeConnection(connection) {
  console.log(`Closing db connection ${connection}...`)
  await sleep(500)

  console.log(`db connection ${connection} closed`)
  dbConnection = null
}

async function runDbQuery(a, b) {
  // register the connection in a global variable
  dbConnection = await openConnection()

  // pretend to run a query
  console.log(`Executing query via db connection ${dbConnection}...`)
  await sleep(5000)
  const result = a  + b
  console.log(`Query results are in: ${result}`)

  await closeConnection(dbConnection)

  return result
}

function sleep(delay) {
  return new Promise(resolve => setTimeout(resolve, delay))
}

// create a worker and register public functions
workerpool.worker(
  {
    runDbQuery
  },
  {
    onTerminate: function (code) {
      if (dbConnection) {
        return closeConnection(dbConnection)
      }
    },
  }
)

Now, we could think through adding some "eye candy" on top of onTerminate that makes this scenario easier and enables using it in an offloaded worker. I think this API could look like:

async function runDbQuery(a, b) {
  // register the connection in a global variable
  dbConnection = await openConnection()

  // register the closeConnection function via worker.onAbort, 
  // so it will be invoked when the worker terminates after a cancel or timeout
  worker.onAbort = () => closeConnection(dbConnection)

  // pretend to run a query
  console.log(`Executing query via db connection ${dbConnection}...`)
  await sleep(5000)
  const result = a + b
  console.log(`Query results are in: ${result}`)

  await closeConnection(dbConnection)

  return result
}

To implement this, I think all we have to do is add a section to worker.cleanupAndExit which executes an async worker.onAbort if defined, and after that clears the worker.onAbort again.
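A hedged sketch of that cleanupAndExit change, with the exit faked via a callback and `workerScope` standing in for the real worker internals:

```javascript
// Stand-in for the worker-side scope; illustrative names only.
const workerScope = { onAbort: undefined };

// Before exiting, run the user's (possibly async) onAbort cleanup,
// clear it again, and only then exit with the given code.
async function cleanupAndExit(code, exit) {
  if (typeof workerScope.onAbort === 'function') {
    await workerScope.onAbort();
    workerScope.onAbort = undefined;
  }
  exit(code);
}
```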

joshLong145 commented 3 months ago

I was looking into this in more depth, and noticed that the onTerminate option is really close to what we're looking for here for this onAbort. So much in fact I think there may be no need for additional features.


Thanks for the reply. I see your perspective that onTerminate largely implements what this PR attempts with abort signals. However, your example forgoes async implementations that support programmatic aborting through abort signals, for example fetch: in this example in the MDN docs, a signal is provided which forces the promise to reject if abort is called on the controller, and which also allows the promise to reject if other failures occur. Another example of an API with signal support is the Azure Blob Storage SDK. So though you are correct that there is an easier implementation with only a single AbortController to manage, I'm unsure I agree that onTerminate() solves the problem, as it cannot cancel inner promises which may also need to be cancelled to fully stop a worker method. With that said, I think we still need an AbortController instance for the executing worker method.
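The rejection semantics referred to here can be shown with a generic abortable operation standing in for a real network request (illustrative sketch, not a fetch call):

```javascript
// Like fetch with a signal: the pending promise rejects as soon as
// abort() is called on the controller.
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(resolve, ms);
    signal.addEventListener('abort', () => {
      clearTimeout(timer); // stop the pending work
      reject(signal.reason);
    });
  });
}
```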

Also, with the example you added of how onTerminate may be defined in the scope of a worker through onAbort, I'm unsure how this implementation would work.

async function runDbQuery(a, b) {
  // register the connection in a global variable
  dbConnection = await openConnection()

  // register the closeConnection function via worker.onAbort, 
  // so it will be invoked when the worker terminates after a cancel or timeout
  worker.onAbort = () => closeConnection(dbConnection)

  // pretend to run a query
  console.log(`Executing query via db connection ${dbConnection}...`)
  await sleep(5000)
  const result = a + b
  console.log(`Query results are in: ${result}`)

  await closeConnection(dbConnection)

  return result
}

In the above, onAbort is being defined on worker in the scope of runDbQuery. However, I'm unsure how the worker has a global worker object, since when I inspect global in the worker context I only observe the following:

<ref *1> Object [global] {
  global: [Circular *1],
  queueMicrotask: [Function: queueMicrotask],
  clearImmediate: [Function: clearImmediate],
  setImmediate: [Function: setImmediate] {
    [Symbol(nodejs.util.promisify.custom)]: [Getter]
  },
  structuredClone: [Getter/Setter],
  clearInterval: [Function: clearInterval],
  clearTimeout: [Function: clearTimeout],
  setInterval: [Function: setInterval],
  setTimeout: [Function: setTimeout] {
    [Symbol(nodejs.util.promisify.custom)]: [Getter]
  },
  atob: [Getter/Setter],
  btoa: [Getter/Setter],
  performance: [Getter/Setter],
  fetch: [AsyncFunction: fetch]
}

So I'm unsure how a worker object in the global scope may be manipulated, as it is not defined, at least in the ways that I have tried up to this point.

josdejong commented 3 months ago

Sorry about the confusion about worker. What I meant was workerpool, the imported library. So it would be an overwritable function of the library, workerpool.onAbort = ..., similar to how we can use the global window.onclick = .... Now, it's ugly to override a global method, so maybe we should introduce a method for that, like workerpool.addAbortListener(...), similar to window.addEventListener(...).

I think an AbortController can be used on top of onTerminate too, something like:

// cancelWorker.js
const workerpool = require("workerpool");

// note: we could also keep an array with multiple controllers,
// but we know that a worker only executes one function at a time,
// so there is no need for that.
let myAbortController = null

async function runFetch(url) {
  myAbortController = new AbortController()

  // You can use myAbortController.signal.addEventListener if there is 
  // more stuff to be taken care of when `fetch` triggers the abort action

  return fetch(url, { signal: myAbortController.signal })
}

// create a worker and register public functions
workerpool.worker(
  {
    runFetch
  },
  {
    onTerminate: function (code) {
      if (myAbortController) {
        myAbortController.abort()
      }
    }
  }
)

I may be overlooking something, if that is the case, can you share an example of it to explain?

joshLong145 commented 2 months ago

Sorry about the confusion about worker. What I meant was workerpool, the imported library. So it would be an overwritable function of the library, workerpool.onAbort = ..., similar to how we can use the global window.onclick = .... Now, it's ugly to override a global method, so maybe we should introduce a method for that, like workerpool.addAbortListener(...), similar to window.addEventListener(...).


Ah, thank you for the clarification on worker, I was going a bit crazy 😆. This is now making much more sense to me. My thought process around providing an AbortController was mainly to prevent the instance from leaving scope before the worker runtime could run cleanup in terminateAndNotify, but I now understand the execution model better: there is only one executing method at a time per worker, so I no longer think this is an issue. As long as users call abort within the new onAbort wrapper, there is no loss of functionality with them managing the controller versus the worker managing it.

I think we have a good path forward.

josdejong commented 2 months ago

Sounds good!

I think the onAbort wrapper will be a useful addition.

Do you have thoughts on how to best expose the onAbort function? Like workerpool.onAbort = ... or workerpool.addAbortListener(...)? I have the feeling that the latter is most neat and most future proof.

joshLong145 commented 2 months ago

Do you have thoughts on how to best expose the onAbort function? Like workerpool.onAbort = ... or workerpool.addAbortListener(...)? I have the feeling that the latter is most neat and most future proof.

workerpool.addAbortListener(...) makes the most sense to me. Having a function which takes the listener as an argument is the most flexible.
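A minimal sketch of what such a listener registry could look like. The names are hypothetical; workerpool.addAbortListener does not exist yet at this point in the discussion:

```javascript
// Listeners collect in an array, are drained in order on abort, and
// the list is left empty for the next task.
const abortListeners = [];

function addAbortListener(listener) {
  abortListeners.push(listener);
}

async function runAbortListeners() {
  // splice(0) empties the array while iterating over a snapshot
  for (const listener of abortListeners.splice(0)) {
    await listener(); // listeners may be async
  }
}
```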

joshLong145 commented 2 months ago


@josdejong

Has been implemented in PR #448