arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License

INVALID_TXN_STATE returned from `abortTransaction()` #556

Open TimWolla opened 1 month ago

TimWolla commented 1 month ago

Description

The following code:

<?php
$conf = new \RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->set('transactional.id', 'dummy');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic('myTopic');

$producer->initTransactions(3000);

for (;;) {
    $producer->beginTransaction();

    for ($i = 0; $i < 5; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    }

    $producer->commitTransaction(-1);

    $producer->beginTransaction();

    for ($i = 0; $i < 5; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    }

    $producer->abortTransaction(-1);
}

Resulted in this output (after roughly 15 seconds):

%1|1726655131.138|TXNERR|rdkafka#producer-1| [thrd:main]: Fatal transaction error: Failed to end transaction: Broker: Producer attempted a transactional operation in an invalid state (INVALID_TXN_STATE)
%3|1726655131.138|ERROR|rdkafka#producer-1| [thrd:main]: Fatal error: Broker: Producer attempted a transactional operation in an invalid state: Failed to end transaction: Broker: Producer attempted a transactional operation in an invalid state
PHP Fatal error:  Uncaught RdKafka\KafkaErrorException: INVALID_TXN_STATE in test.php:28
Stack trace:
#0 test.php(28): RdKafka\Producer->abortTransaction(-1)
#1 {main}
  thrown in test.php on line 28

But I expected this output instead:

Some rdkafka logs, but no exception.

My understanding is that I am using the RdKafka API safely: I pass a timeout of -1 so that each commit or abort settles before I start a new transaction.

I'm running this against:

docker run -it --rm -p 9092:9092 -e KAFKA_LOG4J_LOGGERS="kafka=DEBUG"  apache/kafka:latest

and the Kafka logs show:

[2024-09-18 10:25:31,024] DEBUG TransactionalId dummy prepare transition from PrepareCommit to TxnTransitMetadata(producerId=0, lastProducerId=0, producerEpoch=1, lastProducerEpoch=-1, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) (kafka.coordinator.transaction.TransactionMetadata)
[2024-09-18 10:25:31,024] DEBUG [TransactionCoordinator id=1] Returning CONCURRENT_TRANSACTIONS error code to client for dummy's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
[2024-09-18 10:25:31,024] DEBUG [UnifiedLog partition=myTopic-0, dir=/tmp/kraft-combined-logs] First unstable offset updated to None (kafka.log.UnifiedLog)
[2024-09-18 10:25:31,024] DEBUG [Partition myTopic-0 broker=1] High watermark updated from (offset=10397, segment=[0:379449]) to (offset=10398, segment=[0:379527]) (kafka.cluster.Partition)
[2024-09-18 10:25:31,024] DEBUG [ReplicaManager broker=1] Produce to local log in 0 ms (kafka.server.ReplicaManager)
[2024-09-18 10:25:31,024] DEBUG [Transaction Marker Request Completion Handler 1]: Received WriteTxnMarker response ClientResponse(receivedTimeMs=1726655131024, latencyMs=0, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=1, clientId=broker-1-txn-marker-sender, correlationId=1732, headerVersion=2), responseBody=WriteTxnMarkersResponseData(markers=[WritableTxnMarkerResult(producerId=0, topics=[WritableTxnMarkerTopicResult(name='myTopic', partitions=[WritableTxnMarkerPartitionResult(partitionIndex=0, errorCode=0)])])])) from node 1 with correlation id 1732 (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
[2024-09-18 10:25:31,024] DEBUG [Transaction Marker Channel Manager 1]: Sending dummy's transaction markers for TransactionMetadata(transactionalId=dummy, producerId=0, producerEpoch=1, txnTimeoutMs=60000, state=PrepareCommit, pendingState=Some(CompleteCommit), topicPartitions=HashSet(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) with coordinator epoch 0 succeeded, trying to append complete transaction log now (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2024-09-18 10:25:31,025] DEBUG [Partition __transaction_state-46 broker=1] High watermark updated from (offset=5200, segment=[0:655188]) to (offset=5201, segment=[0:655302]) (kafka.cluster.Partition)
[2024-09-18 10:25:31,025] DEBUG [ReplicaManager broker=1] Produce to local log in 0 ms (kafka.server.ReplicaManager)
[2024-09-18 10:25:31,025] DEBUG TransactionalId dummy complete transition from PrepareCommit to TxnTransitMetadata(producerId=0, lastProducerId=0, producerEpoch=1, lastProducerEpoch=-1, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) (kafka.coordinator.transaction.TransactionMetadata)
[2024-09-18 10:25:31,025] DEBUG [Transaction State Manager 1]: Updating dummy's transaction state to TxnTransitMetadata(producerId=0, lastProducerId=0, producerEpoch=1, lastProducerEpoch=-1, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1726655131023, txnLastUpdateTimestamp=1726655131024) with coordinator epoch 0 for dummy succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2024-09-18 10:25:31,025] DEBUG [TransactionCoordinator id=1] Aborting append of ABORT to transaction log with coordinator and returning CONCURRENT_TRANSACTIONS error to client for dummy's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2024-09-18 10:25:31,137] DEBUG [TransactionCoordinator id=1] TransactionalId: dummy's state is CompleteCommit, but received transaction marker result to send: ABORT (kafka.coordinator.transaction.TransactionCoordinator)
[2024-09-18 10:25:31,137] DEBUG [TransactionCoordinator id=1] Aborting append of ABORT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for dummy's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)

It appears that Kafka returned a CONCURRENT_TRANSACTIONS error to RdKafka, which then automatically retried the abort. That retry then failed with INVALID_TXN_STATE.
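To make that reading of the log concrete, here is a toy model of the sequence. It is only a sketch: `ToyTxnCoordinator`, its methods, and `replayLoggedSequence()` are hypothetical names invented for illustration, and the state handling is a simplification of what the broker log suggests, not Kafka's or librdkafka's actual code.

```php
<?php
// Toy model of the coordinator states named in the broker log above.
// Hypothetical simplification for illustration; not Kafka's real logic.
final class ToyTxnCoordinator
{
    // Start with the first transaction already holding messages.
    private string $state = 'Ongoing';

    // The producer registers a partition for a (new) transaction.
    public function addPartitions(): string
    {
        if ($this->state === 'PrepareCommit' || $this->state === 'PrepareAbort') {
            // The previous transaction has not settled on the broker yet.
            return 'CONCURRENT_TRANSACTIONS';
        }
        $this->state = 'Ongoing';
        return 'NONE';
    }

    // The producer ends the transaction with 'COMMIT' or 'ABORT'.
    public function endTxn(string $result): string
    {
        switch ($this->state) {
            case 'Ongoing':
                $this->state = $result === 'COMMIT' ? 'PrepareCommit' : 'PrepareAbort';
                return 'NONE';
            case 'PrepareCommit':
            case 'PrepareAbort':
                return 'CONCURRENT_TRANSACTIONS';
            case 'CompleteCommit':
                return $result === 'COMMIT' ? 'NONE' : 'INVALID_TXN_STATE';
            default: // CompleteAbort
                return $result === 'ABORT' ? 'NONE' : 'INVALID_TXN_STATE';
        }
    }

    // The broker finishes writing the transaction markers.
    public function markersWritten(): void
    {
        $this->state = str_replace('Prepare', 'Complete', $this->state);
    }
}

// Replays the order of events as I read them from the log.
function replayLoggedSequence(): array
{
    $coordinator = new ToyTxnCoordinator();
    $seen = [];
    $seen[] = $coordinator->endTxn('COMMIT');  // first transaction commits
    $seen[] = $coordinator->addPartitions();   // second transaction starts too early
    $seen[] = $coordinator->endTxn('ABORT');   // abort arrives while commit settles
    $coordinator->markersWritten();            // commit completes on the broker
    $seen[] = $coordinator->endTxn('ABORT');   // retried abort hits CompleteCommit
    return $seen;
}

print_r(replayLoggedSequence());
```

In this model the second transaction never reaches the Ongoing state, because its AddPartitions request is rejected while the commit is still settling. The retried abort therefore meets a coordinator that only knows about a completed commit.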

I'm not sure if this is a bug in my test case, in php-rdkafka, in librdkafka, or in Kafka itself. Since I believe my code is correct, I'm filing it here, because php-rdkafka is what I use directly.

Feel free to file an upstream issue, if you believe that php-rdkafka is doing everything correctly. Let me know if you need any further information.

php-rdkafka Version

6.0.3-1+ubuntu24.04.1+deb.sury.org+2

librdkafka Version

2.3.0-1build2

PHP Version

8.3.11-1+ubuntu24.04.1+deb.sury.org+1

Operating System

Ubuntu 24.04

Kafka Version

3.8.0

arnaud-lb commented 1 month ago

Thank you!

I was able to reproduce this in C. I've reported it here: https://github.com/confluentinc/librdkafka/issues/4849. I will leave this issue open until it is confirmed that this is not a php-rdkafka issue.