thefrontside / effection

Structured concurrency and effects for JavaScript
https://frontside.com/effection
MIT License
592 stars 25 forks source link

How to delay abort/process shutdown? #888

Closed joakimbeng closed 10 months ago

joakimbeng commented 10 months ago

Hi,

Effection looks great and I'm thinking of using Effection in a project but there's one scenario that I don't know how to solve with Effection that you probably can help me with.

Let's say I have a couple of asynchronous tasks to execute and I want to execute them one at a time (but of course the tasks themselves can trigger other asynchronous tasks). When the process is interrupted (for instance with ctrl+c) I want to cancel the currently running task and skip the rest, which seems very easy with Effection, but what if I want to allow some extra time period for the current task to finish before the process is shutdown? I.e. let's say that from the time when the process is interrupted the current task should have a maximum of 10 additional seconds to complete, and if it completes within that time it should not throw and the process should cleanup all other resources and exit. But if it doesn't complete within this timeframe from the moment of the interruption it should be "force cancelled" and throw an error and the rest of the shutdown process/cleanup should continue.

And if the process is not interrupted it should just shutdown as soon as everything has been executed successfully, without having to wait the extra 10 seconds.

Do you follow what I'm after and how do I accomplish that with Effection?

cowboyd commented 10 months ago

@joakimbeng So glad that you're interested!

I think I see what you're saying. The first thing that comes to mind would be to race the task against a timeout operation. That way, if the task is already completed, it will win immediately, otherwise, it will have the entirety of the timeout period in order to win.

import { main, sleep, race, spawn } from "effection";

await main(function*() {
  let task = yield* spawn(function*() { /* do stuff */ });

  try {
    yield* task;
  } finally {
    yield* race([task, call(function*() {
      yield* sleep(10_000);
      throw new Error(`task did not finish after receiving an additional 10 seconds`);
    })]);
  }
});

Does that make sense?

Also, if you want to manage more complex workflows around SIGNT then you can always do so by using run() directly instead of main(). For example, this would give an extra ten seconds after pressing CTRL-C once, but then halt everything immediately after the second time.


import { run, ensure, race, sleep, spawn, createSignal } from "effection";

await run(function*() {
  // subscribe to all SIGINTs
  let signal = createSignal<void>();
  process.on("SIGINT", signal.send);
  yield* ensure(() => process.off("SIGINT", signal.send));
  let sigints = yield* signal;

  let task = yield* spawn(function*() { /* do stuff */ });

  yield* race([task, call(function*() {
    yield* sigints.next(); // first ctrl-c
    console.log('shutdown requested. giving 10 extra seconds to finish')

    let timeout = call(function*() {
      yield* sleep(10_000);
      throw new Error(`timed out`);
    });

    let hardstop = call(function*() {
      yield* sigints.next(); // second sigint
      console.log('ok, received hard stop, exiting immediately');
    });

    yield* race([hardstop, timeout]);
  });
});
cowboyd commented 10 months ago

@joakimbeng I'm going to go ahead and close this, but if you have any remaining questions or observations about it, we would love to hear them!

joakimbeng commented 10 months ago

@cowboyd thanks for the response!

Just so that I understand your example correctly...

Let's say that task is a something asynchronous (for instance a database query) that will tak 60 seconds to run, and a SIGINT is received after 20 seconds.

At that point I guess the flow will jump to the finally clause? And at this point the asynchronous task is still running you mean, as you re-use the task variable in the race call, even if it uses async/await and not generator functions?

cowboyd commented 10 months ago

And at this point the asynchronous task is still running you mean, as you re-use the task variable in the race call, even if it uses async/await and not generator functions?

Yes, that is correct. First, the operation runs fully, and then all its children that are running, such as the async task, are then shut down. But, if you consume that task in as part of the operation exiting, then it will keep it open longer.

As for the things running as async functions, you will need to plugin them into your task with call(). So if your task could look like this:

let task = yield* spawn(function*() {
  let connection = yield* call(async function() {
     return await getDatabaseConnection();
  });

  try {
    yield* call(() => connection.runQuery());
  } finally {
    yield* call(async () => { await connection.destroy() });
  }
});

If it loses the race, then it will also be shut down, and the connection will be destroyed.