
Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

DefaultMessageListenerContainer should be able to scale down using default config #32260

Closed · apinske closed this issue 8 months ago

apinske commented 9 months ago

Affects: 5.3.31

  1. Why are there two different code paths for polling in the first place?
  2. Can you add the ability to scale down in the executeOngoingLoop case as well?

Find a sample here: https://github.com/apinske/playground-mq/tree/scaling. See also https://github.com/spring-projects/spring-boot/issues/39533

jhoeller commented 8 months ago

The executeOngoingLoop code path is primarily there for the internal SimpleAsyncTaskExecutor which starts a new Thread for every (re-)scheduling attempt, reusing every Thread for as long as possible. Instead of frequent scaling up and down, the listener container just scales up as far as necessary, holding on to every Thread once created. As you noticed, scaling down only comes into effect once maxMessagesPerTask and/or idleReceivesPerTaskLimit have been set, with a concrete user-provided suggestion for when to consider a thread as idle enough to get rid of it. Otherwise, it is unclear what a good default for such a setting would be, and even which of the two settings would provide appropriate default behavior in the given load scenario against the given message broker.
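
For illustration, a minimal sketch of the opt-in described above, assuming Spring Framework 6.x (jakarta.jms); the destination name, concurrency range, and limit value are made up, not taken from this thread:

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
class ListenerConfig {

    @Bean
    DefaultMessageListenerContainer ordersContainer(ConnectionFactory cf) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(cf);
        dmlc.setDestinationName("orders");                              // hypothetical queue
        dmlc.setMessageListener((MessageListener) msg -> { /* process */ });
        dmlc.setConcurrency("1-8");                                     // min-max consumers
        // Without an explicit hint like this, the internal SimpleAsyncTaskExecutor
        // path only scales up and keeps every consumer thread alive; setting
        // idleReceivesPerTaskLimit (and/or maxMessagesPerTask) opts into scale-down.
        dmlc.setIdleReceivesPerTaskLimit(5);                            // release a consumer after 5 idle receives
        return dmlc;
    }
}
```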

Configured with an external ThreadPoolTaskExecutor, the default mode of operation is different: An effective default of maxMessagesPerTask=10 applies, frequently delegating back to the executor for rescheduling each consumer. This allows for fairer scheduling in the thread pool overall, giving other tasks a chance to get executed in between. And with a min-max configuration for concurrency, this will also lead to frequent scaling up and down. Ironically, the resulting behavior can be too dynamic for some scenarios, requiring a fine-tuning of the DMLC configuration for less active scheduling, e.g. through a higher maxMessagesPerTask value in case of quickly processed messages and through an idleReceivesPerTaskLimit setting for rescheduling idle threads specifically.
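
As a sketch of that external-pool variant and the fine-tuning mentioned above; the pool sizes, thread name prefix, and limit values are illustrative assumptions, not recommendations from this thread:

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
class PooledListenerConfig {

    @Bean
    ThreadPoolTaskExecutor jmsListenerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("orders-listener-");
        return executor;
    }

    @Bean
    DefaultMessageListenerContainer ordersContainer(ConnectionFactory cf, ThreadPoolTaskExecutor jmsListenerExecutor) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(cf);
        dmlc.setDestinationName("orders");                     // hypothetical queue
        dmlc.setMessageListener((MessageListener) msg -> { /* process */ });
        dmlc.setTaskExecutor(jmsListenerExecutor);
        dmlc.setConcurrency("2-8");
        // With an external executor the effective default is maxMessagesPerTask=10;
        // raising it reduces rescheduling churn for quickly processed messages,
        // while idleReceivesPerTaskLimit targets idle consumers specifically.
        dmlc.setMaxMessagesPerTask(500);
        dmlc.setIdleReceivesPerTaskLimit(10);
        return dmlc;
    }
}
```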

We may revisit this in 6.2 - however, from where we stand right now, it would primarily be documentation and guidance.

apinske commented 8 months ago

Thank you very much for clarifying! I hadn't considered the implications for thread management. We settled on setting idleReceivesPerTaskLimit to 2. With a receiveTimeout of 30s, that means that after 1 minute of no new messages we scale down and "lose" the threads. In conjunction with a JMS connection pool, that leads to the least amount of traffic, and therefore work, on the broker side. That works for us, but you are right, there probably is no good default.
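
For concreteness, here is roughly what that configuration looks like in code; only the 30s receive timeout and idleReceivesPerTaskLimit of 2 come from the description above, the destination, concurrency range, and pooled ConnectionFactory wiring are my own assumptions:

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageListener;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

class IdleScaleDownExample {

    // With a 30s receive timeout and idleReceivesPerTaskLimit=2, a consumer that
    // sees no message for two consecutive receives (about 1 minute) is scaled down.
    static DefaultMessageListenerContainer container(ConnectionFactory pooledCf) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(pooledCf);          // e.g. a pooled JMS ConnectionFactory
        dmlc.setDestinationName("orders");            // hypothetical queue
        dmlc.setMessageListener((MessageListener) msg -> { /* process */ });
        dmlc.setConcurrency("1-8");                   // illustrative min-max concurrency
        dmlc.setReceiveTimeout(30_000);               // 30 seconds per receive attempt
        dmlc.setIdleReceivesPerTaskLimit(2);          // two idle receives in a row ≈ 1 minute idle
        return dmlc;
    }
}
```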

Maybe another solution would be to configure a ThreadPoolTaskExecutor and propagate maxConcurrentConsumers to maxPoolSize. I like the fact that the threads are identifiable by the prefix (equal to the listener id), which could be preserved with that approach too. Would you consider that as a framework default instead of the SimpleAsyncTaskExecutor? That should get rid of the separate code path altogether, since it is only concerned with thread creation. A hand-rolled sketch of the idea follows below.
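
The framework does not do that propagation today, but written out by hand the idea could look like this hypothetical helper (the listener id parameter and the sizing are assumptions for illustration):

```java
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

class DerivedPoolExample {

    // Hypothetical sketch of the proposal above: size an external pool from the
    // container's own consumer bounds and keep the listener id as thread name prefix.
    static void useDerivedPool(DefaultMessageListenerContainer dmlc, String listenerId) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(dmlc.getConcurrentConsumers());      // core consumers
        executor.setMaxPoolSize(dmlc.getMaxConcurrentConsumers());    // maxConcurrentConsumers -> maxPoolSize
        executor.setThreadNamePrefix(listenerId + "-");               // threads stay identifiable
        executor.initialize();
        dmlc.setTaskExecutor(executor);
    }
}
```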

Alternatively, would you consider amending the executeOngoingLoop code path with a scale-down, in the sense that the thread is kept alive but no more polling is executed (similar to the existing inactive wait state)?

I still feel that constant polling (massively parallel on an empty queue) in the interest of saving on later thread creation is a bad trade-off by default.

jhoeller commented 8 months ago

On review along with #32252, there is actually an argument for scaling down in the default setup, in particular with virtual threads. Since thread creation is cheap there, this would even work with a default SimpleAsyncTaskExecutor (#32252 simply uses an internal SimpleAsyncTaskExecutor with setVirtualThreads(true)). We just need to settle on some basic idle management settings.
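
For reference, the internal setup mentioned above amounts to roughly the following when written out by hand (the thread name prefix is an illustrative assumption):

```java
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

class VirtualThreadExecutorExample {

    // Roughly what #32252 does internally, sketched as an external executor:
    // per-task threads become cheap virtual threads, so scaling consumers down
    // and re-creating them later is no longer a costly operation.
    static void useVirtualThreads(DefaultMessageListenerContainer dmlc) {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("jms-consumer-");
        executor.setVirtualThreads(true);   // requires Java 21+ and Spring Framework 6.1+
        dmlc.setTaskExecutor(executor);
    }
}
```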

The maxMessagesPerTask setting is not really relevant here since it primarily exists for fair scheduling in a shared thread pool, rescheduling even the busy consumer tasks rather frequently to give other tasks a chance to execute in between. With internal thread management in the listener container, this does not serve any real purpose since the threads are exclusively dedicated to message processing anyway. The base listener invokers at the core concurrency level can stay around for the lifetime of the application even with dynamic scaling, and any surplus invokers up to the maximum concurrency level just need to be scaled down if they are idle. With the default maximum concurrency of 1 and any fixed concurrency setting, the behavior will remain the same. Just in case of a different core versus maximum concurrency level, we'd apply some basic dynamic scaling along the lines above.

So it looks like we'll actually revisit default scaling there in 6.2.

jhoeller commented 8 months ago

I've revised idleReceivesPerTaskLimit to apply to surplus consumers on a default/simple executor. Core consumers up until concurrentConsumers remain scheduled for any number of receives (since they would get rescheduled immediately anyway), whereas for surplus consumers up until maxConcurrentConsumers, the specified limit enforces a scale-down of those tasks.

The revised idleReceivesPerTaskLimit setting now defaults to 10 on a default/simple executor, for dynamic scaling of surplus consumers in case of a custom maxConcurrentConsumers value higher than concurrentConsumers: with the default receive timeout of 1 second, this means that each surplus task will be scaled down after 10 seconds of idle receives. With an externally specified thread pool, our traditionally inferred maxMessagesPerTask value of 10 still applies, leading to dynamic scaling of all consumers according to maxMessagesPerTask as well as a custom idleReceivesPerTaskLimit.
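
To make the timing concrete, a sketch of a container that simply relies on the revised 6.2 defaults; the destination and the 2-8 concurrency range are illustrative:

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageListener;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

class RevisedDefaultsExample {

    // With the default executor, the default 1s receiveTimeout, and the new
    // idleReceivesPerTaskLimit default of 10, the surplus consumers beyond the
    // 2 core ones are scaled down after about 10 seconds of consecutive idle
    // receives, without any explicit limit being configured here.
    static DefaultMessageListenerContainer container(ConnectionFactory cf) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(cf);
        dmlc.setDestinationName("orders");                      // hypothetical queue
        dmlc.setMessageListener((MessageListener) msg -> { /* process */ });
        dmlc.setConcurrency("2-8");                             // 2 core consumers, up to 8 under load
        return dmlc;
    }
}
```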

So all in all, most scenarios remain the same, with no change in defaults or semantics: that's the case for an externally specified thread pool with any concurrency boundaries, as well as for the default executor with a default concurrency level of 1. Only in case of dynamic concurrency boundaries (maxConcurrentConsumers higher than concurrentConsumers) for the default executor (or a similarly non-pooling SimpleAsyncTaskExecutor specified externally) does the new idleReceivesPerTaskLimit default of 10 take effect. This goes together nicely with the new virtualThreads flag on DefaultMessageListenerContainer in 6.2.