confluentinc / librdkafka

The Apache Kafka C/C++ library

rd_kafka_commit_transaction failed, but messages were delivered to topics #4558

Open tkolomiets opened 11 months ago

tkolomiets commented 11 months ago

Description

Pseudo C++ code:

char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "xxx.xxx.xxx.01:9092,xxx.xxx.xxx.02:9092,xxx.xxx.xxx.03:9092,xxx.xxx.xxx.04:9092,xxx.xxx.xxx.05:9092", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "transactional.id", "librdkafka_transactions_example", errstr, sizeof(errstr));
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
rd_kafka_init_transactions(producer, -1);

// q1: 3 partitions + 5 replicas
auto t1 = rd_kafka_topic_new(producer, "q1", nullptr);
// q2: 3 partitions + 5 replicas
auto t2 = rd_kafka_topic_new(producer, "q2", nullptr);
// q3: 3 partitions + 1 replica
auto t3 = rd_kafka_topic_new(producer, "q3", nullptr);

rd_kafka_begin_transaction(producer);

std::string message = "hello";
auto res1 = rd_kafka_produce(t1, 0, RD_KAFKA_MSG_F_COPY, message.data(), message.length(), nullptr, 0, nullptr);
auto res2 = rd_kafka_produce(t2, 0, RD_KAFKA_MSG_F_COPY, message.data(), message.length(), nullptr, 0, nullptr);
auto res3 = rd_kafka_produce(t3, 0, RD_KAFKA_MSG_F_COPY, message.data(), message.length(), nullptr, 0, nullptr);
// res1 = 0, res2 = 0, res3 = 0

rd_kafka_error_t *error = rd_kafka_commit_transaction(producer, 1000);
// error is not null
// rd_kafka_error_txn_requires_abort(error) == false
// error log is:
%3|1702461737.738|TXNERR|rdkafka#producer-1| [thrd:xxx.xxx.xxx.03:9092/bootstrap]: Current transaction failed in state BeginCommit: 1 message(s) timed out on q3 [0] (_TIMED_OUT, requires epoch bump)
FATAL ERROR: Failed to commit transaction: _TIMED_OUT: Failed to flush all outstanding messages within the API timeout: 1 message(s) remaining

After the failed call to rd_kafka_commit_transaction, q1 and q2 contain the "hello" message, but q3 does not. I think this is a mistake: a failed commit, even a retriable one, should not result in messages being delivered to topics.

How to reproduce

Create 3 topics: the first and second with 3 partitions and 5 replicas, the third with 3 partitions and 1 replica. Then run the code above.

emasab commented 8 months ago

Failed commits should be retried manually if rd_kafka_error_is_retriable(error) is true. The library does not retry the commit automatically.
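
A minimal sketch of what such a manual retry could look like, written as a small decision loop so the policy is visible on its own. The names commit_with_retry, CommitResult and TxnOutcome are hypothetical helpers introduced here for illustration and are not part of librdkafka; the comments show how the callbacks might be wired to rd_kafka_commit_transaction, rd_kafka_error_is_retriable, rd_kafka_error_txn_requires_abort and rd_kafka_abort_transaction. The attempt limit and what to do once it is reached are assumptions left to the application.

```cpp
#include <functional>

// What one commit attempt reported. With librdkafka this would be derived
// from the rd_kafka_error_t* returned by rd_kafka_commit_transaction():
//   NULL                                    -> Ok
//   rd_kafka_error_txn_requires_abort(err)  -> RequiresAbort
//   rd_kafka_error_is_retriable(err)        -> Retriable
//   anything else                           -> Fatal
enum class CommitResult { Ok, Retriable, RequiresAbort, Fatal };

// Final state of the transaction as seen by the caller.
enum class TxnOutcome { Committed, Aborted, RetriesExhausted, Failed };

// Hypothetical helper, not part of librdkafka: calls try_commit up to
// max_attempts times and stops early on success or on a non-retriable result.
inline TxnOutcome commit_with_retry(
    const std::function<CommitResult()>& try_commit,
    const std::function<bool()>& abort_txn,
    int max_attempts)
{
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        switch (try_commit()) {
        case CommitResult::Ok:
            return TxnOutcome::Committed;
        case CommitResult::Retriable:
            continue;  // same transaction, call commit again
        case CommitResult::RequiresAbort:
            // abort_txn returns true when the abort itself succeeded
            return abort_txn() ? TxnOutcome::Aborted : TxnOutcome::Failed;
        case CommitResult::Fatal:
            return TxnOutcome::Failed;
        }
    }
    // Still retriable after max_attempts; the caller decides what to do next.
    return TxnOutcome::RetriesExhausted;
}

// Possible wiring to librdkafka (assumes a valid rd_kafka_t *producer):
//
//   auto try_commit = [&] {
//       rd_kafka_error_t *err = rd_kafka_commit_transaction(producer, 10000);
//       if (!err) return CommitResult::Ok;
//       CommitResult r =
//           rd_kafka_error_txn_requires_abort(err) ? CommitResult::RequiresAbort
//           : rd_kafka_error_is_retriable(err)     ? CommitResult::Retriable
//                                                  : CommitResult::Fatal;
//       rd_kafka_error_destroy(err);
//       return r;
//   };
//   auto abort_txn = [&] {
//       rd_kafka_error_t *err = rd_kafka_abort_transaction(producer, 10000);
//       if (!err) return true;
//       rd_kafka_error_destroy(err);
//       return false;
//   };
//   TxnOutcome outcome = commit_with_retry(try_commit, abort_txn, 5);
```

Separating the loop from the librdkafka calls keeps the retry policy easy to test with fake callbacks; in a real producer the two lambdas in the comment would be passed in directly.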