userver-framework / userver

Production-ready C++ Asynchronous Framework with rich functionality
https://userver.tech
Apache License 2.0

KafkaConsumer worker threads number. #714

Open melonaerial opened 1 week ago

melonaerial commented 1 week ago

Hello, guys! I've started using your new kafka::ConsumerComponent in production. I need to read 3 topics, so I've created 3 components for that. Here is their description in static_config.yaml:

        kafka-consumer-number1:
            enable_auto_commit: false
            group_id: "topic_1_group_1"
            sasl_mechanisms: "SCRAM-SHA-512"
            security_protocol: "SASL_SSL"
            ssl_ca_location: $ssl-file-path
            auto_offset_reset: "smallest"
            topics:
                - "some_topic_1"
            max_batch_size: 10

        kafka-consumer-number2:
            ...
            topics:
                - "some_topic_2"

        kafka-consumer-number3:
            ...
            topics:
                - "some_topic_3"              

        ...
        consumer-task-processor:
            worker_threads: 2
            thread_name: consumer-worker

I use a separate component for each topic because I need to stop a consumer if there is an error processing a message from its topic.

When I had 2 consumer components, everything worked fine. But when I added the 3rd one, I found that consumer lag kept growing for one of the topics. CPU usage is fine and I don't see many coroutines running. The logs just show that messages for the topics are polled only once every 5-15 minutes.

I changed consumer-task-processor.worker_threads from 2 to 4 and everything started working fine again. Does consumer-task-processor need more worker_threads than the number of kafka::ConsumerComponent components? And what is the reason for that?

fdr400 commented 1 week ago

Hello melonaerial! Thank you for your report. Could you tell us which version of userver you use?

If your version is older than v2.4 (currently the latest), that is the cause: before that release, each consumer blocked an OS thread, so, as you observed, each kafka::ConsumerComponent required its own worker thread. We now have a test that checks whether two consumers can operate with a single available thread: https://github.com/userver-framework/userver/blob/develop/kafka/tests/consumer_kafkatest.cpp#L79