morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0

About the Modern C++ Kafka API

Lifecycle: Active

The modern-cpp-kafka API is a C++ wrapper layer built on librdkafka (its C interface only), preserving its high quality while offering a friendlier, more idiomatic interface for users.

KAFKA is a registered trademark of The Apache Software Foundation and
has been licensed for use by modern-cpp-kafka. modern-cpp-kafka has no
affiliation with and is not endorsed by The Apache Software Foundation.

Why it's here

librdkafka is a robust, high-performance C/C++ library that is widely used and well maintained.

Unfortunately, in order to maintain C++98 compatibility, librdkafka's C++ interface is neither very object-oriented nor user-friendly.

Since C++ is evolving quickly, we wanted to take advantage of new C++ features to make life easier for developers, and this led us to create a new C++ API for Kafka clients.

Eventually, we arrived at modern-cpp-kafka: a header-only library that uses idiomatic C++ features to provide a safe, efficient, and easy-to-use way of producing and consuming Kafka messages.

Features

Installation / Requirements

User Manual

Properties

kafka::Properties Class Reference

Examples

1. Construct with a list of key/value pairs:

    std::string brokers = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092";

    kafka::Properties props ({
        {"bootstrap.servers",  {brokers}},
        {"enable.idempotence", {"true"}},
    });

2. Or build it up incrementally with put():

    kafka::Properties props;
    props.put("bootstrap.servers", brokers);
    props.put("enable.idempotence", "true");

KafkaProducer

kafka::clients::producer::KafkaProducer Class Reference

A Simple Example

Here's a very simple example showing how to send a message with a KafkaProducer.

#include <kafka/KafkaProducer.h>

#include <cstdlib>
#include <iostream>
#include <string>

int main()
{
    using namespace kafka;
    using namespace kafka::clients::producer;

    // E.g. KAFKA_BROKER_LIST: "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"
    const std::string brokers = getenv("KAFKA_BROKER_LIST"); // NOLINT
    const Topic topic = getenv("TOPIC_FOR_TEST");            // NOLINT

    // Prepare the configuration
    const Properties props({{"bootstrap.servers", brokers}});

    // Create a producer
    KafkaProducer producer(props);

    // Prepare a message
    std::cout << "Type message value and hit enter to produce message..." << std::endl;
    std::string line;
    std::getline(std::cin, line);

    ProducerRecord record(topic, NullKey, Value(line.c_str(), line.size()));

    // Prepare delivery callback
    auto deliveryCb = [](const RecordMetadata& metadata, const Error& error) {
        if (!error) {
            std::cout << "Message delivered: " << metadata.toString() << std::endl;
        } else {
            std::cerr << "Message failed to be delivered: " << error.message() << std::endl;
        }
    };

    // Send a message
    producer.send(record, deliveryCb);

    // Close the producer explicitly (or don't, since RAII will take care of it)
    producer.close();
}

Notes

The Lifecycle of the Message

The message for a KafkaProducer is called a ProducerRecord; it contains the Topic, an optional Partition, a Key, and a Value. Both Key and Value are const_buffers, and since the Value is not deep-copied, the user must keep the memory block for the Value valid until the delivery callback has been executed.

In the previous example, we don't need to worry about the lifecycle of the Value: the content of line remains available until the producer is closed, and all message delivery callbacks are triggered before the close finishes.

Example for shared_ptr

A trick is to capture the shared pointer (owning the memory block of the Value) in the message delivery callback.

    std::cout << "Type message value and hit enter to produce message... (empty line to quit)" << std::endl;

    // Get input lines and forward them to Kafka
    for (auto line = std::make_shared<std::string>();
         std::getline(std::cin, *line);
         line = std::make_shared<std::string>()) {

        // Empty line to quit
        if (line->empty()) break;

        // Prepare a message
        ProducerRecord record(topic, NullKey, Value(line->c_str(), line->size()));

        // Prepare delivery callback
        // Note: Here we capture the shared pointer of `line`, which holds the content for `record.value()`
        auto deliveryCb = [line](const RecordMetadata& metadata, const Error& error) {
            if (!error) {
                std::cout << "Message delivered: " << metadata.toString() << std::endl;
            } else {
                std::cerr << "Message failed to be delivered: " << error.message() << std::endl;
            }
        };

        // Send the message
        producer.send(record, deliveryCb);
    }

Example for deep-copy

The option KafkaProducer::SendOption::ToCopyRecordValue can be passed to producer.send(...); the memory block of record.value() is then copied into the internal sending buffer, so the caller no longer needs to keep it alive.

    std::cout << "Type message value and hit enter to produce message... (empty line to quit)" << std::endl;

    // Get input lines and forward them to Kafka
    for (std::string line; std::getline(std::cin, line); ) {

        // Empty line to quit
        if (line.empty()) break;

        // Prepare a message
        ProducerRecord record(topic, NullKey, Value(line.c_str(), line.size()));

        // Prepare delivery callback
        auto deliveryCb = [](const RecordMetadata& metadata, const Error& error) {
            if (!error) {
                std::cout << "Message delivered: " << metadata.toString() << std::endl;
            } else {
                std::cerr << "Message failed to be delivered: " << error.message() << std::endl;
            }
        };

        // Send the message (deep-copy the payload)
        producer.send(record, deliveryCb, KafkaProducer::SendOption::ToCopyRecordValue);
    }

Embed More Info in a ProducerRecord

Besides the payload (i.e. value()), a ProducerRecord can also carry extra information in its key() and headers().

Headers is a vector of Header, where each Header contains a kafka::Header::Key (i.e. a std::string) and a kafka::Header::Value (i.e. a const_buffer).

Example

    const kafka::Topic     topic     = "someTopic";
    const kafka::Partition partition = 0;

    const std::string key       = "some key";
    const std::string value     = "some payload";

    const std::string category  = "categoryA";
    const std::size_t sessionId = 1;

    {
        kafka::clients::producer::ProducerRecord record(topic,
                                                        partition,
                                                        kafka::Key{key.c_str(), key.size()},
                                                        kafka::Value{value.c_str(), value.size()});

        record.headers() = {{
            kafka::Header{kafka::Header::Key{"Category"},  kafka::Header::Value{category.c_str(), category.size()}},
            kafka::Header{kafka::Header::Key{"SessionId"}, kafka::Header::Value{&sessionId, sizeof(sessionId)}}
        }};

        std::cout << "ProducerRecord: " << record.toString() << std::endl;
    }
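
On the consuming side, the headers travel with the record. Here's a minimal sketch for looking one up, assuming (as the aggregate-style construction above suggests) that Header exposes its key and value members, and given a consumed record:

    // Hypothetical lookup of the "Category" header on a consumed `record`
    for (const auto& header: record.headers()) {
        if (header.key == "Category") {
            std::cout << "Category: " << header.value.toString() << std::endl;
        }
    }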

About enable.manual.events.poll

By default, a KafkaProducer is constructed with the enable.manual.events.poll=false configuration. In that case, a background thread is created, which keeps polling events (and thus calling the message delivery callbacks).

Alternatively, with enable.manual.events.poll=true, the message delivery callbacks are only invoked within the member function pollEvents().

Example

    // Prepare the configuration (with "enable.manual.events.poll=true")
    const Properties props({{"bootstrap.servers",         {brokers}},
                            {"enable.manual.events.poll", {"true" }}});

    // Create a producer
    KafkaProducer producer(props);

    std::cout << "Type message value and hit enter to produce message... (empty line to finish)" << std::endl;

    // Get all input lines
    std::list<std::shared_ptr<std::string>> messages;
    for (auto line = std::make_shared<std::string>();
         std::getline(std::cin, *line) && !line->empty();
         line = std::make_shared<std::string>()) {
        messages.emplace_back(line);
    }

    while (!messages.empty()) {
        // Pop out a message to be sent
        auto payload = messages.front();
        messages.pop_front();

        // Prepare the message
        ProducerRecord record(topic, NullKey, Value(payload->c_str(), payload->size()));

        // Prepare the delivery callback
        // Note: if delivery fails, the message is pushed back into the sending queue and retried later
        auto deliveryCb = [payload, &messages](const RecordMetadata& metadata, const Error& error) {
            if (!error) {
                std::cout << "Message delivered: " << metadata.toString() << std::endl;
            } else {
                std::cerr << "Message failed to be delivered: " << error.message() << ", will be retried later" << std::endl;
                messages.emplace_back(payload);
            }
        };

        // Send the message
        producer.send(record, deliveryCb);

        // Poll events (e.g. message delivery callback)
        producer.pollEvents(std::chrono::milliseconds(0));
    }
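
Note that the loop above exits as soon as the local queue is empty, even if some delivery callbacks are still outstanding. Since all delivery callbacks are triggered before the producer finishes closing (see the lifecycle notes above), draining them is simply a matter of closing:

    // close() flushes in-flight messages and fires their delivery
    // callbacks before returning
    producer.close();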

Error Handling

A kafka::Error might surface at different points while sending a message: the send(...) call can throw a kafka::KafkaException if the producer fails to initiate the send operation, while per-message delivery failures are reported asynchronously through the delivery callback.
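
A minimal sketch of handling both kinds, reusing the record and deliveryCb from the examples above:

    try {
        // Per-message delivery results arrive asynchronously in `deliveryCb`,
        // as the kafka::Error passed to the callback
        producer.send(record, deliveryCb);
    } catch (const kafka::KafkaException& e) {
        // Thrown if the producer fails to initiate the send operation
        std::cerr << "Failed to trigger send: " << e.what() << std::endl;
    }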

Idempotent Producer

The enable.idempotence=true configuration is highly RECOMMENDED: it prevents messages from being duplicated or reordered when deliveries are retried internally.

Example

        kafka::Properties props;
        props.put("bootstrap.servers", brokers);
        props.put("enable.idempotence", "true");

        // Create an idempotent producer
        kafka::clients::producer::KafkaProducer producer(props);

KafkaConsumer

kafka::clients::consumer::KafkaConsumer Class Reference

A Simple Example

#include <kafka/KafkaConsumer.h>

#include <atomic>
#include <cstdlib>
#include <iostream>
#include <signal.h>
#include <string>

std::atomic_bool running = {true};

void stopRunning(int sig) {
    if (sig != SIGINT) return;

    if (running) {
        running = false;
    } else {
        // Ignore subsequent SIGINTs, to avoid getting stuck in this handler
        signal(SIGINT, SIG_IGN); // NOLINT
    }
}

int main()
{
    using namespace kafka;
    using namespace kafka::clients::consumer;

    // Use Ctrl-C to terminate the program
    signal(SIGINT, stopRunning);    // NOLINT

    // E.g. KAFKA_BROKER_LIST: "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"
    const std::string brokers = getenv("KAFKA_BROKER_LIST"); // NOLINT
    const Topic topic = getenv("TOPIC_FOR_TEST");            // NOLINT

    // Prepare the configuration
    const Properties props({{"bootstrap.servers", {brokers}}});

    // Create a consumer instance
    KafkaConsumer consumer(props);

    // Subscribe to topics
    consumer.subscribe({topic});

    while (running) {
        // Poll messages from Kafka brokers
        auto records = consumer.poll(std::chrono::milliseconds(100));

        for (const auto& record: records) {
            if (!record.error()) {
                std::cout << "Got a new message..." << std::endl;
                std::cout << "    Topic    : " << record.topic() << std::endl;
                std::cout << "    Partition: " << record.partition() << std::endl;
                std::cout << "    Offset   : " << record.offset() << std::endl;
                std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
                std::cout << "    Headers  : " << toString(record.headers()) << std::endl;
                std::cout << "    Key   [" << record.key().toString() << "]" << std::endl;
                std::cout << "    Value [" << record.value().toString() << "]" << std::endl;
            } else {
                std::cerr << record.toString() << std::endl;
            }
        }
    }

    // Close the consumer explicitly (or don't, since RAII will take care of it)
    consumer.close();
}

Rebalance events

A KafkaConsumer can specify a RebalanceCallback when it subscribes to topics; the callback is triggered whenever partitions are assigned or revoked.

Example

    // The consumer would read all messages from the topic and then quit.

    // Prepare the configuration
    const Properties props({{"bootstrap.servers",    {brokers}},
                            // Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event
                            // whenever the consumer reaches the end of a partition.
                            {"enable.partition.eof", {"true"}},
                            // Action to take when there is no initial offset in the offset store;
                            // "earliest" means the consumer reads from the very beginning
                            {"auto.offset.reset",    {"earliest"}}});

    // Create a consumer instance
    KafkaConsumer consumer(props);

    // Prepare the rebalance callbacks
    std::atomic<std::size_t> assignedPartitions{};
    auto rebalanceCb = [&assignedPartitions](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps) {
                           if (et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned) {
                               assignedPartitions += tps.size();
                               std::cout << "Assigned partitions: " << kafka::toString(tps) << std::endl;
                           } else {
                               assignedPartitions -= tps.size();
                               std::cout << "Revoked partitions: " << kafka::toString(tps) << std::endl;
                           }
                       };

    // Subscribe to topics with rebalance callback
    consumer.subscribe({topic}, rebalanceCb);

    TopicPartitions finishedPartitions;
    while (finishedPartitions.size() != assignedPartitions.load()) {
        // Poll messages from Kafka brokers
        auto records = consumer.poll(std::chrono::milliseconds(100));

        for (const auto& record: records) {
            if (!record.error()) {
                std::cout << record.toString() << std::endl;
            } else {
                if (record.error().value() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                    // Record the partition which has reached its end
                    finishedPartitions.emplace(record.topic(), record.partition());
                } else {
                    std::cerr << record.toString() << std::endl;
                }
            }
        }
    }

To Commit Offset Manually

Once the KafkaConsumer is configured with enable.auto.commit=false, the user has to determine the right places to call commitSync(...)/commitAsync(...).

Example

    // Prepare the configuration
    Properties props({{"bootstrap.servers", {brokers}}});
    props.put("enable.auto.commit", "false");

    // Create a consumer instance
    KafkaConsumer consumer(props);

    // Subscribe to topics
    consumer.subscribe({topic});

    while (running) {
        auto records = consumer.poll(std::chrono::milliseconds(100));

        for (const auto& record: records) {
            std::cout << record.toString() << std::endl;
        }

        if (!records.empty()) {
            consumer.commitAsync();
        }
    }

    consumer.commitSync();

    // No explicit close is needed, RAII will take care of it
    // consumer.close();
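
commitSync(...)/commitAsync(...) also accept finer-grained arguments. Here's a sketch, assuming the overloads that take a single ConsumerRecord, which commits right after each record is processed:

    for (const auto& record: records) {
        std::cout << record.toString() << std::endl;
        consumer.commitAsync(record);
    }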

Error Handling
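
As the examples above show, poll(...) returns records even when something goes wrong: each ConsumerRecord carries its own error(), which should be checked per record. A minimal sketch, with the consumer from the examples above:

    for (const auto& record: consumer.poll(std::chrono::milliseconds(100))) {
        // Record-level errors (e.g. RD_KAFKA_RESP_ERR__PARTITION_EOF) are
        // attached to the record rather than thrown
        if (record.error()) {
            std::cerr << record.toString() << std::endl;
        }
    }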

Callbacks for KafkaClient

We're free to set callbacks in Properties with a kafka::clients::ErrorCallback, kafka::clients::LogCallback, or kafka::clients::StatsCallback.

Example

    // Prepare the configuration
    Properties props({{"bootstrap.servers", {brokers}}});

    // To print out the error
    props.put("error_cb", [](const kafka::Error& error) {
                              // https://en.wikipedia.org/wiki/ANSI_escape_code
                              std::cerr << "\033[1;31m" << "[" << kafka::utility::getCurrentTime() << "] ==> Met Error: " << "\033[0m";
                              std::cerr << "\033[4;35m" << error.toString() << "\033[0m" << std::endl;
                          });

    // To enable the debug-level log
    props.put("log_level", "7");
    props.put("debug", "all");
    props.put("log_cb", [](int /*level*/, const char* /*filename*/, int /*lineno*/, const char* msg) {
                            std::cout << "[" << kafka::utility::getCurrentTime() << "]" << msg << std::endl;
                        });

    // To enable the statistics dumping
    props.put("statistics.interval.ms", "1000");
    props.put("stats_cb", [](const std::string& jsonString) {
                              std::cout << "Statistics: " << jsonString << std::endl;
                          });

Thread Model

For Developers

Build (for tests/tools/examples)

Run Tests