tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
577 stars 39 forks source link

SAQ clients hang if one of workers is dead #118

Closed lega911 closed 6 months ago

lega911 commented 6 months ago

Hi!

I found that communication based on SAQ hangs sometimes, and found 2 issues. Platform: AWS, EC2, ECS.

1) If there are a few saq-workers, and one goes down, then all* communication can hang. I've made an example to be able to reproduce it: https://github.com/lega911/saq-test There are 3 workers and 3 clients which call the same rpc function (the same queue). After 15 calls one worker are stopped (SIGSTOP), then clients can call more 1-20 calls, then they all hang. I have guessing why it happens, but what's solution for this? Tasks should go only to live workers.

2) About second issue: a client throws TimeoutError though a worker has finished and saved result in Redis (there is value "status": "complete" by key), so client hangs on 60 sec timeout, when worker finished a task succefully in 1 sec. Is there any solution for this? I will back when find more details.

Thanks.

tobymao commented 6 months ago

thanks for the example, what is your guess to what's happening?

i'm not sure what you mean by 2, are you saying there's a race condition where a worker finishes, but the timeout is reached?

tobymao commented 6 months ago

ok, i looked into this. when you sigstop or sigkill a worker, there's no way for python to cleanly shutdown and release a job, that means a job will become stuck. a job has a timeout, so if you anticipate jobs getting stuck like that, you should reduce the timeout.

another mechanism in place is the sweeper, by default every 60 seconds, saq will sweep for stuck jobs and free them up. if you add in your worker.py

settings = { "timers": {"sweep": 1} }

this will sweep much more frequently and you'll see saq recovering from the stuck jobs.

happy to discuss more, but it seems like we recover, it's just the sweeper default is slower than what you'd like

lega911 commented 6 months ago

i'm not sure what you mean by 2, are you saying there's a race condition where a worker finishes, but the timeout is reached?

it's not race condition: a task finished in 1 sec, but then after 60 sec a client gets TimeoutError. Looks like a notification from worker to client is missing somewhere, (I think to look it deeper).

tobymao commented 6 months ago

let me know if you find anything or have a fix

lega911 commented 6 months ago

there's no way for python to cleanly shutdown and release a job

Yes, the same happens when a server goes down, container is killed or some network failure. It's ok that running tasks hangs on timeout, the problem that new (future) tasks also rejected, which is not good, because they could be processed on live workers.

I updated example, now "sweep" is 1 sec, yes - now it rejects faster. But it rejects future tasks. Some logs below: a worker was killed, after that clients send new tasks, and some of them rejected:

Client #51 recv 11 of 50 ({'id': 10, 'result': 146, 'pid': 16})
Client #52 send 12 of 50 ({'id': 11, 'a': 90, 'b': 33}) fecd5cfa-03f1-11ef-b642-0242ac110002
  Worker #16: request 11 ({'id': 11, 'a': 90, 'b': 33})

!!! STOP worker pid 16 

Client #51 send 12 of 50 ({'id': 11, 'a': 44, 'b': 78}) ff79a898-03f1-11ef-b938-0242ac110002
...
Client #51 11 of 50 0.51s --- ERROR (JobError('Job saq:job:default:ff79a898-03f1-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #51 12 of 50 0.50s --- ERROR (JobError('Job saq:job:default:00146b44-03f2-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #51 13 of 50 0.50s --- ERROR (JobError('Job saq:job:default:00ad679a-03f2-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #51 14 of 50 0.50s --- ERROR (JobError('Job saq:job:default:0146647c-03f2-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #51 15 of 50 0.50s --- ERROR (JobError('Job saq:job:default:01df4f8e-03f2-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #51 16 of 50 0.50s --- ERROR (JobError('Job saq:job:default:027855b2-03f2-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #51 17 of 50 0.50s --- ERROR (JobError('Job saq:job:default:031175f8-03f2-11ef-b938-0242ac110002 aborted\n\nThe above job failed w)
Client #53 9 of 50 60.01s --- ERROR (TimeoutError())
Client #52 11 of 50 60.01s --- ERROR (TimeoutError())
Client #53 14 of 50 0.61s --- ERROR (JobError('Job saq:job:default:25a747d2-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
Client #53 15 of 50 0.50s --- ERROR (JobError('Job saq:job:default:26516276-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
Client #52 16 of 50 0.95s --- ERROR (JobError('Job saq:job:default:26a619d8-03f2-11ef-b642-0242ac110002 aborted\n\nThe above job failed w)
Client #53 16 of 50 0.50s --- ERROR (JobError('Job saq:job:default:26ea798e-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
Client #52 17 of 50 0.50s --- ERROR (JobError('Job saq:job:default:2783a316-03f2-11ef-b642-0242ac110002 aborted\n\nThe above job failed w)
Client #53 17 of 50 0.50s --- ERROR (JobError('Job saq:job:default:2783d2c8-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
Client #52 18 of 50 0.50s --- ERROR (JobError('Job saq:job:default:281d01f0-03f2-11ef-b642-0242ac110002 aborted\n\nThe above job failed w)
Client #53 18 of 50 0.50s --- ERROR (JobError('Job saq:job:default:281d2d60-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
Client #52 19 of 50 0.51s --- ERROR (JobError('Job saq:job:default:28b6611a-03f2-11ef-b642-0242ac110002 aborted\n\nThe above job failed w)
Client #53 19 of 50 0.51s --- ERROR (JobError('Job saq:job:default:28b692a2-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
Client #52 20 of 50 0.50s --- ERROR (JobError('Job saq:job:default:2950016c-03f2-11ef-b642-0242ac110002 aborted\n\nThe above job failed w)
Client #53 47 of 50 1.42s --- ERROR (JobError('Job saq:job:default:44d7a07a-03f2-11ef-ac6b-0242ac110002 aborted\n\nThe above job failed w)
...

It happens because worker being alive send a lot of BLMOVE (number which based on "concurrency"), so when the worker is killed, Redis continue hold/process BLMOVE, so when a new task is coming BLMOVE (from dead worker) moves the task from array "queued" to array "active", and no one changes status for task to "active". Then "sweeper" finds the task and reject it.

So it's how new tasks rejected after one of worker's death. Will be any improvements about it or it will be considered as 'normal' behavior?

tobymao commented 6 months ago

that's very helpful. for blmove, is there a way for redis to not hold process it or cancel the worker?

lega911 commented 6 months ago

Maybe to use one blmove for all task-processors (when one process receives a task, then second process send blmove). Or/and use blmove with timeout in loop, so Redis will drop it and dead worker doesn't refresh it.

By the way "one blmove" can make better balancing of tasks among workers and resolve original issue #114.

tobymao commented 6 months ago

how can the dead worker refresh it?

tobymao commented 6 months ago

it seems like this is all configurable.

you can set dequeue_timeout, right now it's 0 (block forever), but you can set that to 1 second or whatever you want.

you can also set blmove to be 1 by limiting concurrency to 1.