taskiq-python / taskiq

Distributed task queue with full async support
MIT License
Graceful shutdown and reload #325

Closed dima-kov closed 3 weeks ago

dima-kov commented 1 month ago

Hi! Thank you for such a simple and easy-to-use task manager library!

Please help me understand how to achieve a graceful reload. I can't find any docs about this.

How it works now: when I cancel the process, KeyboardInterrupt is raised and tasks are cancelled (left unfinished). What I'm looking for: the ability to send a HUP signal, as I do for uvicorn servers managed by supervisor. How that works:

  1. stops accepting new requests;
  2. finishes all open requests;
  3. restarts the process;
  4. accepts new requests;
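
The four steps above can be sketched with plain asyncio. This is a toy illustration only: DrainingWorker, submit, and drain are hypothetical names, not part of taskiq or uvicorn, and the restart itself (steps 3 and 4) is assumed to be left to a supervisor.

```python
import asyncio


class DrainingWorker:
    """Toy worker (hypothetical, not taskiq's or uvicorn's implementation)."""

    def __init__(self) -> None:
        self.accepting = True   # flips to False once a reload is requested
        self.in_flight = set()  # asyncio tasks that are still running
        self.finished = []      # names of the tasks that ran to completion

    def submit(self, name: str, seconds: float) -> bool:
        # Step 1: after a reload request, refuse new work.
        if not self.accepting:
            return False
        task = asyncio.get_running_loop().create_task(self._run(name, seconds))
        self.in_flight.add(task)
        task.add_done_callback(self.in_flight.discard)
        return True

    async def _run(self, name: str, seconds: float) -> None:
        await asyncio.sleep(seconds)  # stands in for real work
        self.finished.append(name)

    async def drain(self) -> None:
        # Step 2: stop accepting, then wait for every open task to finish.
        self.accepting = False
        if self.in_flight:
            await asyncio.gather(*self.in_flight)
        # Steps 3 and 4 (restart, accept again) would happen after this
        # returns, for example by exiting and letting a supervisor respawn us.


async def demo():
    worker = DrainingWorker()
    worker.submit("a", 0.05)
    worker.submit("b", 0.10)
    await worker.drain()                        # both tasks complete here
    rejected = not worker.submit("c", 0.01)     # new work is refused
    return worker, rejected
```

Calling asyncio.run(demo()) lets both submitted tasks finish before drain() returns, while the task submitted afterwards is refused. The key point is that a reload waits for in-flight work instead of cancelling it.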

What is an expected way for taskiq to handle code updates (releases)?

s3rius commented 1 month ago

I guess graceful updates are not supported right now: the main process won't handle a HUP signal, but that can be added.

I think it's a good thing to have.

dima-kov commented 1 month ago

Wow, that's a really big problem for production use.

What can I do to make it happen? How can I contribute?

s3rius commented 1 month ago

Should be easy to add. Basically, we have everything for this feature to be implemented.

In the ProcessManager you can define a function that reloads all subprocesses.

def reload_all(self):
    self.action_queue.put(ReloadAllAction())

And somewhere here, before the process manager has started, you can define a signal handler that calls the reload_all action on the process manager when SIGHUP is received.
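
A minimal, self-contained sketch of that idea follows. Everything in it is a stand-in: the ReloadAllAction and ProcessManager classes below only mimic the names used in the comment above and are not taskiq's actual classes, and make_hup_handler and install_hup_handler are hypothetical helpers. It assumes a Unix-like system, since SIGHUP is not available on Windows.

```python
import queue
import signal
from dataclasses import dataclass


@dataclass
class ReloadAllAction:
    """Stand-in for the action named in the comment above."""


class ProcessManager:
    """Hypothetical minimal manager; the real class does much more."""

    def __init__(self) -> None:
        # SimpleQueue.put is reentrant, so it is safe inside a signal handler.
        self.action_queue = queue.SimpleQueue()

    def reload_all(self) -> None:
        self.action_queue.put(ReloadAllAction())


def make_hup_handler(manager: ProcessManager):
    """Build a handler that only enqueues a reload request."""

    def handler(signum, frame):
        # Keep the handler tiny: the manager's main loop is assumed to
        # read the queue and restart the workers later.
        manager.reload_all()

    return handler


def install_hup_handler(manager: ProcessManager) -> None:
    # Must be called from the main thread, before the manager starts.
    signal.signal(signal.SIGHUP, make_hup_handler(manager))
```

With the handler installed, kill -HUP on the main process would put one ReloadAllAction on the queue. Whether the workers then finish their running tasks before restarting depends on how that action is processed, which this sketch does not cover.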

itssimon commented 3 weeks ago

Graceful shutdowns without cancelled tasks would be a big improvement for my use case too.

s3rius commented 3 weeks ago

Okay. Since this issue is a bit stale, I will try implementing it.

s3rius commented 3 weeks ago

@itssimon, @dima-kov, please verify that the feature was implemented correctly. It was released in 0.11.4.

dima-kov commented 1 day ago

I took a look at how it works. Test plan: send two long async tasks and, during execution, send a HUP signal. Expected: the running tasks finish successfully. Actual: the worker reloaded just like before, stopping task execution.

LOGS:

[2024-07-04 10:37:15,258][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-07-04 10:37:23,279][taskiq.receiver.receiver][INFO   ][worker-0] Executing task apps.landing.tasks.ai:long_task with ID: 96501d3d11f04f6fbb4454b84934dc1d
[2024-07-04 10:37:23,280][apps.landing.tasks.ai][INFO   ][worker-0] start long task 10
[2024-07-04 10:37:24,282][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 0/10
[2024-07-04 10:37:25,284][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 1/10
[2024-07-04 10:37:26,286][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 2/10
[2024-07-04 10:37:27,287][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 3/10
[2024-07-04 10:37:28,289][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 4/10
[2024-07-04 10:37:28,703][taskiq.receiver.receiver][INFO   ][worker-0] Executing task apps.landing.tasks.ai:long_task with ID: b2c680dacb5845f49eceabf154649905
[2024-07-04 10:37:28,703][apps.landing.tasks.ai][INFO   ][worker-0] start long task 16
[2024-07-04 10:37:29,295][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 5/10
[2024-07-04 10:37:29,704][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 0/16
[2024-07-04 10:37:30,296][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 6/10
[2024-07-04 10:37:30,705][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 1/16
[2024-07-04 10:37:31,298][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 7/10
>>>>>>>>>>>>>> SENT HUP SIGNAL IN THIS MOMENT <<<<<<<<<<<<<
[2024-07-04 10:37:31,440][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
[2024-07-04 10:37:31,706][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 2/16
[2024-07-04 10:37:32,299][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 8/10
[2024-07-04 10:37:32,706][apps.landing.tasks.ai][INFO   ][worker-0] 1s long task 3/16
[2024-07-04 10:37:33,283][taskiq.worker][WARNING][worker-0] Worker process interrupted.
[2024-07-04 10:37:33,283][taskiq.worker][WARNING][worker-0] Shutting down the broker.
[2024-07-04 10:37:33,307][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 3734964
[2024-07-04 10:37:36,403][telethon.crypto.aes][INFO   ][worker-0] cryptg detected, it will be used for encryption
[2024-07-04 10:37:37,373][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.

HUP was sent with: kill -HUP 3734916