morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0

heartbeat expiration when topic is idle #220

Open bergeraccso opened 10 months ago

bergeraccso commented 10 months ago

hi,

I'm not sure whether this is a bug in the library, a mistake in my implementation, or simply intended behaviour. I'm building a containerized demo app in C++: one container produces 10 messages and then shuts down, while the other container is always online and consumes indefinitely.
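For completeness, the producer container does roughly the following. This is only a minimal sketch modelled on the library's README examples, not my actual code; the broker address and topic name are placeholders, and the exact namespaces depend on the library version:

#include <kafka/KafkaProducer.h>

#include <iostream>
#include <string>

int main() {
    using namespace kafka;
    using namespace kafka::clients::producer;

    // placeholder broker address for the demo
    const Properties props({{"bootstrap.servers", "kafka:9092"}});
    KafkaProducer producer(props);

    // produce 10 messages, then exit (the container shuts down afterwards)
    for (int i = 1; i <= 10; ++i) {
        const std::string payload = "Hello, World " + std::to_string(i);
        auto record = ProducerRecord("test", NullKey, Value(payload.c_str(), payload.size()));
        producer.syncSend(record);   // blocks until the broker acknowledges; throws on failure
        std::cout << "produced: " << payload << std::endl;
    }
    // the producer flushes and closes in its destructor
}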

Consider this consumer:

Properties consumer_properties({
    {"bootstrap.servers", EnvConfig().kafka_config.brokers()},
    {"group.id", EnvConfig().kafka_config.group_id()},
    {"auto.offset.reset", EnvConfig().kafka_config.auto_offset_reset()}
});

KConsumer::KConsumer(Topic _topic) {
    KConsumer::topic = _topic;
}

void KConsumer::subscribe(std::function<void(std::string)> handle_message){
    // Use Ctrl-C to terminate the program
    signal(SIGINT, stopRunning);    // NOLINT

    // init the consumer and subscribe to the topic
    KafkaConsumer consumer(consumer_properties);
    consumer.subscribe({topic}, NullRebalanceCallback, std::chrono::milliseconds(30000));

    // consume loop
    while (running) {
        // Poll messages from Kafka brokers
        auto records = consumer.poll(std::chrono::milliseconds(100));
        /*
            FIXME: if there are no new records for a while, the consumer gets kicked out of the consumer group due to missing heartbeats.
            Not sure whether this is a shortcoming on the framework's side or a mistake in our implementation.
            However, the consumer resumes after a rebalance, so maybe this is intended behaviour?
        */

        for (const auto& record: records) {
            if (!record.error()) {
                // TODO: log this at debug/trace level
                std::cout << "Got a new message..." << std::endl;
                std::cout << "    Topic    : " << record.topic() << std::endl;
                std::cout << "    Partition: " << record.partition() << std::endl;
                std::cout << "    Offset   : " << record.offset() << std::endl;
                std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
                std::cout << "    Headers  : " << toString(record.headers()) << std::endl;
                std::cout << "    Key   [" << record.key().toString() << "]" << std::endl;
                std::cout << "    Value [" << record.value().toString() << "]" << std::endl;
                handle_message(record.value().toString()); // invoke the callback
            } else {
                std::cerr << record.toString() << std::endl;
            }
        }
        //consumer.commitSync(); // auto.commit is enabled by default
    }
}
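For context, the consumer container drives this class roughly like so (again just a sketch; the topic name and the log text come from the container logs below, the surrounding main() is illustrative):

int main() {
    // topic name as it appears in the logs
    KConsumer consumer("test");
    // handler invoked with each consumed value
    consumer.subscribe([](std::string msg) {
        std::cout << "broadcasting message: " << msg << std::endl;
    });
    return 0;
}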

Now, what happens after starting the producer and the consumer is that, once the last message has been consumed, the consumer is kicked out of the consumer group after the heartbeat timeout interval and a rebalance is triggered.

container logs:

<...>
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:06.845688]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:06.846237]NOTICE KafkaConsumer[2d773c79-c6f4ba27] subscribed, topics[test]
<...>
kafka-plot-interface-utp-sink-connector-1 | Got a new message...
kafka-plot-interface-utp-sink-connector-1 |     Topic    : test
kafka-plot-interface-utp-sink-connector-1 |     Partition: 0
kafka-plot-interface-utp-sink-connector-1 |     Offset   : 75
kafka-plot-interface-utp-sink-connector-1 |     Timestamp: CreateTime[2023-08-22 08:43:34.972]
kafka-plot-interface-utp-sink-connector-1 |     Headers  :
kafka-plot-interface-utp-sink-connector-1 |     Key   [[null]]
kafka-plot-interface-utp-sink-connector-1 |     Value [Hello, World 10]
kafka-plot-interface-utp-sink-connector-1 | broadcasting message: Hello, World 10
kafka-plot-interface-utp-sink-connector-1 | message sent successfully

kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,793] INFO [GroupCoordinator 0]: Member ffb336d6-3f9b5c63-b26e474b-24ef-433e-84fb-a5e4e9468d26 in group utp_sink_connector has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,794] INFO [GroupCoordinator 0]: Preparing to rebalance group utp_sink_connector in state PreparingRebalance with old generation 5 (consumer_offsets-37) (reason: removing member ffb336d6-3f9b5c63-b26e474b-24ef-433e-84fb-a5e4e9468d26 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:51.851106]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[REVOKE_PARTITIONS], cooperative[disabled], topic-partitions[test-0]
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,852] INFO [GroupCoordinator 0]: Stabilized group utp_sink_connector generation 6 (consumer_offsets-37) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-kafka-1 | [2023-08-22 08:44:51,857] INFO [GroupCoordinator 0]: Assignment received from leader 2d773c79-c6f4ba27-00c8b2f2-89bf-45ab-9441-bbb2c0586bdf for group utp_sink_connector for generation 6. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka-plot-interface-utp-sink-connector-1 | [2023-08-22 08:44:51.858916]NOTICE KafkaConsumer[2d773c79-c6f4ba27] re-balance event triggered[ASSIGN_PARTITIONS], cooperative[disabled], topic-partitions[test-0]

So, TL;DR: I think consumers should send heartbeats in the background, independent of their consumption loop. Which they apparently don't?
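For what it's worth, the roughly 45 seconds between the last activity and the eviction in the logs looks close to what I believe is librdkafka's default session.timeout.ms of 45000 ms. One way to narrow this down would be to make the relevant timeouts explicit in the consumer configuration and see which one actually expires. These are standard Kafka/librdkafka consumer properties; the values below are purely illustrative, not the library's defaults:

Properties consumer_properties({
    {"bootstrap.servers",     EnvConfig().kafka_config.brokers()},
    {"group.id",              EnvConfig().kafka_config.group_id()},
    {"auto.offset.reset",     EnvConfig().kafka_config.auto_offset_reset()},
    // illustrative values for diagnosis only:
    {"session.timeout.ms",    "60000"},    // how long the broker waits for heartbeats before evicting the member
    {"heartbeat.interval.ms", "3000"},     // how often the client is expected to send heartbeats
    {"max.poll.interval.ms",  "300000"}    // maximum gap between poll() calls before the consumer leaves the group
});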