mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

Cannot pull using consumer #286

Closed: alphamarket closed this issue 2 years ago

alphamarket commented 2 years ago

I am trying to create a prototype that produces and consumes messages. I can produce, but I cannot consume the produced message. Here is my code.

#include <iostream>
#include <cppkafka/cppkafka.h>

using namespace std;
using namespace cppkafka;

using namespace std::chrono_literals;

int main(int argc, char** argv) {
    // Create the config
    Configuration config = {
        {"group.id", "mygroup"},
        { "bootstrap.servers", "localhost:9092" },
    };
    const std::string
        part = (argc <= 1 ? "produce" : argv[1]),
        topic = "my_topic2";

    if(part == "produce") {
        std::cout << "Producing a message...\n" << std::flush;
        // Create the producer
        Producer producer(config);
        // Produce a message!
        string message = "hey there!";
        producer.produce(MessageBuilder(topic).payload(message));
        producer.flush();
        std::cout << "Produced message: " << message << std::endl;
    } else if(part == "consume") {
        std::cout << "Consuming messages...\n" << std::flush;
        // Create the consumer from the config defined above
        Consumer consumer(config);

        // Fetch all metadata
        Metadata metadata = consumer.get_metadata();

        cout << "Found " << metadata.get_brokers().size() << " brokers" << endl;
        cout << "Found " << metadata.get_topics().size() << " topics" << endl;
        for(auto& topic : metadata.get_topics()) {
            std::cout << "  Topic: " << topic.get_name() << endl
                      << "  Partition: " << topic.get_partitions().size() << " paritions!" << endl;
            std::cout << "  Has Error: " << (topic.get_error() ? "True" : "False") << endl
                      << "  " << std::string(40, '-') << endl;
        }

        // Subscribe to the topic
        consumer.subscribe({ topic });

        // Now loop forever polling for messages
        while (true) {
            Message msg = consumer.poll();

            // Make sure we have a message before processing it
            if (!msg) { continue; }

            std::cout << "Fetched a message\n";

            // Messages can contain error notifications rather than actual data
            if (msg.get_error()) {
                std::cerr << "[error] " << msg.get_error().to_string() << std::endl;
                // librdkafka provides an error indicating we've reached the
                // end of a partition every time we do so. Make sure it's not one
                // of those cases, as it's not really an error
                if (!msg.is_eof()) {
                    // Handle this somehow...
                }
                continue;
            }

            // We actually have a message. At this point you can check for the
            // message's key and payload via `Message::get_key` and
            // `Message::get_payload`

            cout << "Received message on partition " << msg.get_topic() << "/"
                 << msg.get_partition() << ", offset " << msg.get_offset() << endl
                 << "Message: " << msg.get_payload() << endl
                 << std::string(40, '-') << endl;
        }
    }
}
$ cmake .. && make -j4 && time ./kafka-example consume
Consuming messages...
Found 1 brokers
Found 2 topics
  Topic: my_topic2
  Partition: 1 partitions!
  Has Error: False
  ----------------------------------------
  Topic: __consumer_offsets
  Partition: 50 partitions!
  Has Error: False
  ----------------------------------------
<-- The program loops here and won't print anything -->

When I run the program to consume the produced messages, I get the output above, but the program never reads anything from consumer.poll() and just keeps looping from the if (!msg) { continue; } line.

What is wrong with the implementation?
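
One thing I am not sure about is the offset reset policy: as far as I understand, a consumer group that has never committed offsets starts at the latest offset by default, so messages produced before the group subscribes are skipped. Below is a minimal sketch of the consumer side with auto.offset.reset set to earliest and an explicit poll timeout; this is only an assumption about the cause, not something I have verified against this setup.

#include <chrono>
#include <iostream>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

int main() {
    // Same broker and group as above, plus an explicit offset reset policy.
    // "earliest" makes a group with no committed offsets read from the
    // beginning of each partition instead of only seeing new messages.
    Configuration config = {
        { "bootstrap.servers", "localhost:9092" },
        { "group.id", "mygroup" },
        { "auto.offset.reset", "earliest" },
    };

    Consumer consumer(config);
    consumer.subscribe({ "my_topic2" });

    while (true) {
        // Poll with an explicit timeout; an empty Message only means that
        // nothing arrived within that window.
        Message msg = consumer.poll(std::chrono::milliseconds(500));
        if (!msg) { continue; }
        if (msg.get_error()) {
            // Partition EOF is reported as an error but is not a failure.
            if (!msg.is_eof()) {
                std::cerr << "[error] " << msg.get_error().to_string() << std::endl;
            }
            continue;
        }
        std::cout << "Got: " << msg.get_payload() << std::endl;
    }
}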

alphamarket commented 2 years ago

I should mention that I installed Kafka on my system using Docker, with the following docker-compose.yml file.

version: "3.9"
networks:
  kafka-net:
    driver: bridge

services:
  zookeeper-server:
    image: 'bitnami/zookeeper:latest'
    networks:
      - kafka-net
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka-server1:
    image: 'bitnami/kafka:latest'
    networks:
      - kafka-net    
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-server
  kafka-server2:
    image: 'bitnami/kafka:latest'
    networks:
      - kafka-net    
    ports:
      - '9093:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-server
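
Since both brokers in this compose file advertise localhost as their listener, I am also not sure whether the consumer group ever finishes a rebalance and gets partitions assigned. Below is a hedged sketch that registers cppkafka's assignment and revocation callbacks (as shown in the cppkafka consumer example) before subscribing, just to check whether an assignment ever arrives; the broker address, group id and topic are the same as in the code above.

#include <chrono>
#include <iostream>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

int main() {
    Configuration config = {
        { "group.id", "mygroup" },
        { "bootstrap.servers", "localhost:9092" },
    };
    Consumer consumer(config);

    // Print the partitions handed to this consumer on each rebalance.
    consumer.set_assignment_callback([](const TopicPartitionList& partitions) {
        std::cout << "Assigned: " << partitions << std::endl;
    });
    // Print the partitions taken away on each rebalance.
    consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
        std::cout << "Revoked: " << partitions << std::endl;
    });

    consumer.subscribe({ "my_topic2" });

    while (true) {
        Message msg = consumer.poll(std::chrono::milliseconds(500));
        if (msg && !msg.get_error()) {
            std::cout << msg.get_payload() << std::endl;
        }
    }
}

If "Assigned: ..." never shows up, the group never completes a rebalance, which would point at broker reachability or listener configuration rather than at the consumer code itself.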