mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

librdkafka 1.0.0 and newer does not work for cppkafka consumer #230

Open olegabr opened 4 years ago

olegabr commented 4 years ago

The problem is that the consumer does not work when the latest rdkafka library version is installed. I've found that the last working rdkafka version is 0.11.6; the next one is a major release, 1.0.0.

This issue is related to another one: #183

0.11.6 version output

./main 
Message: 10 : ethercscan.morden.blocks:0:4

1.0.0 version output

# ./main 
Exception: Local: Timed out

cppkafka install

wget https://github.com/mfontanini/cppkafka/archive/v0.3.1.tar.gz \
        && tar -xzf v0.3.1.tar.gz \
        && rm v0.3.1.tar.gz \
        && cd cppkafka-0.3.1/ \
        && mkdir build \
        && cd build \
        && cmake -DCPPKAFKA_DISABLE_TESTS=ON -DCPPKAFKA_DISABLE_EXAMPLES=ON .. \
        && make \
        && sudo make install

rdkafka install

#!/bin/bash

#VERSION="0.11.6"
VERSION="1.0.0"
wget https://github.com/edenhill/librdkafka/archive/v${VERSION}.tar.gz \
    && tar -xzf v${VERSION}.tar.gz \
    && rm v${VERSION}.tar.gz \
    && cd librdkafka-${VERSION}/ \
    && ./configure \
    && make \
    && sudo make install
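
Since the whole problem hinges on which librdkafka version is actually in use, it may be worth confirming at runtime which library the binary links against (an older copy left in /usr/lib can silently shadow the freshly installed one in /usr/local/lib). A minimal check, assuming the standard librdkafka/rdkafka.h header location (this snippet is illustrative, not part of the original report):

// version_check.cpp -- build with: g++ version_check.cpp -lrdkafka
#include <librdkafka/rdkafka.h>
#include <iostream>

int main() {
    // rd_kafka_version_str() reports the librdkafka version the process is actually
    // linked against, which is what matters here, not the version that was just compiled.
    std::cout << "librdkafka version: " << rd_kafka_version_str() << std::endl;
    return 0;
}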

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(testt)
set(CMAKE_CXX_STANDARD 11)
include_directories("/usr/local/include/librdkafka")
include_directories("/usr/local/include/cppkafka")
find_library(RDKAFKA rdkafka PATHS /usr/local/lib NO_DEFAULT_PATH)
find_library(CPPKAFKA cppkafka PATHS /usr/local/lib NO_DEFAULT_PATH)
add_executable(main main.cpp)
target_link_libraries(main ${CPPKAFKA} ${RDKAFKA})

main.cpp

#include <thread>
#include <iostream>
#include <chrono>
#include "cppkafka/consumer.h"
#include "cppkafka/topic_partition.h"
#include "cppkafka/topic_partition_list.h"

int main(void) { try {
        std::string topic = "openetherscan.morden.blocks";
        std::string groupid = "groupid1";
        cppkafka::Configuration configConsumer = {
                 { "group.id",             groupid.c_str() }
                ,{ "metadata.broker.list", "127.0.0.1:9092" }
                ,{ "enable.auto.commit",   false }
                ,{ "auto.offset.reset",    "earliest" }
        };
        cppkafka::Consumer consumer(configConsumer);

        int partition_num = 0;
        cppkafka::TopicPartition initialPartition(topic, partition_num);
        // Ask the broker for the last committed offset of this partition, then resume from the next message.
        cppkafka::TopicPartitionList initialPL = consumer.get_offsets_committed(cppkafka::TopicPartitionList{initialPartition});
        cppkafka::TopicPartition tp(topic, partition_num, initialPL[0].get_offset()+1);
        cppkafka::TopicPartitionList topicPartitionList = {tp};

        // Assign the partition manually instead of subscribing to the topic.
        consumer.assign(topicPartitionList);

        while (true) {
                // poll() waits up to the consumer's configured timeout and returns an empty Message if nothing arrived.
                cppkafka::Message msg = consumer.poll();
                if (msg) {
                        if (!msg.get_error()) {
                                std::cout << "Message: " << msg.get_key() << " : " << msg.get_topic() << ":" << msg.get_partition() << ":" << msg.get_offset() << std::endl;
                                // Commit the offset of the message that was just processed.
                                topicPartitionList[0].set_offset(msg.get_offset());
                                consumer.commit(topicPartitionList);
                        } else {
                                if (!msg.is_eof()) {
                                        std::cout << "[+] Received error notification: " << msg.get_error() << std::endl;
                                }
                        }
                }
                std::this_thread::sleep_for(std::chrono::seconds(1));
        }
} catch (std::exception& ex) {
        std::cerr << "Exception: " << ex.what() << std::endl;
        return -1;
}
        std::cout << "No exceptions" << std::endl;
        return 0;
}
accelerated commented 4 years ago

We are using v1.1.0 of rdkafka and it works just fine. You may also want to consider using corokafka, which can simplify a lot of the boilerplate code for you, thus reducing errors.

olegabr commented 4 years ago

I've checked the v1.1.0 rdkafka version and it fails the same way as v1.0.0 does.

Note that the producer works OK; only the consumer fails in both cases.

accelerated commented 4 years ago

Perhaps your kafka broker version is not compatible with the newer 1.1.0?

olegabr commented 4 years ago

My kafka version:

[2019-11-28 15:28:19,456] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
olegabr commented 4 years ago

librdkafka v1.1.0: https://github.com/edenhill/librdkafka/releases/tag/v1.1.0 (released Jul 5, 2019)
Kafka 2.3.1: https://kafka.apache.org/downloads#2.3.1 (released Oct 24, 2019)

So, I assume that kafka v2.3.0 should be OK for the v1.1.0 rdkafka lib.

accelerated commented 4 years ago

https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility

Yes, it looks like it. But it may be that this page is not fully up to date. The best thing would be to ask on the librdkafka forum; cppkafka is a thin wrapper, and in your case the issue is likely librdkafka not being compatible with the latest broker. As a test, try downgrading the broker version to something like 1.0 and see. There are also librdkafka settings like batch size, timeouts, etc., which could be slightly different for 2.3.1.
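
For reference, a rough sketch of where such settings would go in the reporter's setup. The property names below are standard librdkafka configuration keys; the values are purely illustrative and not a known fix:

#include <cppkafka/configuration.h>
#include <string>

cppkafka::Configuration make_consumer_config(const std::string& group_id) {
    return {
         { "group.id",                group_id         }
        ,{ "metadata.broker.list",    "127.0.0.1:9092" }
        ,{ "enable.auto.commit",      false            }
        ,{ "auto.offset.reset",       "earliest"       }
        // Timeout knob that may matter if "Local: Timed out" is a genuine slow-broker problem.
        ,{ "socket.timeout.ms",       "60000"          }
        // For older brokers, disabling ApiVersion requests and pinning a fallback protocol version can help.
        ,{ "api.version.request",     false            }
        ,{ "broker.version.fallback", "0.10.2"         }
    };
}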

olegabr commented 4 years ago

cppkafka version: 0.3.1
rdkafka version: 1.0.0
kafka version: 1.0.2

result:

Exception: Local: Timed out

Same result for Kafka versions 0.10.2.2 and 0.9.0.1.

olegabr commented 4 years ago

I also tested the plain librdkafka example consumer against Kafka v2.3.0:

librdkafka-1.0.0/examples$ ./rdkafka_consumer_example -g groupid3 -e -A ethercscan.morden.blocks:0

The librdkafka example code used: https://github.com/edenhill/librdkafka/blob/v1.0.0/examples/rdkafka_consumer_example.c

Output: a lot of messages like:

% Message (topic ethercscan.morden.blocks [0], offset 1488, 2048 bytes):
Key: 2978
{"jsonrpc":"2.0","result":{"author":"0x9fc6fefd7f33ca29ee17f2bfec944695e5f29caf","difficulty":"0x4ce8d","extraData":"0xd783010300844765746887676f312e352e31856c696e7578","gasLimit":"0x2fefd8","gasUsed":"0x0","hash":"0x42930b0f8e15acf8e8def5386d73da37cd8b95240a1dc07abc954a70dfa82d65","logsBloom":"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000^C0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x9fc6fefd7f33ca29ee17f2bfec944695e5f29caf","mixHash":"0x97ac49bf91eaa8130883f465bd1aea2878f578695e947c16a935ec6efd7e2426","nonce":"0x4455e04f96cb6757","number":"0xbaa","parentHash":"0x25da1fb89fd07b0fe0447cd6f89da7d67c7fa23956e2df191ec22e8b095f9a1b","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":["0xa097ac49bf91eaa8130883f465bd1aea2878f578695e947c16a935ec6efd7e2426","0x884455e04f96cb6757"],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x219","stateRoot":"0x8e728116756d5a705d46c6d99abfbb0b43357a73a074eda3d373e134d8e69b32","timestamp":"0x5631b92e","totalDifficulty":"0x27c73f63","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]},"id":1494}                                                                   

So I assume that librdkafka itself works well; the problem is in the cppkafka code.

accelerated commented 4 years ago

Do you know where the exception is thrown: in the partition assignment or during polling? You can try changing the consumer group name when you test with the new version of the client. I've seen some strange behavior in the past where brokers got stuck and changing the consumer group name helped.
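
To help narrow it down, a small probe along these lines could be dropped into the test program (untested sketch; it only wraps the same calls that main.cpp above already makes):

#include <cppkafka/consumer.h>
#include <cppkafka/exceptions.h>
#include <iostream>
#include <string>

// Calls each suspect step separately so the failing one shows up by name.
void probe(cppkafka::Consumer& consumer, const std::string& topic, int partition) {
    cppkafka::TopicPartition initial(topic, partition);
    try {
        cppkafka::TopicPartitionList committed =
            consumer.get_offsets_committed(cppkafka::TopicPartitionList{initial});
        std::cout << "get_offsets_committed OK, offset = " << committed[0].get_offset() << std::endl;
    } catch (const cppkafka::HandleException& ex) {
        std::cerr << "get_offsets_committed threw: " << ex.what() << std::endl;
        throw;
    }
    try {
        consumer.assign(cppkafka::TopicPartitionList{cppkafka::TopicPartition(topic, partition)});
        std::cout << "assign OK" << std::endl;
    } catch (const cppkafka::HandleException& ex) {
        std::cerr << "assign threw: " << ex.what() << std::endl;
        throw;
    }
}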

olegabr commented 4 years ago

Looks like I've found the reason. In this example: https://github.com/mfontanini/cppkafka/blob/master/examples/consumer_example.cpp#L76 I've replaced the consumer.subscribe({ topic_name }); line with partition-aware initialization code:

        int partition = 0;
        cppkafka::TopicPartition initialPartition(topic_name, partition);
        cout << "consumer.get_offsets_committed" << endl;
        cppkafka::TopicPartitionList initialPL = consumer.get_offsets_committed(cppkafka::TopicPartitionList{initialPartition});
        cppkafka::TopicPartition tp(topic_name, partition, initialPL[0].get_offset() + 1);
        cppkafka::TopicPartitionList topicPartitionList_;
        topicPartitionList_.push_back(tp);
        cout << "consumer.assign" << endl;
        consumer.assign(topicPartitionList_);

The result is:

$ ./consumer -b 127.0.0.1:9092 -t ethercscan.morden.blocks -g groupid3
consumer.get_offsets_committed
terminate called after throwing an instance of 'cppkafka::HandleException'
  what():  Local: Timed out
Aborted (core dumped)

So, the problem is in the consumer.get_offsets_committed call.

Note that the original example with the consumer.subscribe({ topic_name }); code works just fine, and the partition-aware code works fine with older librdkafka versions.
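
If the timeout really comes from that call, one thing that might be worth trying (untested sketch, not a confirmed fix) is raising the handle-level timeout that cppkafka applies to get_offsets_committed, assuming KafkaHandleBase::set_timeout is available in the version you build:

#include <cppkafka/consumer.h>
#include <chrono>
#include <string>

cppkafka::TopicPartitionList fetch_committed(cppkafka::Consumer& consumer,
                                             const std::string& topic, int partition) {
    // get_offsets_committed() is bounded by the handle's timeout; give the newer librdkafka
    // more time to locate the group coordinator before giving up. The value is illustrative.
    consumer.set_timeout(std::chrono::milliseconds(15000));
    return consumer.get_offsets_committed(cppkafka::TopicPartitionList{
            cppkafka::TopicPartition(topic, partition)});
}

If it still times out even with a generous timeout, that would point more toward a protocol/compatibility problem between librdkafka 1.x and the broker than toward cppkafka itself.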

My topics are created with a command:

/root/kafka_2.11-2.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic ethercscan.morden.blocks
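
A broker-side sanity check that may help isolate this further (the path and group name below just mirror the ones used above) is to ask the broker what it has committed for the group:

/root/kafka_2.11-2.0.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupid3

If that command shows committed offsets but cppkafka's get_offsets_committed still times out, that would suggest the problem is on the client side rather than in the broker's offset storage.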