mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License
589 stars, 207 forks

BufferedProducer loses messages if a consumer is running at the same time. #262

Open jlcordeiro opened 4 years ago

jlcordeiro commented 4 years ago

I've been struggling with an issue for a few weeks now and slowly I've been able to trim it down to a handful of lines of code. Whenever I use a BufferedProducer that produces events as a result of a consumer poll, events get lost quite often.

The producer reports the events as sent, no callbacks are triggered for failures, but the events never get delivered to any consumer. The sample below does it with async calls (and a separate thread doing a blocking flush) but I've been able to reproduce the problem even with sync_produce.

Reproduced on both 0.3.1 and the latest version, on several Linux distros, with the latest librdkafka.

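#include <chrono>   // std::chrono::milliseconds
#include <iostream>
#include <memory>
#include <string>   // std::string
#include <thread>   // std::thread, std::this_thread::sleep_for
#include <cppkafka/configuration.h>
#include <cppkafka/consumer.h>
#include <cppkafka/utils/buffered_producer.h>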

const std::string broker_list = "localhost:9092";
const std::string ev_out = R"ev({"catch_me": "if you can"})ev";

const std::string topic_in  = "topic_in";
const std::string topic_out = "topic_out";

cppkafka::Configuration
consumer_configuration()
{
    cppkafka::Configuration config      = {
        {"metadata.broker.list", broker_list},
        {"group.id", "local-test-consumer"},
        {"enable.auto.commit", true},
        {"api.version.request", true},
    };

    cppkafka::TopicConfiguration topic_config = {
        {"auto.offset.reset", "latest"},
        {"request.required.acks", "all"},
    };
    config.set_default_topic_configuration(topic_config);
    return config;
}

cppkafka::Configuration
producer_configuration()
{
    return {
        {"metadata.broker.list", broker_list},
        {"api.version.request", true},
        {"queue.buffering.max.ms", 100}};
}

int
main()
{
    cppkafka::Consumer       consumer(consumer_configuration());
    consumer.subscribe({topic_in});

    cppkafka::BufferedProducer< std::string > producer{producer_configuration()};
    producer.set_max_number_retries(10);
    producer.set_produce_failure_callback([](const cppkafka::Message& msg) {
        std::cout << "Failed to write " << msg.get_payload() << std::endl;
        return true; // retry
    });

    std::thread([&]() {
        while(true)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            producer.flush();
        }
    }).detach();

    size_t sent_events_count = 0;
    while(1)
    {
        cppkafka::Message msg = consumer.poll();
        if(!msg || msg.get_error()) continue;

        const std::chrono::milliseconds timestamp{15};
        cppkafka::MessageBuilder        msg_builder{topic_out};

        for (int i = 0; i < 5; ++i)
        {
            msg_builder.payload(ev_out);
            msg_builder.timestamp(timestamp);
            producer.produce(msg_builder);
            ++sent_events_count;
        }

        std::cout << "Sent: " << sent_events_count << std::endl;
    }

    return 0;
}

pushkal-ag commented 3 years ago

@jlcordeiro @mfontanini Were you able to solve the issue?

jlcordeiro commented 3 years ago

Unfortunately not, and I wasn't able to get any closer to the root of the issue. My next idea was to try to replicate it directly on librdkafka, but I haven't had the time: I had something to release shortly after posting this and ended up quickly using a Node producer I had lying around.

Will definitely come back to it in a few weeks in the name of stubbornness if nothing else. Of course if someone else can at least reproduce it would be awesome to find out about it.

pushkal-ag commented 3 years ago

I tested the cppkafka buffered producer with the librdkafka complex consumer, and also with a Kafka-Python consumer. Both seem to receive all the messages.

So I narrowed the issue down to the cppkafka consumer: either some configuration is missing or there might be a bug in the consumer.

@mfontanini any idea would be helpful.

accelerated commented 3 years ago

@jlcordeiro have you registered delivery callbacks, as well as all the other callbacks the BufferedProducer accepts? If not, can you try that? It may give you more hints about what's happening.

jlcordeiro commented 3 years ago

@pushkal-ag does that mean that you too were able to reproduce this?

@accelerated - yes I have. I get delivery acks on all events and no failure / error callbacks (both with and without retries).

I will be running a few tests later today and will attach new replication steps if I stumble upon anything useful.
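
When every event is acked but some never reach a consumer, it helps to know exactly which ones went missing. The following is a minimal sketch of one way to do that, not something taken from the sample above: each payload is prefixed with a sequence number before it is produced, and whatever reads topic_out records the numbers it sees. `tag_payload` and `SequenceTracker` are hypothetical helpers, not part of cppkafka or librdkafka, and the `<seq>|<payload>` framing is an assumption made for illustration only.

```cpp
#include <cstdint>
#include <exception>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper (not part of cppkafka): prefix a payload with "<seq>|".
std::string tag_payload(std::uint64_t seq, const std::string& payload) {
    return std::to_string(seq) + "|" + payload;
}

// Hypothetical helper: remembers which sequence numbers a consumer has seen.
class SequenceTracker {
public:
    // Parses the "<seq>|" prefix of a received payload.
    // Returns false if the payload does not carry a parsable tag.
    bool record(const std::string& tagged) {
        const auto sep = tagged.find('|');
        if (sep == std::string::npos || sep == 0) {
            return false;
        }
        try {
            seen_.insert(std::stoull(tagged.substr(0, sep)));
        } catch (const std::exception&) {
            return false;
        }
        return true;
    }

    // Sequence numbers in [0, produced) that were never recorded.
    std::vector<std::uint64_t> missing(std::uint64_t produced) const {
        std::vector<std::uint64_t> out;
        for (std::uint64_t s = 0; s < produced; ++s) {
            if (seen_.count(s) == 0) {
                out.push_back(s);
            }
        }
        return out;
    }

private:
    std::set<std::uint64_t> seen_;
};
```

In the sample, the producing loop would pass `tag_payload(sent_events_count, ev_out)` to `msg_builder.payload(...)`, and a separate consumer of topic_out would call `record()` on each payload it receives. Comparing `missing(sent_events_count)` against the delivery acks would show whether the gaps are random or clustered, for example around rebalances or reconnects.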

pushkal-ag commented 3 years ago

@jlcordeiro Yes, I'm facing the same issue.

accelerated commented 3 years ago

Hi, have you figured out the issue? At a quick glance, the code above seems to consume from topic_in and produce to topic_out. Is this intended, or did you just want to loop back your messages?

jlcordeiro commented 3 years ago

Unfortunately (?) I stopped being able to reproduce the issue. Same exact sample code, built in exactly the same way, has stopped missing events.

Thought about closing the issue but because I wasn't the only one reproducing it I left it open.

accelerated commented 3 years ago

Did anything change? It's very weird. We use BufferedProducer in production and have no issues at all. Same for consumers. You can also try CoroKafka and see if you can reproduce this. It wraps cppkafka, so your code will look very similar.

pushkal-ag commented 3 years ago

It got resolved for me. The issue was not related to kafka but something else in our pipeline. Thanks.

jlcordeiro commented 3 years ago

@accelerated it could be that something changed on the broker, the network, or something else out of my control. Hard to say. I still have the same code in production though, and if the problem shows up again I will retest with both cppkafka and CoroKafka.

It's been weeks since I last saw it, though. I've even had tests running for days against local brokers, but of course that doesn't fully reflect the setup where I saw the issue: consuming and producing against a production broker over a VPN.