confluentinc / librdkafka

The Apache Kafka C/C++ library
Other
303 stars 3.15k forks source link

rd_kafka_abort_transaction: Failed to end transaction: Broker: Producer attempted a transactional operation in an invalid state #4849

Open arnaud-lb opened 2 months ago

arnaud-lb commented 2 months ago

Description

rd_kafka_abort_transaction() sometimes fails with INVALID_TXN_STATE, even though the preceding rd_kafka_init_transactions() and rd_kafka_begin_transaction() calls succeeded.

How to reproduce

Reproducer:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/syslog.h>
#include <librdkafka/rdkafka.h>

/*
 * Set configuration property `name` to `value` on `conf`.
 *
 * If librdkafka rejects the property (unknown name or invalid value), the
 * error text it produced is printed to stderr and the process exits with
 * status 1; on success the function simply returns.
 */
void setconf(rd_kafka_conf_t *conf, const char *name, const char *value) {
    char errbuf[256];
    rd_kafka_conf_res_t res =
        rd_kafka_conf_set(conf, name, value, errbuf, sizeof(errbuf));

    if (res == RD_KAFKA_CONF_OK) {
        return;
    }

    fprintf(stderr, "rd_kafka_conf_set: %s\n", errbuf);
    exit(1);
}

/*
 * Enqueue five small messages ("Message 0" .. "Message 4") on topic `rkt`.
 *
 * RD_KAFKA_PARTITION_UA lets the configured partitioner choose the partition,
 * and RD_KAFKA_MSG_F_COPY makes librdkafka copy the payload, so reusing the
 * stack buffer for each message is safe.
 *
 * On the first failed enqueue, the librdkafka error description is printed to
 * stderr and the process exits with status 1.
 */
void produce(rd_kafka_topic_t *rkt) {
    for (int i = 0; i < 5; i++) {
        char payload[128];
        snprintf(payload, sizeof(payload), "Message %d", i);
        int ret = rd_kafka_produce(rkt,
                RD_KAFKA_PARTITION_UA,
                RD_KAFKA_MSG_F_COPY,
                payload, strlen(payload),
                NULL, 0,
                NULL);
        if (ret != 0) {
            /* rd_kafka_produce() is a legacy API: its return value is just
             * -1 on failure, and the actual cause must be fetched with
             * rd_kafka_last_error(). Printing `ret` would always show -1. */
            rd_kafka_resp_err_t cause = rd_kafka_last_error();
            fprintf(stderr, "rd_kafka_produce: %s\n", rd_kafka_err2str(cause));
            exit(1);
        }
    }
}

/*
 * Delivery-report callback registered via rd_kafka_conf_set_dr_msg_cb().
 *
 * Intentionally a no-op: per-message delivery results are not inspected.
 * The (void) casts only mark the parameters as deliberately unused.
 */
void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
    (void)rk;
    (void)msg;
    (void)opaque;
}

int main(void) {
    char errstr[256];
    rd_kafka_error_t *err;

    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    setconf(conf, "log_level", "7" /* LOG_DEBUG */);
    setconf(conf, "debug", "all");
    setconf(conf, "metadata.broker.list", "localhost:9092");
    setconf(conf, "transactional.id", "dummy");

    rd_kafka_conf_set_opaque(conf, (void*)123);
    rd_kafka_conf_set_dr_msg_cb(conf, kafka_conf_dr_msg_cb);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "rd_kafka_new: %s\n", errstr);
        exit(1);
    }

    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "myTopic", NULL);
    if (!rkt) {
        fprintf(stderr, "rd_kafka_topic_new: unkonwn error\n");
        exit(1);
    }

    err = rd_kafka_init_transactions(rk, 3000);
    if (err != NULL) {
        fprintf(stderr, "rd_kafka_init_transactions: %s\n", rd_kafka_error_string(err));
        exit(1);
    }

    // fprintf(stderr, "Transactions initialized\n");

    for (;;) {
        err = rd_kafka_begin_transaction(rk);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_begin_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        produce(rkt);
        // rd_kafka_poll(rk, 0);

        err = rd_kafka_commit_transaction(rk, -1);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_commit_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        // rd_kafka_poll(rk, 0);

        err = rd_kafka_begin_transaction(rk);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_begin_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        produce(rkt);
        // rd_kafka_poll(rk, 0);

        err = rd_kafka_abort_transaction(rk, -1);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_abort_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        rd_kafka_poll(rk, 0);
    }

    return 0;
}
$ docker run -it --rm -p 9092:9092 -e KAFKA_LOG4J_LOGGERS="kafka=DEBUG"  apache/kafka:3.8.0
$ cc -o bug bug.c -lrdkafka
$ ./bug >log 2>&1

On my machine the program terminates in less than 10 seconds. If it doesn't, restart it; generating background load (for example, by rebuilding librdkafka at the same time) helps trigger the failure.

Checklist