tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
532 stars, 37 forks

Stopping Worker Cancels Running Tasks #75

Closed: jamesoguyer closed this issue 1 year ago

jamesoguyer commented 1 year ago

Could stopping the worker get an option to wait for all running tasks to complete rather than cancel them? It would be useful to have a way to block new jobs from being processed and wait for the existing jobs to finish.

# change - allow for option to skip the task cancel() and set flag to block processing new tasks
async def stop(self) -> None:
    """Stop the worker and cleanup."""
    self.event.set()
    all_tasks = list(self.tasks)
    self.tasks.clear()
    for task in all_tasks:
        task.cancel()
    await asyncio.gather(*all_tasks, return_exceptions=True)

# change - check for job processing flag from worker stop and don't schedule new tasks
def _process(self, previous_task: Task | None = None) -> None:
    if previous_task:
        self.tasks.discard(previous_task)

    if not self.event.is_set():
        new_task = asyncio.create_task(self.process())
        self.tasks.add(new_task)
        new_task.add_done_callback(self._process)
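
To make the request concrete, here is a minimal sketch of what a drain option could look like. It uses a toy `MiniWorker` class, not saq's actual `Worker`; the `drain` parameter, the `pending` and `completed` lists, and the `demo` helper are all hypothetical names introduced for illustration.

```python
import asyncio


class MiniWorker:
    """Toy stand-in for a worker. This is NOT saq's Worker class."""

    def __init__(self, jobs: list[float]) -> None:
        self.pending = list(jobs)  # hypothetical queue: job durations in seconds
        self.completed: list[float] = []
        self.event = asyncio.Event()  # set once stop() has been requested
        self.tasks: set[asyncio.Task] = set()

    async def process(self) -> None:
        if not self.pending:
            await asyncio.sleep(0.01)  # idle poll when there is nothing to do
            return
        duration = self.pending.pop(0)
        await asyncio.sleep(duration)  # stand-in for real work
        self.completed.append(duration)

    def _process(self, previous_task: "asyncio.Task | None" = None) -> None:
        if previous_task:
            self.tasks.discard(previous_task)
        # Once stop() has set the event, no new task is scheduled.
        if not self.event.is_set():
            new_task = asyncio.create_task(self.process())
            self.tasks.add(new_task)
            new_task.add_done_callback(self._process)

    async def stop(self, drain: bool = False) -> None:
        """Stop the worker; with drain=True, let running tasks finish."""
        self.event.set()
        all_tasks = list(self.tasks)
        self.tasks.clear()
        if not drain:
            for task in all_tasks:
                task.cancel()
        await asyncio.gather(*all_tasks, return_exceptions=True)


async def demo(drain: bool) -> list[float]:
    worker = MiniWorker([0.05, 0.05])
    worker._process()  # start a single processing slot
    await asyncio.sleep(0.01)  # give the first job time to start
    await worker.stop(drain=drain)
    return worker.completed
```

With `drain=True` the job that is already running finishes and the second job is never started; with `drain=False` the running job is cancelled. Note that in this toy the cancelled job is simply lost, whereas a real queue would need to hand it back for another worker to pick up.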

tobymao commented 1 year ago

I have no plans to implement this but happy to accept a well written + tested PR. Happy to help you through it, let me know if you're interested.

jamesoguyer commented 1 year ago

I would like to create a PR to support this. I ran into an issue where machine updates took down the workers and aborted tasks that were in progress. With a fleet of machines processing jobs, I would like to drain each machine's running jobs individually before taking it offline.

tobymao commented 1 year ago

Yeah, it's designed to recover, so a worker that shuts down cancels its jobs immediately, and they'll be picked up by other workers. This was the pattern we used.

Hit me up on Slack.

The issue with this approach, from what I remember, is that when a machine is taken offline, it's immediate and there's no buffer time. You don't really control that on something like Kubernetes, so you want to cancel the jobs ASAP. If you add some kind of wait time, it's possible the machine just shuts down and you don't get to clean up tasks.
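
One possible middle ground between waiting indefinitely and cancelling at once is a bounded grace period. The sketch below is plain asyncio, not part of saq; `stop_with_grace`, its `grace` parameter, and `demo` are hypothetical names, and it assumes the host actually grants at least that much time before killing the process.

```python
import asyncio


async def stop_with_grace(tasks: set, grace: float) -> tuple[int, int]:
    """Wait up to `grace` seconds for tasks, then cancel whatever is left.

    Returns (number finished in time, number cancelled).
    """
    if not tasks:
        return (0, 0)  # asyncio.wait() rejects an empty collection
    done, pending = await asyncio.wait(tasks, timeout=grace)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks run their cleanup before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return (len(done), len(pending))


async def demo() -> tuple[int, int]:
    fast = asyncio.create_task(asyncio.sleep(0.01))  # finishes within the grace period
    slow = asyncio.create_task(asyncio.sleep(10))    # does not, so it gets cancelled
    return await stop_with_grace({fast, slow}, grace=0.1)
```

The concern raised above still applies: if the machine disappears before the grace period ends, neither the wait nor the cleanup runs, so jobs must remain recoverable either way.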

jamesoguyer commented 1 year ago

No change needed. I will design all tasks that use the worker library to be atomic so they can be re-run safely if they are aborted halfway through.
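
As an illustration of that design, here is a minimal sketch of a task that can be cancelled midway and re-run safely. The task name `export_report` and its `job_id` and `out_dir` parameters are hypothetical; the signature (a `ctx` dict followed by keyword arguments) is only modeled on how saq tasks are usually written. The idea is to write to a temporary file and publish it with an atomic rename, so a cancelled attempt leaves nothing behind.

```python
import asyncio
import contextlib
import os
import tempfile
from pathlib import Path


async def export_report(ctx: dict, *, job_id: str, out_dir: str) -> str:
    """Hypothetical task that is safe to re-run after being aborted."""
    target = Path(out_dir) / f"{job_id}.txt"
    if target.exists():
        return "skipped"  # an earlier attempt already completed

    # Write to a temp file in the same directory so the rename stays atomic.
    fd, tmp_name = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write("header\n")
            await asyncio.sleep(0.05)  # stand-in for slow work; cancellation can land here
            tmp.write("body\n")
        os.replace(tmp_name, target)  # the target is never visible half-written
    except BaseException:
        # On cancellation or error, remove the partial file and re-raise.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise
    return "written"
```

If the worker is stopped while the task is sleeping, the temporary file is removed and the next attempt starts from a clean state; a second run after success is a no-op.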