When a function name of a consumer has an uppercase letter (eg processItems) then the consumer.concurrency property is ignored for this consumer.
@Bean public Function<KStream<String, String>, KStream<String, String>> processItems(...) {....}
Cause is an error in the AbstractKafkaStreamsBinderProcessor (spring-cloud-stream-binder-kafka-streams) on line 377:
ConfigurationPropertyName.of("spring.cloud.stream.bindings." + inboundName + ".consumer").isAncestorOf(name);
In the example inboundName will be "processItems" and this results in an error because ConfigurationPropertyName.of() can only process lowercase strings. And the result of this will be that the num-stream-theard property of the stream configuration on line 399 (streamConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, concurrency); is nog set.
When a function name of a consumer has an uppercase letter (eg processItems) then the consumer.concurrency property is ignored for this consumer.
@Bean public Function<KStream<String, String>, KStream<String, String>> processItems(...) {....}
spring: cloud: stream: bindings: processItems-in-0: consumer.concurrency: 4
Cause is an error in the AbstractKafkaStreamsBinderProcessor (spring-cloud-stream-binder-kafka-streams) on line 377:
ConfigurationPropertyName.of("spring.cloud.stream.bindings." + inboundName + ".consumer").isAncestorOf(name);
In the example inboundName will be "processItems" and this results in an error because ConfigurationPropertyName.of() can only process lowercase strings. And the result of this will be that the num-stream-theard property of the stream configuration on line 399 (streamConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, concurrency);
is nog set.