vgarvardt / gue

Golang queue on top of PostgreSQL

Specific workers per job type(s) #291

Open kydemy-fran opened 3 months ago

kydemy-fran commented 3 months ago

Hi, thank you for this awesome library. We have been using it for a while now, but we have a new requirement in our projects that I am not sure we can implement with the current functionality.

Basically we have several services running; each one has its own job types and they all run in parallel against the same DB. But now we have a new service that has several job types in a single queue, and the job types are of a different nature: some tasks are really fast (<20 ms), others really slow (>2 minutes).

Problem A

We need many workers for the quick tasks and only a couple of workers for the slow tasks. Right now you can only specify the number of workers per pool/queue.
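For context, here is a minimal sketch of how this looks in v5 as we understand it; the queue name, handler names, and worker count are hypothetical:

```go
package example

import (
	"context"

	"github.com/vgarvardt/gue/v5"
)

// Hypothetical handlers for the two kinds of tasks.
func handleQuick(ctx context.Context, j *gue.Job) error { return nil } // <20ms
func handleSlow(ctx context.Context, j *gue.Job) error  { return nil } // >2 minutes

func runPool(ctx context.Context, gc *gue.Client) error {
	wm := gue.WorkMap{
		"quick-task": handleQuick,
		"slow-task":  handleSlow,
	}

	// All 10 workers are shared by both job types; there is no way to say
	// "8 workers for quick-task, 2 for slow-task" within a single pool.
	pool, err := gue.NewWorkerPool(gc, wm, 10, gue.WithPoolQueue("my-service"))
	if err != nil {
		return err
	}
	return pool.Run(ctx) // blocks until ctx is cancelled
}
```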

Problem B (the biggest one)

The quick tasks must be processed in a near real-time fashion; the slow tasks can wait. If I create one work map for this queue and we have many slow tasks, the workers get stuck doing slow tasks, and the quick tasks are not resolved until all the slow tasks created before them are processed. (This happens when tasks fail: they are mainly prioritized by run_at, and the priority is ignored. Unfortunately we have a lot of failures due to unreliable 3rd-party services.)

If we separate the tasks into different work maps or pools of workers, then, because they share the same queue, we get a lot of jobs with an unknown type, as each pool tries to pull and lock job types belonging to the other. (Even if we handle the unknown type, we do not want those tasks to be locked constantly until the right pool picks them up.)

This is also a problem if there are more job types in your queue than your service understands, since you could have several services listening on the same queue with workers/handlers for different job types. Even if you can ignore them, right now a worker fetches jobs of any type, which is not ideal/optimal.
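To make the collision concrete, a sketch with the same hypothetical names; both pools poll the same queue, so each keeps locking jobs of the type that only the other one handles:

```go
package example

import (
	"context"

	"github.com/vgarvardt/gue/v5"
)

func runSplitPools(ctx context.Context, gc *gue.Client, quick, slow gue.WorkFunc) error {
	// Both pools poll the SAME queue, but each knows only one job type.
	fastPool, err := gue.NewWorkerPool(gc, gue.WorkMap{"quick-task": quick}, 20,
		gue.WithPoolQueue("my-service"))
	if err != nil {
		return err
	}
	slowPool, err := gue.NewWorkerPool(gc, gue.WorkMap{"slow-task": slow}, 2,
		gue.WithPoolQueue("my-service"))
	if err != nil {
		return err
	}

	// Each pool locks whatever job comes next, including types it has no
	// handler for, so "slow-task" jobs keep landing in fastPool as
	// unknown-type jobs (and vice versa).
	go fastPool.Run(ctx) //nolint:errcheck // error handling omitted for brevity
	return slowPool.Run(ctx)
}
```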

Solutions?

So, first of all, is there a way to achieve this with the current v5 version? (Sorry if we have missed it.)

If it is not possible, would you be OK with us creating a pull request with the following changes:

I believe with these changes:

Let us know how we can proceed. Thank you!

vgarvardt commented 3 months ago

The easiest way for you would be to use different queues: one for slow and one for fast tasks. This way you would be able to scale the workers in the pools for fast and slow jobs independently. A queue is the entity for grouping similar tasks.
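A minimal sketch of what I mean, with hypothetical queue names; each pool can now be sized for its kind of work:

```go
package example

import (
	"context"

	"github.com/vgarvardt/gue/v5"
)

func runQueuePerSpeed(ctx context.Context, gc *gue.Client, quick, slow gue.WorkFunc) error {
	// One queue per task speed, so the pools no longer compete.
	fastPool, err := gue.NewWorkerPool(gc, gue.WorkMap{"quick-task": quick}, 20,
		gue.WithPoolQueue("my-service-fast")) // many workers: near real-time
	if err != nil {
		return err
	}
	slowPool, err := gue.NewWorkerPool(gc, gue.WorkMap{"slow-task": slow}, 2,
		gue.WithPoolQueue("my-service-slow")) // a couple of workers is enough
	if err != nil {
		return err
	}

	go fastPool.Run(ctx) //nolint:errcheck // error handling omitted for brevity
	return slowPool.Run(ctx)
}
```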

Another option could be task priorities: worker and pool have the WithWorkerPollStrategy/WithPoolPollStrategy options, so you can use PriorityPollStrategy and assign a higher priority to the fast-running jobs and a lower one to the slow-running ones.
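Sketched usage with hypothetical names; note the comment on the enqueue side about the ordering assumption:

```go
package example

import (
	"context"

	"github.com/vgarvardt/gue/v5"
)

func runPriorityPool(ctx context.Context, gc *gue.Client, wm gue.WorkMap) error {
	// Pool side: lock jobs by priority instead of the default run_at order.
	pool, err := gue.NewWorkerPool(gc, wm, 10,
		gue.WithPoolQueue("my-service"),
		gue.WithPoolPollStrategy(gue.PriorityPollStrategy),
	)
	if err != nil {
		return err
	}
	return pool.Run(ctx)
}

func enqueueQuick(ctx context.Context, gc *gue.Client, args []byte) error {
	// Enqueue side: the assumption here is that a numerically lower
	// Priority value is locked first; double-check the direction in the docs.
	return gc.Enqueue(ctx, &gue.Job{
		Queue:    "my-service",
		Type:     "quick-task",
		Priority: -100, // boost quick jobs over slow ones
		Args:     args,
	})
}
```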

Another option could be to use the WithWorkerUnknownJobWorkFunc/WithPoolUnknownJobWorkFunc worker/pool hooks to handle unknown job types, and I would combine it with the first approach (different queues), so that you do not need to change the logic for enqueuing jobs, but route slow tasks to another queue with its own worker once the main worker that handles fast jobs finds them.
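A sketch of that combination, assuming a hypothetical my-service-slow queue: the fast pool re-enqueues any job type it does not handle onto the slow queue instead of failing it:

```go
package example

import (
	"context"

	"github.com/vgarvardt/gue/v5"
)

func runRoutingPool(ctx context.Context, gc *gue.Client, quick gue.WorkFunc) error {
	pool, err := gue.NewWorkerPool(gc, gue.WorkMap{"quick-task": quick}, 20,
		gue.WithPoolQueue("my-service"),
		// Anything this pool has no handler for is copied to the slow queue.
		// Returning nil finishes the original job, so only the copy remains.
		gue.WithPoolUnknownJobWorkFunc(func(ctx context.Context, j *gue.Job) error {
			return gc.Enqueue(ctx, &gue.Job{
				Queue: "my-service-slow",
				Type:  j.Type,
				Args:  j.Args,
			})
		}),
	)
	if err != nil {
		return err
	}
	return pool.Run(ctx)
}
```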

kydemy-fran commented 3 months ago

> The easiest way for you would be to use different queues: one for slow and one for fast tasks. This way you would be able to scale the workers in the pools for fast and slow jobs independently. A queue is the entity for grouping similar tasks.

We already have more than 10 queues (named after our services). If we need to create new queue names per queue + job group, it is going to get very messy. Maybe it is just us, but it made sense for us to have queue names per use case or per service name, and in those cases you might have many job types that need different handling.

> Another option could be task priorities: worker and pool have the WithWorkerPollStrategy/WithPoolPollStrategy options, so you can use PriorityPollStrategy and assign a higher priority to the fast-running jobs and a lower one to the slow-running ones.

The problem here is with failed jobs: I believe retries are always ordered by run_at and not priority. Anyhow, if our workers are all busy processing slow jobs, there will not be any worker available for the quick jobs when they are registered.

> Another option could be to use the WithWorkerUnknownJobWorkFunc/WithPoolUnknownJobWorkFunc worker/pool hooks to handle unknown job types, and I would combine it with the first approach (different queues), so that you do not need to change the logic for enqueuing jobs, but route slow tasks to another queue with its own worker once the main worker that handles fast jobs finds them.

This is what we were doing, but either it gets much more complex, or many useless queries get generated along with locks on the table/rows that prevent other workers from picking up those jobs.

Please take a look at the PR and let me know if it is acceptable (it is backwards compatible). Thank you!

vgarvardt commented 3 months ago

> The problem here is with failed jobs: I believe retries are always ordered by run_at and not priority

Retried jobs work in the same way as regular jobs: the worker is not aware that a job failed previously. The error count and the last error are available to the caller only.
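In other words, a handler that wants retry-aware behaviour has to implement it itself. A sketch, assuming the job's ErrorCount and LastError fields carry that bookkeeping (the backstop policy here is hypothetical):

```go
package example

import (
	"context"
	"log"

	"github.com/vgarvardt/gue/v5"
)

func doWork(ctx context.Context, j *gue.Job) error { return nil } // hypothetical actual work

// The worker treats every run the same, so any "this job failed before"
// logic lives in the WorkFunc itself.
func handleWithBackstop(ctx context.Context, j *gue.Job) error {
	if j.ErrorCount > 5 {
		log.Printf("giving up on %s after %d errors, last: %s",
			j.Type, j.ErrorCount, j.LastError.String)
		return nil // returning nil marks the job as done and stops retries
	}
	return doWork(ctx, j)
}
```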