Open spring-operator opened 7 years ago
Gary Russell commented
Currently, each consumer manages its own idle state; this is clearly stated in the documentation.
Practically, consumers will only be stopped if the whole container is idle for some time. This is because the broker will share its work across all the active consumers.
Coordinating "idleness" across consumers would require a completely different algorithm that uses the entire container's message rate.
Yannick Boucher commented
@Gary, I understand how the current implementation works and its limitations. I need to come up with a solution, and I think my use case is common enough to consider making a contribution. What would be, in your opinion, the best way to work this out? Would it be to improve SimpleMessageListenerContainer, to start looking at #2259 and add scaling to DirectMessageListenerContainer, or some other solution that I have not thought of?
Gary Russell commented
I don't really have time to give it much thought but, in either container, I think we'd have to keep some kind of smoothed moving average of message rates for the entire container (something like Spring Integration's ExponentialMovingAverageRate).
Then, in each consumer thread check the current rate from time to time, compare it to the desired rate and adjust down if the rate has dropped below some threshold rate.
If you configure the desired rate at, say, 50 per second per consumer and we have 4 consumers and the rate drops below 150/s then stop one consumer.
I think we'd still need to check stopConsumerMinInterval to prevent multiple consumers from stopping themselves at the same time.
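A rough sketch of that idea, to make the arithmetic concrete. Everything here is hypothetical: ContainerRateMonitor and its methods are not part of Spring AMQP, and the smoothing is a plain per-sample exponential moving average rather than Spring Integration's time-weighted ExponentialMovingAverageRate. With 50 msg/s per consumer and 4 consumers, the stop threshold is 50 * (4 - 1) = 150 msg/s, as in the example above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not a Spring AMQP class: container-wide rate tracking. */
final class ContainerRateMonitor {

    private final double alpha;                  // smoothing factor in (0, 1]
    private final double desiredRatePerConsumer; // messages per second
    private final long stopMinIntervalNanos;     // plays the role of stopConsumerMinInterval
    private double smoothedRate;                 // messages per second for the whole container
    private boolean initialized;
    private boolean stoppedBefore;
    private long lastStopNanos;

    ContainerRateMonitor(double alpha, double desiredRatePerConsumer, long stopMinIntervalMillis) {
        this.alpha = alpha;
        this.desiredRatePerConsumer = desiredRatePerConsumer;
        this.stopMinIntervalNanos = TimeUnit.MILLISECONDS.toNanos(stopMinIntervalMillis);
    }

    /** Feed one observed container-wide rate sample (messages per second). */
    synchronized void recordSample(double observedRate) {
        this.smoothedRate = this.initialized
                ? this.alpha * observedRate + (1 - this.alpha) * this.smoothedRate
                : observedRate;
        this.initialized = true;
    }

    synchronized double smoothedRate() {
        return this.smoothedRate;
    }

    /** Called from a consumer thread from time to time; true means "stop one consumer". */
    synchronized boolean shouldStopOne(int activeConsumers, int minConsumers, long nowNanos) {
        if (!this.initialized || activeConsumers <= minConsumers) {
            return false;
        }
        // Only one stop per interval, so consumers do not all stop at once.
        if (this.stoppedBefore && nowNanos - this.lastStopNanos < this.stopMinIntervalNanos) {
            return false;
        }
        // Stop one consumer if the remaining ones could still carry the smoothed rate.
        double threshold = this.desiredRatePerConsumer * (activeConsumers - 1);
        if (this.smoothedRate >= threshold) {
            return false;
        }
        this.lastStopNanos = nowNanos;
        this.stoppedBefore = true;
        return true;
    }
}
```

Each consumer thread would call shouldStopOne(...) with System.nanoTime() and stop itself only when it returns true. How the rate samples are collected is left open here.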
Hi, is there any news on this?
We are facing a similar behaviour: SimpleMessageListenerContainer creates way more consumers than needed. Instead, we would like the number of consumers to adjust according to the load. So I tried a few things.
Even if 1 consumer should handle all the load by itself, SimpleMessageListenerContainer quickly adds more and more consumers until it reaches maxConcurrentConsumers. => I tried limiting the number of consumers by counting busy consumers (consumers that have consecutiveMessages >= consecutiveActiveTrigger) and allowing up to twice the number of busy consumers (and up to maxConcurrentConsumers). That worked for limiting the number of created consumers.
When the load on the queue decreases, the number of consumers does not scale down. => I tried adding consumer priorities (first consumer would be set with a priority of 100, second consumer would be set with a priority of 99, and so on). The load was unbalanced between consumers, leaving some consumers with no messages at all so they were removed.
This solution adjusts the number of consumers to suit the load, however with a constant load the number of consumers fluctuates quite rapidly so a smoother adjustment would certainly be better.
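For reference, the cap I used for scaling up looks roughly like the sketch below. The class and method names are made up for illustration, and flooring the result at concurrentConsumers is an assumption I added so the container never drops below its configured minimum.

```java
/** Illustrative only: not part of SimpleMessageListenerContainer. */
final class BusyConsumerCap {

    /**
     * @param consecutiveMessages one counter per running consumer
     * @return how many consumers may exist in total right now
     */
    static int allowedConsumers(int[] consecutiveMessages, int consecutiveActiveTrigger,
            int concurrentConsumers, int maxConcurrentConsumers) {

        int busy = 0;
        for (int count : consecutiveMessages) {
            if (count >= consecutiveActiveTrigger) {
                busy++; // this consumer has been receiving without an idle cycle
            }
        }
        // Allow up to twice the busy consumers, but never more than the maximum.
        int cap = Math.min(2 * busy, maxConcurrentConsumers);
        // Assumption: never go below the configured minimum.
        return Math.max(cap, concurrentConsumers);
    }
}
```

A new consumer would only be added while the current consumer count is below allowedConsumers(...).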
Gary Russell's idea seems smoother, but configuring a desired rate per consumer means knowing in advance how many messages one consumer can handle. What if the consumers suddenly need more time to process messages?
I can contribute, but I don't know which solution would be best. Can you please advise?
The logic there is like this:
private void checkAdjust(boolean receivedOk) {
    if (receivedOk) {
        if (isActive(this.consumer)) {
            this.consecutiveIdles = 0;
            if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                considerAddingAConsumer();
                this.consecutiveMessages = 0;
            }
        }
    }
    else {
        this.consecutiveMessages = 0;
        if (this.consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
            considerStoppingAConsumer(this.consumer);
            this.consecutiveIdles = 0;
        }
    }
}
So, if the current consumer receives a message, we consider starting a new consumer, but only once its consecutiveMessages exceeds consecutiveActiveTrigger.
Otherwise, we consider stopping it when its consecutiveIdles exceeds consecutiveIdleTrigger.
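To see how the counters behave, here is a stripped-down model of the method quoted above. It is only a sketch: CheckAdjustModel is a made-up class, the isActive() check is assumed to always pass, and the two consider...() calls are replaced by simple counters.

```java
/** Simplified stand-in for the quoted checkAdjust(); not the real container code. */
final class CheckAdjustModel {

    private final int consecutiveActiveTrigger;
    private final int consecutiveIdleTrigger;
    private int consecutiveMessages;
    private int consecutiveIdles;
    int addRequests;  // how often considerAddingAConsumer() would have been called
    int stopRequests; // how often considerStoppingAConsumer() would have been called

    CheckAdjustModel(int consecutiveActiveTrigger, int consecutiveIdleTrigger) {
        this.consecutiveActiveTrigger = consecutiveActiveTrigger;
        this.consecutiveIdleTrigger = consecutiveIdleTrigger;
    }

    void checkAdjust(boolean receivedOk) {
        if (receivedOk) {
            this.consecutiveIdles = 0;
            // Post-increment: the comparison sees the value before this receive.
            if (this.consecutiveMessages++ > this.consecutiveActiveTrigger) {
                this.addRequests++;
                this.consecutiveMessages = 0;
            }
        }
        else {
            this.consecutiveMessages = 0;
            if (this.consecutiveIdles++ > this.consecutiveIdleTrigger) {
                this.stopRequests++;
                this.consecutiveIdles = 0;
            }
        }
    }

    public static void main(String[] args) {
        CheckAdjustModel model = new CheckAdjustModel(10, 10);
        for (int i = 0; i < 12; i++) {
            model.checkAdjust(true);
        }
        System.out.println("add requests after 12 receives: " + model.addRequests);
    }
}
```

Read literally, the post-increment comparison means that with a trigger of N the add (or stop) is considered on the (N + 2)th consecutive receive (or idle), and a single idle cycle resets the message counter.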
Doesn't setting consecutiveActiveTrigger and consecutiveIdleTrigger to 1 help?
/**
 * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
 * {@link #maxConcurrentConsumers} has not been reached, specifies the number of
 * consecutive cycles when a single consumer was active, in order to consider
 * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
 * This is impacted by the {@link #batchSize}.
 * Default is 10 consecutive messages.
 * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer.
 * @see #setMaxConcurrentConsumers(int)
 * @see #setStartConsumerMinInterval(long)
 * @see #setBatchSize(int)
 */
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) {
    Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0");
    this.consecutiveActiveTrigger = consecutiveActiveTrigger;
}
/**
 * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
 * the number of consumers exceeds {@link #concurrentConsumers}, specifies the
 * number of consecutive receive attempts that return no data; after which we consider
 * stopping a consumer. The idle time is effectively
 * {@link #receiveTimeout} * {@link #batchSize} * this value because the consumer thread waits for
 * a message for up to {@link #receiveTimeout} up to {@link #batchSize} times.
 * Default is 10 consecutive idles.
 * @param consecutiveIdleTrigger The number of consecutive timeouts to trigger stopping a consumer.
 * @see #setMaxConcurrentConsumers(int)
 * @see #setStopConsumerMinInterval(long)
 * @see #setReceiveTimeout(long)
 * @see #setBatchSize(int)
 */
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger) {
    Assert.isTrue(consecutiveIdleTrigger > 0, "'consecutiveIdleTrigger' must be > 0");
    this.consecutiveIdleTrigger = consecutiveIdleTrigger;
}
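As a quick worked example of the idle-time formula from the Javadoc (receiveTimeout * batchSize * consecutiveIdleTrigger), here is a small helper. IdleWindow is a made-up name, and the batchSize of 1 used in the figures below is an assumption about the default rather than something shown in the quoted source.

```java
import java.time.Duration;

/** Illustrative arithmetic only; not a Spring AMQP class. */
final class IdleWindow {

    /** Effective idle time before a consumer is considered for stopping. */
    static Duration effectiveIdleTime(long receiveTimeoutMillis, int batchSize, int consecutiveIdleTrigger) {
        return Duration.ofMillis(receiveTimeoutMillis * batchSize * consecutiveIdleTrigger);
    }

    public static void main(String[] args) {
        // receiveTimeout 1000 ms, batchSize 1 (assumed), trigger 10
        System.out.println(effectiveIdleTime(1000, 1, 10));
        // receiveTimeout 5 ms, batchSize 1 (assumed), trigger 1
        System.out.println(effectiveIdleTime(5, 1, 1));
    }
}
```

With a 1000 ms receiveTimeout and a trigger of 10, a consumer has to see nothing at all for about 10 seconds; with a 5 ms timeout and a trigger of 1, the window shrinks to 5 ms.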
Another piece of logic is like this:
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
    checkAdjust(receivedOk);
}
So, if maxConcurrentConsumers is not set, then no new dynamic consumers are created.
Thank you for your answer.
I tried setting consecutiveIdleTrigger to 1 and consecutiveActiveTrigger to 10000. It created consumers at a much slower pace; however, the number of consumers still reached maxConcurrentConsumers and never decreased.
In my case, setting consecutiveIdleTrigger to 1 and receiveTimeout to 5 made the number of consumers automatically adjust to the workload.
Why is receiveTimeout set to 1000 by default? Is it safe to set it to a much lower value?
Well, the behavior with maxConcurrentConsumers is correct. Its point is to increase to that number and keep them.
That's similar to concurrentConsumers, where all of them are started eagerly.
The receiveTimeout is used to block a consumer while polling from the in-memory queue in the BlockingQueueConsumer:
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
So, it really might be safe to set it to a smaller period, but not by default.
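A tiny standalone illustration of what that timed poll does, using only java.util.concurrent. PollTimeoutDemo is a made-up class and the queue of strings stands in for the consumer's in-memory message queue; the point is only that an empty queue makes poll() return null after the timeout, which is what counts as an idle receive.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustration of a timed poll; not the BlockingQueueConsumer itself. */
final class PollTimeoutDemo {

    /** Returns the next message, or null if none arrived within the timeout. */
    static String nextMessage(BlockingQueue<String> queue, long receiveTimeoutMillis) {
        try {
            return queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(nextMessage(queue, 5)); // empty queue: null after about 5 ms
        queue.offer("hello");
        System.out.println(nextMessage(queue, 5)); // returns immediately with the message
    }
}
```

A lower timeout only makes the thread wake up and loop more often when there is nothing to do; it does not change how quickly a message that is already queued gets picked up.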
We don't see a problem with having those concurrent consumers on the queue if they are idling. It does not affect RabbitMQ performance. Or am I missing something?
Thank you for sharing your experience!
Each consumer uses its own channel, and the RabbitMQ documentation recommends limiting the number of channels:
Each channel also consumes a relatively small amount of memory on the node the client is connected to, plus a few Erlang processes. Since a node usually serves multiple channel connections, the effects of excessive channel usage or channel leaks will primarily be reflected in RabbitMQ nodes' metrics and not those of clients.
Given both of these factors, limiting the number of channels used per connection is highly recommended. As a guideline, most applications can use a single digit number of channels per connection. Those with particularly high concurrency rates (usually such applications are consumers) can start with one channel per thread/process/coroutine and switch to channel pooling when metrics suggest that the original model is no longer sustainable, e.g. because it consumes too much memory.
We are trying to reduce the number of consumers to reduce RabbitMQ memory usage.
--
I tested again with consecutiveIdleTrigger set to 1 and receiveTimeout set to 5; however, the number of consumers oscillates even with a constant workload. In fact, I see some channel churn occurring, which could impact RabbitMQ's performance as we currently have thousands of channels. Maybe increasing stopConsumerMinInterval could help reduce this channel churn.
The number of open channels can be limited by the CachingConnectionFactory, whose channel cache size is 25 by default.
While with some workloads this is a natural state of the system, long lived channels should be used instead when possible.
And that's exactly what is going on with CachingConnectionFactory: it keeps only that number of channels per connection:
/**
 * Sets the channel checkout timeout. When greater than 0, enables channel limiting
 * in that the {@link #channelCacheSize} becomes the total number of available channels per
 * connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
 * does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
 * new connection to be created with the new limit.
 * <p>
 * Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
 * @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
 * @since 1.4.2
 * @see #setConnectionLimit(int)
 */
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
I'm sorry to say that Gary Russell is no longer with our team, and I'm not sure (yet) what exactly you would like to see as a fix for your requirement. I'm open to any suggestions, but for now I don't see a way to make it smart enough to scale down when the load is lower.
Maybe this part can be improved somehow with an else:
if (isActive(this.consumer)) {
    this.consecutiveIdles = 0;
    if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
        considerAddingAConsumer();
        this.consecutiveMessages = 0;
    }
}
I mean, if consecutiveMessages is bigger than consecutiveActiveTrigger, we add a consumer; otherwise we call considerStoppingAConsumer().
WDYT?
Yannick opened AMQP-745 and commented
This is somehow related to #2259
Unless we set 'consecutiveIdleTrigger' to 1 and a ridiculously low receiveTimeout, it's very hard to have consumers removed from a queue with a reasonable amount of activity, given the current SimpleMessageListenerContainer implementation.
For instance, I have a queue with 100 msg/s and a consumer handling one message per millisecond. This means that one consumer should easily be able to sustain the load all by itself. Now, even if I set 'consecutiveIdleTrigger' to 1, I could theoretically have 100 consumers and none of them would be removed. That makes it 1000 if I were to use the default 'consecutiveIdleTrigger' value of 10.
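The numbers above can be reproduced with a back-of-the-envelope calculation. This is only a sketch under stated assumptions: the broker spreads messages evenly across consumers, the idle window is receiveTimeout * batchSize * consecutiveIdleTrigger, receiveTimeout is 1000 ms, and batchSize is 1. KeptAliveEstimate is a made-up name.

```java
/** Back-of-the-envelope estimate; not part of Spring AMQP. */
final class KeptAliveEstimate {

    /**
     * Largest number of consumers that each still receive at least one message
     * per idle window, assuming messages are spread evenly across consumers.
     */
    static long maxConsumersKeptAlive(double messagesPerSecond, long receiveTimeoutMillis,
            int batchSize, int consecutiveIdleTrigger) {

        double windowSeconds = receiveTimeoutMillis * (double) batchSize * consecutiveIdleTrigger / 1000.0;
        return (long) Math.floor(messagesPerSecond * windowSeconds);
    }

    public static void main(String[] args) {
        System.out.println(maxConsumersKeptAlive(100, 1000, 1, 1));  // 100
        System.out.println(maxConsumersKeptAlive(100, 1000, 1, 10)); // 1000
    }
}
```

So at 100 msg/s, any consumer count up to that bound keeps every consumer just busy enough to never hit the idle trigger, even though a single consumer could handle the whole load.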
#2259 sounds like a very good way to solve the problem, but in the shorter term I think having a base class with template and/or overridable methods would do the trick, letting us hook in and inject a more aggressive scale-down strategy. I can contribute something; I just don't know which solution is more likely to be merged upstream. Please advise.
Affects: 1.6.8
Reference URL: #2259