Open ethanresnick opened 2 years ago
@ethanresnick thanks for the interest in the Pro version. Regarding performance what it means is that the performance is almost not affected by the number of groups, of course, the groups logic add a bit of extra work in the lua scripts, but the O complexity for picking the next job to process is at worst O(log n) as we use ZPOPMIN from redis to get the next group job. I will try to rewrite the text to be more precise on what is the complexity and added costs of using groups.
Particularly options such as priority and lifo will ignore the group settings, so we are actually planning to throw an exception if "incompatible" options are used together with groups.
Thanks @manast! This is very helpful. O(log n) is what I hoped for for picking the next group. I imagine that enqueuing and dequeuing with groups also just has some constant time overhead? I.e., it seems like there'd be work to do to move the job from the group into the main queue (when its time comes), and maybe some bookkeeping on enqueue? That's probably not relevant for my use case, but I'm just trying to make sure I have the correct mental model.
Besides that, I do have a couple more questions about rate limiting with groups:
What exactly is the rate limiting algorithm? From the docs, it sounds similar to, but not quite the same as, interleaved weighted round robin and weighted fair queueing.
For my use case, I'd like to be able to adjust a group's rate limit dynamically as the queue is running (e.g., to slow it down if the third-party services that my tasks interact with are failing or indicating that they can't keep up). Is that possible?
More generally, I'm kinda confused about what the rate limits apply to. My intuition was that these would be queue-level limits — i.e., if I set group x to have have a rate limit of 5/second, the queue should only dequeue up to 5 tasks in that group per second, regardless of how many workers I have or the concurrency of the workers. Am I understanding that right? Part of my confusion is that, in the docs, it looks like the limits are set at the worker level, but that seems a bit strange.
Finally, if a task in a rate-limited group fails, and is retried, the retry will count against the group's rate limit in the period where the task is retried, right?
Beyond rate limiting:
Is adding new groups at runtime supported? I have something like one group per user (simplifying a bit), and new users could come along as the queue is running.
Is there any reason why priority
/lifo
could not work with groups? If each group has it's own "virtual queue", I'd expect priority
/lifo
to simply control the position of the new job within that "virtual queue". I have one use case where this could be very useful. But, again, I don't actually know the implementation, so maybe that's impractical or I have the mental model wrong.
Note: edited my message above to consolidate my questions and add one more.
Please see my answers below.
Besides that, I do have a couple more questions about rate limiting with groups:
- What exactly is the rate limiting algorithm? From the docs, it sounds similar to, but not quite the same as, interleaved weighted round robin and weighted fair queueing.
Weighted round robin is not a rate limiter algorithm, but a scheduler algorithm. For rate limit we use just a counter per group and if the counter reaches its max setting for the time amount setting it rate limits the group until the key holding the counter expires.
- For my use case, I'd like to be able to adjust a group's rate limit dynamically as the queue is running (e.g., to slow it down if the third-party services that my tasks interact with are failing or indicating that they can't keep up). Is that possible?
That's currently not possible. It could be implemented though, it could be an additional API that you can use to manually rate limit a group for a certain amount of time.
- More generally, I'm kinda confused about what the rate limits apply to. My intuition was that these would be queue-level limits — i.e., if I set group x to have have a rate limit of 5/second, the queue should only dequeue up to 5 tasks in that group per second, regardless of how many workers I have or the concurrency of the workers. Am I understanding that right?
Yes, that's how it works, however, you cannot specify a different rate limit setting for every group, in your example, all groups would be rate-limited at 5 jobs/second. So if you for example have 100 groups, then the maximum throughput would be 500 jobs/second for the whole queue.
Part of my confusion is that, in the docs, it looks like the limits are set at the worker level, but that seems a bit strange. The workers are the ones that need to see if the rate limit has been reached or not.
- Finally, if a task in a rate-limited group fails, and is retried, the retry will count against the group's rate limit in the period where the task is retried, right?
If the task is retried it will be re-added to the group and processed until the group is not rate limited anymore.
Beyond rate limiting:
- Is adding new groups at runtime supported? I have something like one group per user (simplifying a bit), and new users could come along as the queue is running.
Yes, you just add the job with the groupId and then you have a new group. If all the jobs for a given groupId has been processed then that group does not consume any space anymore.
- Is there any reason why
priority
/lifo
could not work with groups? If each group has it's own "virtual queue", I'd expectpriority
/lifo
to simply control the position of the new job within that "virtual queue". I have one use case where this could be very useful. But, again, I don't actually know the implementation, so maybe that's impractical or I have the mental model wrong.
The priority/lifo are global settings, not group dependant, as such if you need this functionality you can just use a Queue without groups. Implementing priority inside groups is very very complex so this is not something we are planning to do. Lifo on the other hand is possible.
Hi @manast, thanks for your detailed answers! And apologies for my slow reply; I had to switch to working on another feature, but I'm now back to work on the feature for which I'd use BullMQ. A couple final questions:
For my use case, this ability to have a different rate limit per group, and the ability to temporarily lower the limit of a group at runtime, are really the critical requirements. From your answers, though, it sounds like these features wouldn't be that hard to add. If I signed up my company for BullMQ Pro, are these features you'd be open to implementing? What would the timeline be for something like that?
Does BullMQ Pro have any Redis APIs? (E.g., Redis functions or scripts for enqueueing, enqueuing to a group, marking a task complete, etc.) Even if not, I would still be able to use BullMQ Pro for my project. However, I do think having some core Bull APIs available to call from Lua would be a very, very valuable extension point!
For example, here's one use case that I've been thinking of: I have a task that, when it runs successfully, should publish some new tasks to a different queue. It seems to me that, if I enqueue these new tasks from the worker in JS, I'm getting at-least-once publishing semantics for those new tasks (i.e., the enqueue can succeed, but then marking the running task complete can fail, in which case these follow-on tasks could get enqueued again). On the other hand, if I could mark the main task as complete and enqueue the new tasks from inside a single Redis transaction, then it seems like I could have exactly-once semantics (analogous to this kafka feature)!
Implementing transactions like that is might be too marginal to include in BullMQ's core, but it's exactly the type of thing I could imagine adding myself if there were a Redis API with some stable "public API" guarantees, so that I wouldn't have to dig through the Bull internals to make sure my custom Redis code is properly updating all the Bull state (which seems hard to get right, and could break with each BullMQ upgrade).
@ethanresnick
- For my use case, this ability to have a different rate limit per group, and the ability to temporarily lower the limit of a group at runtime, are really the critical requirements. From your answers, though, it sounds like these features wouldn't be that hard to add. If I signed up my company for BullMQ Pro, are these features you'd be open to implementing? What would the timeline be for something like that?
Enabling a different rate limit per group is something that fits in the current design. Being able to dynamically change the rate limit of a given group is another story. Currently, when you enable rate limits for groups you do it when you instantiate a Worker. So all workers must have the same settings for consistent behavior. In theory, you could just close all workers and then instantiate new ones with a different setting, but I am not sure this will fulfill your requirements. If you signup for the Pro version we would prioritize the feature of different rate limits per group.
2. For example, here's one use case that I've been thinking of: I have a task that, when it runs successfully, should publish some new tasks to a different queue. It seems to me that, if I enqueue these new tasks from the worker in JS, I'm getting at-least-once publishing semantics for those new tasks (i.e., the enqueue can succeed, but then marking the running task complete can fail, in which case these follow-on tasks could get enqueued again). On the other hand, if I could mark the main task as complete and enqueue the new tasks from inside a single Redis transaction, then it seems like I could have exactly-once semantics (analogous to this kafka feature)!
This is currently achieved by using idempotence and user-generated job ids. Because if you add a job with the same id as an existing one it will be ignored. However, you need to make sure that you keep enough old jobs for this mechanism to work. I see the value in having an atomic "addAndComplete" operation, and this is something that we could also implement, however having access to the lua scripts from the client side is not a good idea because they are too low-level and we will need to handle a lot of issues from people using them directly.
Enabling a different rate limit per group is something that fits in the current design. That's awesome!
Being able to dynamically change the rate limit of a given group is another story. Currently, when you enable rate limits for groups you do it when you instantiate a Worker. So all workers must have the same settings for consistent behavior.
Hmm, ok. Let's say I could get away with not adjusting the rate limit dynamically for an existing group; I would still definitely need some way to add a new group at runtime, with a distinct limit. In the current version of Bull, a new group can get created transparently when I try to enqueue to it, but that only works because it doesn't need a custom rate limit.
This issue seems like it goes back to my question from earlier (i.e., trying to understand why the rate limits are set on the worker at all, when these limits seem like a property of the queue). Is the issue that the worker needs this state about the limits in memory for some reason?
If not, would it be easy to create a new API on the queue for setting up the limits? The initial groups and their limits (including a default limit for transparently-created new groups) could be set with a constructor option, like they are on Worker
/WorkerPro
today. But then maybe queues could also have an instance method for adding new groups (say, queue.configureGroup(name, opts)
)?
If you signup for the Pro version we would prioritize the feature of different rate limits per group.
Again, that's awesome! If we went this route, would you be able to give any kind of timeline? Or how does that normally work?
This is currently achieved by using idempotence and user-generated job ids. Because if you add a job with the same id as an existing one it will be ignored. However, you need to make sure that you keep enough old jobs for this mechanism to work.
Yes, I understand that (including the caveat about needing to keep enough completed jobs, which is an extra thing to think about, but is usually not too bad). I was really just giving the transactional publishing thing as an example. My bigger point was that there are probably a ton of feature requests that people could implement on their own, without having to fork Bull, if there were some sort of Redis api. Like, if I did want to implement some sort of custom scheduling, I could write that myself in Redis (which would be much easier and more reliable than writing it in Node), and then just call bull_enqueue
from Lua; that'd let me benefit from all the built-in retry and deduplication etc logic in Bull, while also customizing whatever I need to. You're probably right though that it'd just create a lot of extra maintenance work/issues :(
This issue seems like it goes back to my question from earlier (i.e., trying to understand why the rate limits are set on the worker at all, when these limits seem like a property of the queue). Is the issue that the worker needs this state about the limits in memory for some reason?
It is more complicated than it may seem at first sight. Since the queue is distributed, any change in a global queue setting would need to be propagated to all the workers and give some guarantees, there are many edge cases, it is really too complex for something that currently is not needed by the vast majority of users.
Yes, I understand that (including the caveat about needing to keep enough completed jobs, which is an extra thing to think about, but is usually not too bad). I was really just giving the transactional publishing thing as an example. My bigger point was that there are probably a ton of feature requests that people could implement on their own, without having to fork Bull, if there were some sort of Redis api. Like, if I did want to implement some sort of custom scheduling, I could write that myself in Redis (which would be much easier and more reliable than writing it in Node), and then just call
bull_enqueue
from Lua; that'd let me benefit from all the built-in retry and deduplication etc logic in Bull, while also customizing whatever I need to. You're probably right though that it'd just create a lot of extra maintenance work/issues :(
Redis does not support calling another lua script from inside lua. The other problem is that the lua code handling the queue is really quite complex, we put a lot of effort in figuring out all the edge cases and this is not something that the average coder would be able to do without investing a lot of work, specially if you are not already familiar with all the inner workings.
Btw, I am aware of Redis 7.0 functions, but we are not going to start using it in the short term.
It is more complicated than it may seem at first sight. Since the queue is distributed, any change in a global queue setting would need to be propagated to all the workers and give some guarantees, there are many edge cases, it is really too complex for something that currently is not needed by the vast majority of users.
Ok. I definitely understand that the workers need some knowledge of the state of the queue (e.g., its name/prefix, to know where to look for jobs), but I'm a bit surprised that they need knowledge of the rate limits. I'll take your word for it though.
In that case, do you have any advice then on how I might implement something like this? (I.e., something like different groups, with different rate limits that can be adjusted at runtime?)
Btw, I am aware of Redis 7.0 functions, but we are not going to start using it in the short term.
Yeah, Redis functions are what I was thinking of, or a redis module, which I see you explored in the past. But I can understand why neither of those are super appealing given that a lot of deployments don't support them yet.
Hello, I'm evaluating BullMQ Pro for my current project. It seems to have a lot of unique and powerful features, and I appreciate all the work you've put into it over so many years. I plan to request a trial of BullMQ Pro but, before I actually dive into the code and try to build a proof of concept, I was was hoping to understand the architecture a bit better.
In particular, I'm trying to understand how the groups are implemented at a high level, to get a sense of what kinds of problems I might run into. The docs say only that groups "[don't] have any impact on performance", but what does this mean exactly? Does it literally mean that they have no performance impact for absolutely all use cases and usage patterns? How is that possible?
I'm also trying to understand how groups interact with the other BullMQ ordering features. For example, if I add an job to a group and also specify a
priority
, or specifylifo: true
, how is that interpreted?Thanks for your help here!