morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0

Crash when destroying headers #239

Open huth511 opened 3 months ago

huth511 commented 3 months ago

I created headers in a local function. Their values are dynamically allocated on the heap, and I keep the resulting pointers (I have never freed them):

        long n1;
        long n2;
        kafka::Headers headers;
        // Heap-allocated copies of n1 and n2; these pointers are never freed.
        auto pN1Bytes = (decltype(n1)*)malloc(sizeof(n1));
        *pN1Bytes = n1;
        auto pN2Bytes = (decltype(n2)*)malloc(sizeof(n2));
        *pN2Bytes = n2;
        // Each header value is built from a heap pointer and a byte count.
        headers.push_back(kafka::Header(kafka::Header::Key("n1"), kafka::Header::Value(pN1Bytes, sizeof(n1))));
        headers.push_back(kafka::Header(kafka::Header::Key("n2"), kafka::Header::Value(pN2Bytes, sizeof(n2))));

However, it occasionally crashes in rd_kafka_headers_destroy. Here is the stack: [screenshot: 1724227362589]
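For comparison, here is a minimal sketch of building the same two header values without malloc, by keeping the bytes in an owning container and handing out (pointer, size) views only when needed. Everything in it is hypothetical and is not part of modern-cpp-kafka: `BufferView` stands in for a non-owning header value, and `OwnedHeaders` and `readLong` are made-up helpers. Whether buffer lifetime plays any role in the crash reported here is not established; the sketch only illustrates one way to rule it out.

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for a non-owning (pointer, size) header value.
struct BufferView {
    const void* data;
    std::size_t size;
};

// Hypothetical helper: owns the bytes of every header value, so views into
// them remain valid for as long as this object is alive.
class OwnedHeaders {
public:
    // Copies the raw bytes of a trivially copyable value into owned storage.
    template <typename T>
    void add(std::string key, const T& value) {
        static_assert(std::is_trivially_copyable<T>::value,
                      "only trivially copyable values can be copied bytewise");
        std::vector<unsigned char> bytes(sizeof(T));
        std::memcpy(bytes.data(), &value, sizeof(T));
        entries_.emplace_back(std::move(key), std::move(bytes));
    }

    // Builds non-owning views over the stored bytes.
    // The views dangle once this object is destroyed.
    std::vector<std::pair<std::string, BufferView>> views() const {
        std::vector<std::pair<std::string, BufferView>> out;
        for (const auto& e : entries_) {
            out.emplace_back(e.first, BufferView{e.second.data(), e.second.size()});
        }
        return out;
    }

private:
    std::vector<std::pair<std::string, std::vector<unsigned char>>> entries_;
};

// Hypothetical helper: reads a long back out of a view to check the round trip.
// Returns 0 if the view does not hold exactly sizeof(long) bytes.
inline long readLong(const BufferView& v) {
    long out = 0;
    if (v.size == sizeof(out)) {
        std::memcpy(&out, v.data, sizeof(out));
    }
    return out;
}
```

In use, one would call `add("n1", n1)` and `add("n2", n2)` on an `OwnedHeaders` instance that outlives the send, then build the real header values from each view's `data` and `size`. The storage is released automatically when the owner goes out of scope, so nothing is leaked.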

I send messages using the following method, which is called sequentially:

bool KafkaProducerService::Produce(const kafka::Key& key, const std::string& msg, const std::string& topic, const kafka::Headers& headers)
{
    auto ret = true;
    try {
        auto record = kafka::clients::producer::ProducerRecord(topic, mPartition, key, kafka::Value(msg.c_str(), msg.size()));
        record.headers() = headers;
        mpProducer->send(
            record,
            mProduceCallback,
            kafka::clients::producer::KafkaProducer::SendOption::ToCopyRecordValue,
            kafka::clients::producer::KafkaProducer::ActionWhileQueueIsFull::NoBlock
        );
        mpProducer->flush(std::chrono::milliseconds(50));
    }
    catch (const kafka::KafkaException& e) {
        std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
        ret = false;
    }

    return ret;
}

huth511 commented 1 month ago

The crash occurred when the topic had been deleted (err: UNKNOWN TOPIC).