timgit / pg-boss

Queueing jobs in Node.js using PostgreSQL like a boss
MIT License
1.79k stars 149 forks source link

Global, per-queue concurrency limits #330

Open amc6 opened 2 years ago

amc6 commented 2 years ago

Is there a way to limit the total number of jobs running per queue across all workers? I'm still a little fuzzy on what teamSize and teamConcurrency does, but it appears that they configure concurrency per instance, not across all instances.

We have several use cases for this, but one of the simpler ones is to control how quickly we send various notifications such as text messages to users. We are rate limited by the provider we use and can't send a large volume all at once. So ideally I would be able to tune this to run only 1 or 2 send jobs at a time to keep us under the limit.

timgit commented 2 years ago

The short answer is no. Running a job from worker A doesn't prevent job fetching by worker B. There are a few options you can configure during send(), such as rate limiting, debouncing, and retries, that may help reduce the frequency of this happening, but if you end up with a backlog of jobs, you'd still be dealing with the distributed concurrency problem. In your handler, however, you could build your own distributed lock using postgres's pg_advisory_xact_lock() as a way to guarantee concurrency.

amc6 commented 2 years ago

Thanks! That's what I figured. We've been using pg_advisory_xact_lock but only in a very crude manner. Is there some way to use an advisory lock to allow n > 1 workers for a particular job at a time? The only thing I can think of is creating a separate reservation table with one row per allowed worker per job. And using something like select id from reservations limit 1 for update skip locked to grab a work reservation. Maybe that's not a crazy idea though.

timgit commented 2 years ago

If pg-boss were to start using advisory locks for the queue, it would fall into the same locking issues that SKIP LOCKED was created to avoid. There may be a way to pull this off via table constraints, however. Related discussion: https://github.com/timgit/pg-boss/discussions/334#discussioncomment-3109749

ilijaNL commented 1 year ago

Another approach, which I was thinking of without introducing a lock per queue is to penaltize the jobs with scheduling (aka startAfter). Say for example you only want to process 10 events per 5 seconds for queue A. when creating a job, you get amount of scheduled jobs for upcoming 5 seconds (state < active and startAfter < now() + interval 5s), if the amount exceeds the 10 jobs, you delay the jobs by some amount. This delaying could be evenly distributed with some "5s bucket per queue" method.

Any thoughts and disadvantages about this approach?

timgit commented 1 year ago

My original hesitation with advisory locks is that they will cause a problem if created during job fetch time. However, I think if you were to configure them to only be created once a job is fetched, this is completely viable as long as you keep the concurrency low enough to not cause too many concurrent transactional locks.

For example, if you had 5 workers, you could have a situation were jobs are fetched by all 5 simultaneously. When this occurs, you still only want them to run 1 at a time, so you could have all 5 request the advisory lock with a timeout, and as long as the lock request doesn't expire, they would run 1 at a time.

Antman261 commented 11 months ago

I think the other way to do this is to add another status and continue using SKIP LOCKED, but maintain an open transaction while the job is being processed.

You would essentially have a status between created and active, maybe ready where a worker then takes a lock on the job:

SELECT id
      FROM ${schema}.job j
      WHERE state = '${states.ready}'
        AND name LIKE $1
        AND startAfter < now()
ORDER BY priority desc, createdOn, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED

Then when the job completes, you update the remaining created jobs in the queue to ready until you reach the global concurrency limit, e.g. if you set globalConcurrency to 3, then you can only ever have 3 ready jobs in the queue.

Even better if the caller can provide the client connection, so we can include our application updates within the same transaction. That way, if we process a job and either the job runner fails or the application code fails, both the job and the application database changes are rolled back together.

The trade off is that you're essentially sacrificing client connections to be open for as long as processing takes. E.g. a 5 second job will hold onto a connection from the pool for 5s while the job is processing. You would also want to use a LOCK TIMEOUT to ensure the job is unlocked if something goes wrong.