aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Unexpected exception in Transactional Producer #542

Open tvoinarovskyi opened 5 years ago

tvoinarovskyi commented 5 years ago

During processing I sometimes get an UnknownError on the client. At the same time, the broker logs an error like this:

[2019-08-16 11:47:51,481] ERROR [KafkaApi-1] Error when handling request: clientId=aiokafka-producer-1, correlationId=4641, api=ADD_PARTITIONS_TO_TXN, body={transactional_id=LOCAL-platform-event-preprocessor-tid-0,producer_id=0,producer_epoch=0,topics=[{topic=LOCAL-platform-event-clean,partitions=[2]}]} (kafka.server.KafkaApis)
java.lang.IllegalStateException: TransactionalId LOCAL-platform-event-preprocessor-tid-0 failed transition to state TxnTransitMetadata(producerId=0, producerEpoch=0, txnTimeoutMs=120000, txnState=Ongoing, topicPartitions=Set(LOCAL-platform-event-clean-2), txnStartTimestamp=1565956071468, txnLastUpdateTimestamp=1565956071468) due to unexpected metadata
        at kafka.coordinator.transaction.TransactionMetadata.throwStateTransitionFailure(TransactionMetadata.scala:391)
        at kafka.coordinator.transaction.TransactionMetadata.completeTransitionTo(TransactionMetadata.scala:326)
        at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$3(TransactionStateManager.scala:542)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
        at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
        at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:534)
        at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:628)
        at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:628)
        at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
        at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
        at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
        at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:497)
        at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:622)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
        at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:599)
        at kafka.coordinator.transaction.TransactionCoordinator.handleAddPartitionsToTransaction(TransactionCoordinator.scala:272)
        at kafka.server.KafkaApis.handleAddPartitionToTxnRequest(KafkaApis.scala:1752)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:130)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
        at java.lang.Thread.run(Thread.java:748)

tvoinarovskyi commented 5 years ago

A similar problem was reported for kafkajs: https://github.com/tulios/kafkajs/issues/302

tvoinarovskyi commented 5 years ago

This seems to be related to clock instability: if the broker's wall clock jumps backwards, it may trigger this error.
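
To make the suspected mechanism concrete, here is a toy model of how a backward clock step could break a timestamp-ordering check on transaction metadata. It is only an illustration of the hypothesis above, not the broker's actual code: `TxnState`, `apply_update`, and `StaleTimestampError` are hypothetical names, and the invariant ("an update may not be older than the transaction start") is an assumption.

```python
from dataclasses import dataclass


class StaleTimestampError(Exception):
    """Raised when an update appears to predate the transaction start."""


@dataclass(frozen=True)
class TxnState:
    start_ts: int        # wall-clock ms when the transaction started
    last_update_ts: int  # wall-clock ms of the most recent update


def apply_update(state: TxnState, now_ms: int) -> TxnState:
    # Assumed invariant: an update can never be older than the start.
    # A wall clock that steps backwards violates it even though the
    # client did nothing wrong.
    if now_ms < state.start_ts:
        raise StaleTimestampError(
            f"update at {now_ms} predates start at {state.start_ts}"
        )
    return TxnState(start_ts=state.start_ts, last_update_ts=now_ms)


if __name__ == "__main__":
    state = TxnState(start_ts=1_000, last_update_ts=1_000)

    # Normal case: the clock moves forward, the update is accepted.
    state = apply_update(state, now_ms=1_005)

    # Clock stepped back by 20 ms (e.g. an NTP correction): rejected.
    try:
        apply_update(state, now_ms=980)
    except StaleTimestampError as exc:
        print(f"rejected: {exc}")
```

If the hypothesis holds, the failure would depend on broker clock behaviour rather than on anything the producer sends, which would fit it showing up only sometimes.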