mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

Parsing JSON payload with cppkafka #276

Open raghavendraramya opened 3 years ago

raghavendraramya commented 3 years ago

Hi, I am using cppkafka to run a Kafka consumer that connects to a Kafka server. The payload I receive is in JSON format, so I want to convert it to a JSON object and then parse it. However, when I add the code that creates the object, I get the following error:

terminate called after throwing an instance of 'cppkafka::Exception'
  what(): Failed to create consumer handle: Failed to create thread: Success (0)
Aborted (core dumped)

If I comment out the JSON object creation, the code runs fine and I can read messages from the Kafka server. I am confused as to why creating a JSON object has any bearing on this error. The code I am using is attached. With lines 20 and 21 commented out, the consumer connects to the Kafka server and reads messages; when I uncomment them, I get the error above.

Main.txt.txt

Thank you in advance for any help!

jlcordeiro commented 3 years ago

Tested it with a random JSON parser I found on GitHub and it works fine. I bet the problem is in your json.hpp file (which you didn't attach), so I'd look there.

If you need further help, just attach that file here as well and I can have a look.

raghavendraramya commented 3 years ago

Thank you for getting back to me on this. Which parser did you use? Could you kindly send me a link?

I am using the nlohmann JSON parser. Please see the attached json.hpp.

json_hpp.txt

jlcordeiro commented 3 years ago

Yeah, you're using the library wrong. I'm surprised it even compiles for you, so I'd suggest looking at your build process to see if something funky is going on.

Either way, please see below a couple of changes that made your code work for me, printing the received events as JSON.

diff bin/Main.txt.txt bin/main.cpp 
20,21c20,21
<   //json jsonObj;
<   //std::stringstream currentMessage >> jsonObj;
---
>   nlohmann::json jsonObj = nlohmann::json::parse(currentMessage);
>   std::cout << jsonObj.dump(4) << std::endl;
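
For context on why the commented-out line in the diff could not work: `std::stringstream currentMessage >> jsonObj;` tries to declare a stream and extract from it in one statement, which is not valid C++. Below is a minimal sketch of the usual two-step idiom, using only the standard library. `first_token` is a hypothetical helper made up for illustration, and it extracts a plain `std::string` rather than a JSON object so that it builds without json.hpp.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper, for illustration only: wrap an existing string in a
// stream first, then extract from that stream in a separate statement.
std::string first_token(const std::string& currentMessage) {
    std::istringstream stream(currentMessage);  // step 1: construct the stream
    std::string token;
    stream >> token;                            // step 2: extract (whitespace-delimited)
    return token;                               // stays empty if nothing was extracted
}
```

With nlohmann's json.hpp the same shape applies (`stream >> jsonObj;`), although calling `nlohmann::json::parse(currentMessage)` as in the diff is more direct when the message is already a string.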

And here's a full example where I removed unnecessary lines for brevity's sake:

#include <iostream>
#include <string>
#include "json.hpp"
#include "cppkafka/consumer.h"
#include "cppkafka/configuration.h"

// Parse one message payload as JSON and pretty-print it.
void ParseKafkaDetection(const std::string& currentMessage)
{
    try {
        nlohmann::json jsonObj = nlohmann::json::parse(currentMessage);
        std::cout << jsonObj.dump(4) << std::endl;
    } catch (const nlohmann::json::parse_error& e) {
        // Skip payloads that are not valid JSON instead of terminating.
        std::cerr << "Invalid JSON payload: " << e.what() << std::endl;
    }
}

int main() {
    std::string topic_name = "my-topic";

    // Auto-commit is disabled and nothing commits manually,
    // so this example never stores consumer offsets.
    cppkafka::Configuration config = {
        { "metadata.broker.list", "localhost" },
        { "group.id", "kafka-consumer-test" },
        { "enable.auto.commit", false },
        { "session.timeout.ms", 60000 }
    };

    cppkafka::Consumer consumer(config);
    consumer.subscribe({ topic_name });

    while (true) {
        cppkafka::Message msg = consumer.poll();
        if (!msg) {
            continue;  // poll timed out without a message
        }
        // The message carries an error; EOF only marks the end of a partition
        if (msg.get_error()) {
            if (!msg.is_eof()) {
                std::cerr << "Error: " << msg.get_error().to_string() << std::endl;
            }
            continue;
        }
        ParseKafkaDetection(std::string(msg.get_payload()));
    }
}
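
The example hands `std::string(msg.get_payload())` to the parser. As a rough sketch of what such a conversion has to do, assuming the payload is available as a byte pointer plus a length (cppkafka's `Buffer` exposes `get_data()` and `get_size()`), the helper below copies exactly that many bytes. `payload_to_string` is a hypothetical name, not part of cppkafka. The point is that Kafka payloads are raw bytes with an explicit size, so the length should be passed along rather than relying on a terminating NUL.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper, not a cppkafka function: copy exactly `size` bytes
// into a std::string. Payloads are raw bytes, not NUL-terminated C strings.
std::string payload_to_string(const unsigned char* data, std::size_t size) {
    if (data == nullptr || size == 0) {
        return std::string();  // empty or missing payload
    }
    return std::string(reinterpret_cast<const char*>(data), size);
}
```

Under that assumption, a call might look like `payload_to_string(buffer.get_data(), buffer.get_size())`; embedded NUL bytes are preserved because the length is given explicitly.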