morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0

Segv for message that cannot be sent on kafka #225

Open persona94 opened 8 months ago

persona94 commented 8 months ago

I have a server where my producer is not authorized to send messages to Kafka. I use the async send, and the program crashes after the first send.

My code to prepare the record to send:

{
        std::unique_lock<std::mutex> lk(recordMapMutex);
        auto iter = recordMap.emplace(std::piecewise_construct,
            std::forward_as_tuple(data->messageId),
            std::forward_as_tuple(data->topic, kafka::Key{ data->key.c_str(), data->key.size() }, kafka::Value{ data->event.c_str(), data->event.size() }));
        rec = &iter.first->second;
        std::string prdIdStr = "producerId";
        rec->headers() = { { prdIdStr, kafka::Value(instanceId.c_str(), instanceId.size()) } };
}

Here, data is a class instance that is passed in as a shared_ptr. For now the shared pointer is pushed to a map and never removed, so the memory is valid even after the message is sent.
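The lifetime argument above can be illustrated without the Kafka library. This is a minimal sketch, assuming the key and value buffers are non-owning (pointer plus size), as the constructor arguments in the snippet suggest. `BufferView`, `Data` and `PayloadStore` are hypothetical names introduced only for illustration; they are not part of modern-cpp-kafka.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for a non-owning buffer: it stores a pointer and a
// size and never copies the bytes it refers to.
struct BufferView {
    const char* data;
    std::size_t size;
    std::string toString() const { return std::string(data, size); }
};

// Hypothetical payload type with the fields used in the post.
struct Data {
    std::string messageId;
    std::string topic;
    std::string key;
    std::string event;
};

// Keeps one shared_ptr per message id, so every view into a payload's
// strings stays valid for as long as the entry remains in the map.
class PayloadStore {
public:
    BufferView keep(const std::shared_ptr<Data>& d) {
        store_[d->messageId] = d;  // the map now co-owns the payload
        return BufferView{d->key.c_str(), d->key.size()};
    }
    std::size_t size() const { return store_.size(); }

private:
    std::map<std::string, std::shared_ptr<Data>> store_;
};
```

As long as the entry is never erased, the view returned by `keep` still points at live memory after the caller's own `shared_ptr` goes out of scope.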

Here is the async send:

    producer->send(
        *rec,
        [&, data, cb, rec](const RecordMetadata& metadata, const Error& error)
        {
            {
                // Disabled on purpose
                // std::unique_lock<std::mutex> lk(recordMapMutex);
                // recordMap.erase(data->messageId);
            }

            if (error)
            {
                std::cout << "send hit error, calling CB" << std::endl;
                cb(data, metadata.toString(), error.toString());
            }
            else
            {
                std::cout << "No error" << std::endl;
                cb(data, metadata.toString(), "");
            }
        });

cb is a callback function that takes a shared_ptr to the data that was passed in, plus two strings.
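For readers trying to reproduce this, the callback shape described above could look roughly like the following. This is only a sketch: `Data`, `SendCallback` and `report` are hypothetical names, and the real types in the application may differ.

```cpp
#include <functional>
#include <memory>
#include <string>

// Hypothetical payload type; only the fields referenced in the post.
struct Data {
    std::string messageId;
    std::string topic;
    std::string key;
    std::string event;
};

// Assumed callback shape: the payload, the metadata as a string, and the
// error as a string (empty when the send succeeded).
using SendCallback = std::function<void(std::shared_ptr<Data>,
                                        const std::string&,
                                        const std::string&)>;

// Mirrors the two branches of the delivery callback in the post: forward
// the error text on failure, an empty string otherwise.
void report(const SendCallback& cb,
            const std::shared_ptr<Data>& data,
            const std::string& metadata,
            const std::string& error,
            bool failed) {
    cb(data, metadata, failed ? error : std::string());
}
```

Passing the `shared_ptr` by value into the callback means the payload stays alive for the duration of the call even if every other owner releases it.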

The error I get from Kafka is:

Error Broker: Topic authorization failed [29] sending message

Some logs from Kafka:

KafkaProducer[feae661e-a7d61a65] NOINFO | [thrd:main]: Topic test-topic metadata information unknown
KafkaProducer[feae661e-a7d61a65] NOINFO | [thrd:main]: Topic test-topic partition count is zero: should refresh metadata
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: Requesting metadata for 1/1 topics: refresh unavailable topics
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Request metadata for 1 topic(s): refresh unavailable topics
KafkaProducer[feae661e-a7d61a65] SEND | [thrd:ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Sent MetadataRequest (v4, 57 bytes @ 0, CorrId 3)
KafkaProducer[feae661e-a7d61a65] RECV | [thrd:ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Received MetadataResponse (v4, 284 bytes, CorrId 3, rtt 1.55ms)
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: ===== Received metadata (for 1 requested topics): refresh unavailable topics =====
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: ClusterId: MvZ7D3v5Qx2GezBHiz26Vg, ControllerId: 0
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: 3 brokers, 1 topics
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Broker #0/3: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 0
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Broker #1/3: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 2
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Broker #2/3: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 1
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Topic #0/1: test-topic with 0 partitions: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: Error in metadata reply for topic test-topic (PartCnt 0): Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] TOPICERROR | [thrd:main]: Topic test-topic has permanent error: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] STATE | [thrd:main]: Topic test-topic changed state unknown -> error
KafkaProducer[feae661e-a7d61a65] PARTCNT | [thrd:main]: Failing all 1 unassigned messages in topic test-topic due to permanent topic error: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] UAS | [thrd:main]: 0/1 messages were partitioned in topic test-topic
KafkaProducer[feae661e-a7d61a65] UAS | [thrd:main]: 1/1 messages failed partitioning in topic test-topic
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: 1/1 requested topic(s) seen in metadata

Backtrace from the core file:

#0  0x0000000000f5a360 in rd_list_destroy ()
#1  0x0000000000f091b9 in rd_kafka_headers_destroy ()
#2  0x0000000000ea4fc4 in rd_kafka_produceva ()
#3  0x00000000008c0268 in kafka::clients::producer::KafkaProducer::send(kafka::clients::producer::ProducerRecord const&, std::function<void (kafka::clients::producer::RecordMetadata const&, kafka::Error const&)> const&, kafka::clients::producer::KafkaProducer::SendOption, kafka::clients::producer::KafkaProducer::ActionWhileQueueIsFull) (this=<optimized out>, record=..., deliveryCb=..., option=<optimized out>, action=<optimized out>) at /opt/rh/gcc-toolset-11/root/usr/include/c++/11/bits/stl_vector.h:918

persona94 commented 7 months ago

Hello, is anyone able to look into this?