SGrondin / bottleneck

Job scheduler and rate limiter, supports Clustering

Brainstorm: queue + limiter integration (Bottleneck v3 design discussion) #94

Open colinskow opened 5 years ago

colinskow commented 5 years ago

Earlier we had a conversation about integrating the limiter with a queue. I couldn't wrap my head around how exactly I would need it to work. I've finally figured that out.

Current problems:

Currently I use BeeQueue to schedule jobs and distribute them among workers. The workers then accept jobs with a specific maximum concurrency and route them through the appropriate limiters. This leads to several issues.

1) The worker's concurrency is blocked while a job is waiting to clear the limiter.
2) If the worker crashes or is taken down, the jobs awaiting a limiter either fail (best case) or get stuck.
3) There's no way to track via the queue how many jobs are executing versus waiting on the limiter.
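For reference, the pattern described above looks roughly like the following sketch. It's only illustrative; `callDownstreamApi` and the concrete settings are placeholders, not taken from the actual project.

```js
// Illustrative sketch of the current BeeQueue + Bottleneck pattern.
const Queue = require('bee-queue');
const Bottleneck = require('bottleneck');

const queue = new Queue('api-jobs');              // BeeQueue distributes jobs among workers
const limiter = new Bottleneck({ minTime: 250 }); // rate limiter for the downstream API

// Each worker accepts up to 5 jobs concurrently...
queue.process(5, (job) => {
  // ...but the job still has to clear the limiter before it actually runs.
  // While it waits here it counts against the worker's concurrency (issue 1),
  // dies with the worker (issue 2), and is invisible to the queue (issue 3).
  return limiter.schedule(() => callDownstreamApi(job.data)); // callDownstreamApi is a placeholder
});
```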

Ideal integration:

1) Jobs are scheduled via the queue as usual, either for immediate execution or with a delay. A limiter and group key can optionally be specified with each job (sketched after this list).
2) As soon as a job becomes active it is placed in the appropriate limiter queue, and the main queue updates its status to awaiting_limiter.
3) Once a job has cleared the limiter, it can be picked up by an available worker (which remains oblivious to the presence of a limiter). An active job is registered by the limiter's timer the moment it is picked up by a worker. Status is set to in_progress.
4) The worker reports the success or failure of a job, which is recorded via the main queue.
5) Ideally I could collect real-time statistics on how many jobs are waiting on a limiter and what the wait times are.
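Purely as a sketch of that flow, this is roughly what scheduling such a job might look like. The `limiter`/`groupKey` options are hypothetical names for the proposed fork; everything else is the existing BeeQueue API.

```js
// Hypothetical sketch -- the limiter/groupKey options do NOT exist in BeeQueue today.
const job = queue
  .createJob({ url: 'https://api.example.com/items' }) // existing BeeQueue call
  .delayUntil(Date.now() + 60 * 1000)                   // step 1: immediate or delayed execution
  .save();

// Imagined extension: attach a limiter and group key so the queue routes the job
// through the right limiter before any worker picks it up (steps 2-3):
//   queue.createJob(data, { limiter: 'github-api', groupKey: tokenId })
//
// Status flow the queue would then expose (steps 2-5):
//   created -> awaiting_limiter -> in_progress -> succeeded | failed
```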

This would probably require a master worker to process all the limiters, but the administration could ideally be clustered across several workers for really large-scale projects.

I think the easiest way to accomplish this would probably be to fork BeeQueue, and the changes to Bottleneck should be minimal.

Thoughts, anyone else?

SGrondin commented 5 years ago

I think the easiest way to accomplish this would probably be to fork BeeQueue, and the changes to Bottleneck should be minimal.

That's exactly where I landed after I spent time dissecting BeeQueue's source code 2 months ago. BQ is easily 5 to 10 times smaller and simpler than Bottleneck, and jobs have to go BQ -> Bottleneck, so it makes the most sense to modify BQ and make it aware of Bottleneck instead of the other way around.

Thankfully the BQ source code is small and simple to follow, so forking it to support exactly the integration you described is a reasonable weekend project. I'd be happy to help by answering questions and discussing/planning the architecture, if you'd like to get in touch with me. However, I don't currently have the bandwidth to do the integration myself. If no one writes such an integration this year, I'll probably recreate a slimmed-down version of BQ myself and add it directly into Bottleneck sometime in the first half of 2019.

SGrondin commented 5 years ago

Update: I think I've figured out the right way forward. It would be Bottleneck v3 and it would address the current Clustering limitations without compromising the overall design.

Currently, the Cluster uses timestamps, counters, and timers to instruct clients (limiters) on how to proceed. The Cluster is aware of the queue length of each limiter, but nothing more.

Version 3 will have no significant impact on non-Clustering usage other than a few changes to method and event names. All the major changes will be in Clustering mode, and it will remain backwards-compatible with version 2 behavior.

The Cluster is going to keep track of queued jobs. This will "unify" the queues and get rid of all the limitations listed here. Instead of each client holding its own queue, each limiter's jobs will hold a reference to the job's metadata stored in Redis.
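To illustrate the idea only, here is one way such a shared record could be kept. The key names below are invented for this sketch and are not Bottleneck's actual Redis schema.

```js
// Hypothetical sketch only -- invented key names, not Bottleneck's real Redis layout.
const Redis = require('ioredis');
const redis = new Redis();

async function enqueueJobMetadata(limiterId, jobId, options) {
  // The cluster stores the job's metadata once, centrally in Redis...
  await redis.hset(`b:${limiterId}:job:${jobId}`, 'options', JSON.stringify(options));
  // ...and each limiter's local queue only holds a reference to it (the job id).
  await redis.rpush(`b:${limiterId}:queue`, jobId);
}
```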

This approach probably seems like the obvious solution, and some might be wondering why it wasn't chosen for v2. The reason is that the jobs themselves (the functions) cannot be serialized, and neither can complex arguments like functions and class-based objects. Most other distributed queueing systems also require the user to set up a listener per client to handle each type of job. That design compromises the overall architecture and tends to lead to poorer user-side code.

It's always been the top priority of Bottleneck that jobs can be added inline, without first having to register them. The second most important priority is to guarantee a distributed system where clients can safely crash, join, and leave at any time.

In v3, the jobs themselves still won't be serialized; instead, the user will have the option to provide a job type as part of the job options.

Here's the v3 algorithm for a queued job, when it becomes ready to execute:

In other words, v3 will allow users to still run their jobs even if the originating limiter is lost. It will be opt-in, by passing an extra job option. By default, the behavior will be the same as v2 (jobs are lost).
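As a hedged sketch of what that opt-in could look like from the user's side: the `jobType` option and `registerJobType` method below are invented names for the proposal, not a released API, and `mailer` is a placeholder.

```js
// Hypothetical v3 usage sketch -- 'jobType' / 'registerJobType' are invented names.
const Bottleneck = require('bottleneck');
const limiter = new Bottleneck({ datastore: 'ioredis', id: 'my-limiter' });

// Each client registers a handler for the job types it knows how to run, so any
// surviving client can execute a queued job whose originating limiter has crashed.
limiter.registerJobType('send-email', (to, body) => mailer.send(to, body));

// Adding a job stays inline, exactly like v2; the extra job option opts it in to recovery.
limiter.schedule({ id: 'welcome-42', jobType: 'send-email' }, () =>
  mailer.send('user@example.com', 'Welcome!')
);

// Without jobType (the default), the behavior matches v2: if this client dies, the job is lost.
```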

tcbyrd commented 5 years ago

This all sounds great @SGrondin. At least as far as the projects I'm working on are concerned, this bit...

the user will have the option to provide a job type

...is exactly what I'm looking for. We have separate queues for each job type that all have to share a common limiter, so each job is wrapped in the same limiter. We've avoided horizontally scaling the workers so far and luckily have enough headroom for the time being to vertically scale the worker, but that's probably not going to be the case forever.

Not sure if this is covered, but one limitation we currently have is that our limiter settings are global, while within the app a given token can have different limits than other tokens. Example:

API Token A: 5000 req/hour
API Token B: 7500 req/hour
API Token C: 12500 req/hour

Maybe it makes sense to store this in the job metadata somehow? I can see this being useful for jobs that need to interact with multiple APIs as well, since you could have a single job that has to keep track of different limits. For example, a single worker could GET up to 5000 req/hour from API A (e.g. GitHub), but only POST to API B at 5 req/second (e.g. Airtable). The job would need to be limited dynamically based on the state of both.
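For what it's worth, something close to the per-token case is already expressible with v2's Group and reservoir options; the sketch below is only one possible arrangement, and `getFromGitHub` / `postToAirtable` are placeholder functions. Whether v3 should fold these limits into per-job metadata is the open question.

```js
// Rough sketch with the existing v2 API; token limits mirror the example above.
const Bottleneck = require('bottleneck');

const hourlyLimits = { tokenA: 5000, tokenB: 7500, tokenC: 12500 };

// One limiter per API token, each with its own refreshing hourly reservoir.
const githubGroup = new Bottleneck.Group();
githubGroup.on('created', (limiter, token) => {
  const limit = hourlyLimits[token] || 5000;
  limiter.updateSettings({
    reservoir: limit,
    reservoirRefreshAmount: limit,
    reservoirRefreshInterval: 60 * 60 * 1000, // refill every hour
  });
});

// A separate limiter for API B, shared by every job (~5 req/second).
const airtableLimiter = new Bottleneck({ minTime: 200 });

// A single job subject to both limits simply passes through both limiters:
async function syncItem(token, item) {
  const data = await githubGroup.key(token).schedule(() => getFromGitHub(item)); // placeholder
  return airtableLimiter.schedule(() => postToAirtable(data));                   // placeholder
}
```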