mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License
600 stars 206 forks source link

round-robin consumer group example ? #252

Open alagiz opened 4 years ago

alagiz commented 4 years ago

hello!

i have the following desired situation:

i couldn't achieve that, thus creating this issue

details:

code

consumer.cpp ```cpp #include #include #include #include #include #include "cppkafka/topic_partition_list.h" #include "cppkafka/topic_partition.h" #include "cppkafka/consumer.h" #include "cppkafka/kafka_handle_base.h" #include "cppkafka/utils/roundrobin_poll_strategy.h" using std::string; using std::exception; using std::cout; using std::endl; using cppkafka::Consumer; using cppkafka::Configuration; using cppkafka::Message; using cppkafka::TopicPartitionList; using cppkafka::RoundRobinPollStrategy; int main(void) { try { string topicName = "queueing.job.test"; string groupId = "testGroup"; // create consumer config Configuration configConsumer = { {"group.id", groupId.c_str()}, {"metadata.broker.list", "kafka:9092"}, {"enable.auto.commit", false}, {"auto.offset.reset", "earliest"} }; // create consumer Consumer consumer(configConsumer); // print the assigned partitions on assignment consumer.set_assignment_callback([](const TopicPartitionList &partitions) { cout << "Got assigned: " << partitions << endl; }); // print the revoked partitions on revocation consumer.set_revocation_callback([](const TopicPartitionList &partitions) { cout << "Got revoked: " << partitions << endl; }); // subscribe to the topic consumer.assign({topicName}); // or consumer.subscribe({topicName}); cout << "Consuming messages from topic " << topicName << endl; RoundRobinPollStrategy poll_strategy(consumer); // read lines and write them into kafka while (true) { // try to consume a message Message message = poll_strategy.poll(); if (message) { // if we managed to get a message if (message.get_error()) { // ignore EOF notifications from rdkafka if (!message.is_eof()) { cout << "[+] Received error notification: " << message.get_error() << endl; } } else { // print the key (if any) if (message.get_key()) { cout << message.get_key() << " -> "; } // print the payload cout << message.get_payload() << endl; // commit the message consumer.commit(message); } } } } catch (std::exception &ex) { std::cerr << "Exception: " << ex.what() << endl; return -1; } cout << "No exceptions" << endl; return 0; } ```

logs

@accelerated could you, perhaps, help me out with that one?

alagiz commented 4 years ago

alright, i know why it happens =>

when i increased the number of partitions to be 2, it works well

docker-compose.yml file ```yaml version: '3.2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: - "zookeeper" environment: KAFKA_ADVERTISED_HOST_NAME: kafka KAFKA_ADVERTISED_PORT: "9092" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_HEAP_OPTS: -Xmx256m -Xms256m KAFKA_NUM_PARTITIONS: 2 volumes: - /var/run/docker.sock:/var/run/docker.sock ports: - "9092:9092" ```

although if i make number of partitions 3 while having 2 consumers, the following occurs

consumer-1_1  | Got assigned: [ queueing.job.test[0:#], queueing.job.test[1:#] ]
consumer-0_1  | Got assigned: [ queueing.job.test[2:#] ]

# send message
consumer-0_1  | {"userId":"jimmy","jobId":"jimmy_46d74824-27fb-4249-ab66-2dc42e1d455c","jobStep":0,"isJobDone":false}
# send message
consumer-1_1  | {"userId":"jimmy","jobId":"jimmy_83b0561a-05d0-499b-abf8-87fd1067ff8c","jobStep":0,"isJobDone":false}
# send message
consumer-1_1  | {"userId":"jimmy","jobId":"jimmy_b5816ece-34e2-4e26-a821-d2aa8e8a9d53","jobStep":0,"isJobDone":false}

i will be looking into how to manage dynamic amount of consumers to achieve round-robin message distribution.

should i adjust number of partitions when a new consumer is added/removed?