AxonFramework / extension-kafka

Axon Framework extension for Kafka integration to publish and handle Event messages.
https://axoniq.io/
Apache License 2.0

Tracking Token Updated if Creating a Publisher Fails #454

Closed n3ziniuka5 closed 11 months ago

n3ziniuka5 commented 11 months ago

Basic information

I am setting up Axon to publish events to Kafka. I've tried various confirmation and event processor modes, and in all combinations I've managed to reproduce an issue where the tracking token is updated while Kafka is down, so some events are lost once Kafka is back up.

I am not fully up to speed with Axon internals, but one thing that jumps out in KafkaPublisher.send is that only the Kafka commit happens under uow.onPrepareCommit. Other important steps, such as creating the producer and sending the message, both of which can throw exceptions, happen outside of it.

Steps to reproduce

I used the following Spring Boot configuration to publish events to Kafka:

axon:
  kafka:
    bootstrap-servers: localhost:9092
    client-id: axon-kafka-test
    default-topic: local.event
    properties:
      security.protocol: PLAINTEXT

    publisher:
      confirmation-mode: transactional

    producer:
      transaction-id-prefix: axon-kafka-test
      event-processor-mode: pooled-streaming

Steps to reproduce:

Expected behaviour

Kafka event processor's tracking token shouldn't be updated, and all events should eventually be published to Kafka when it recovers.

Actual behaviour

Kafka event processor's tracking token is updated even while Kafka is down.
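
The gap between the expected and the actual behaviour can be sketched with a toy model. This is only an illustration of the invariant being asked for, not the extension's code: ToyTokenStore, ToyBroker and processBatch are hypothetical names, and none of them correspond to Axon Framework or Kafka APIs. The point is that the token position should move only after every publish in the batch has succeeded.

```kotlin
// Toy model only: ToyTokenStore, ToyBroker and processBatch are hypothetical
// names and do not correspond to Axon Framework or Kafka APIs.

class ToyTokenStore {
    // Index of the last event that was handled successfully; -1 means "none yet".
    var position: Long = -1L
        private set

    fun store(newPosition: Long) {
        position = newPosition
    }
}

class ToyBroker(var available: Boolean) {
    val published = mutableListOf<String>()

    fun publish(event: String) {
        // check() throws IllegalStateException when the broker is "down".
        check(available) { "broker is down" }
        published += event
    }
}

/**
 * Publishes the batch and advances the token only if every publish succeeded.
 * Returns true on success; on failure the token is left untouched so the same
 * batch is picked up again on the next attempt.
 */
fun processBatch(
    events: List<Pair<Long, String>>,
    broker: ToyBroker,
    tokens: ToyTokenStore
): Boolean {
    if (events.isEmpty()) return true
    return try {
        events.forEach { (_, payload) -> broker.publish(payload) }
        tokens.store(events.last().first)
        true
    } catch (e: IllegalStateException) {
        false
    }
}

fun main() {
    val tokens = ToyTokenStore()
    val broker = ToyBroker(available = false)
    val batch = listOf(0L to "FirstEvent", 1L to "SecondEvent")

    // Broker down: nothing is published and the token stays where it was.
    println(processBatch(batch, broker, tokens)) // false
    println(tokens.position)                     // -1

    // Broker back up: the same batch is retried and the token advances.
    broker.available = true
    println(processBatch(batch, broker, tokens)) // true
    println(tokens.position)                     // 1
}
```

In this model a failed attempt leaves the token at its previous position, which is the behaviour described under "Expected behaviour" above.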

gklijs commented 11 months ago

Hi Laurynas, thanks for reporting. I might have a look today. I do have one question, which token store did you use?

n3ziniuka5 commented 11 months ago

Hello Gerard, I appreciate you looking into this. I used the MongoDB token store.

n3ziniuka5 commented 11 months ago

Here's how the token store is configured:

import com.mongodb.ReadConcern
import com.mongodb.ReadPreference
import com.mongodb.TransactionOptions
import com.mongodb.WriteConcern
import com.mongodb.client.MongoClient
import org.axonframework.common.transaction.TransactionManager
import org.axonframework.eventhandling.tokenstore.TokenStore
import org.axonframework.extensions.mongo.DefaultMongoTemplate
import org.axonframework.extensions.mongo.MongoTemplate
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore
import org.axonframework.serialization.Serializer
import org.axonframework.spring.messaging.unitofwork.SpringTransactionManager
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.autoconfigure.mongo.MongoProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.mongodb.MongoDatabaseFactory
import org.springframework.data.mongodb.MongoTransactionManager

@Configuration
class AxonConfiguration {

  @Bean
  fun axonTemplate(client: MongoClient, mongoProperties: MongoProperties): MongoTemplate =
    DefaultMongoTemplate.builder()
      .mongoDatabase(client, mongoProperties.database)
      .build()

  @Bean
  fun axonTokenStore(axonTemplate: MongoTemplate,
                     serializer: Serializer,
                     axonTransactionManager: TransactionManager): TokenStore =
    MongoTokenStore.builder()
      .mongoTemplate(axonTemplate)
      .serializer(serializer)
      .transactionManager(axonTransactionManager)
      .build()

  @Bean
  fun axonTransactionManager(mongoDatabaseFactory: MongoDatabaseFactory): TransactionManager =
    SpringTransactionManager(MongoTransactionManager(mongoDatabaseFactory,
                                                     TransactionOptions.builder()
                                                       .readPreference(ReadPreference.primary())
                                                       .readConcern(ReadConcern.SNAPSHOT)
                                                       .writeConcern(WriteConcern.MAJORITY)
                                                       .build()))
}

gklijs commented 11 months ago

I was able to reproduce the issue; I'll dive into it further.

n3ziniuka5 commented 11 months ago

@gklijs unfortunately I am still experiencing an issue where publishing fails but the token is updated. This time, I had an authorization issue:

2023-11-02T13:11:01.063+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-11-02T13:11:01.421+02:00  INFO 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Cluster ID: staging
2023-11-02T13:11:01.428+02:00  INFO 32758 --- [ad | axon-users] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Discovered transaction coordinator localhost:9093 (id: 500198333 rack: null)
2023-11-02T13:11:01.876+02:00  INFO 32758 --- [ad | axon-users] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] ProducerId set to 1197 with epoch 0
2023-11-02T13:11:01.912+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 7 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:01.913+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:01.913+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:01.914+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:01.915+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Error occurred. Starting retry mode.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:259) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:258) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:792) ~[kafka-clients-3.3.2.jar:na]
        at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory$ProducerDecorator.commitTransaction(DefaultProducerFactory.java:259) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.tryCommit(KafkaPublisher.java:180) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.send(KafkaPublisher.java:160) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.handle(KafkaEventPublisher.java:80) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.SimpleEventHandlerInvoker.invokeHandlers(SimpleEventHandlerInvoker.java:128) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:114) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:91) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.processMessageInUnitOfWork(AbstractEventProcessor.java:195) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:173) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor.lambda$new$1(TrackingEventProcessor.java:181) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$2(AbstractEventProcessor.java:174) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$3(AbstractEventProcessor.java:170) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$4(AbstractEventProcessor.java:166) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:165) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:491) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:316) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1200) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.cleanUp(TrackingEventProcessor.java:1402) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1379) ~[axon-messaging-4.9.0.jar:4.9.0]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]

2023-11-02T13:11:01.917+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 1s
2023-11-02T13:11:01.919+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:02.924+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=0} for segment: Segment[0/0]
2023-11-02T13:11:02.966+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 8 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:02.966+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:02.967+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:02.967+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:02.967+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 2s
2023-11-02T13:11:02.969+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:04.971+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=1} for segment: Segment[0/0]
2023-11-02T13:11:05.012+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 9 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:05.013+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:05.013+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:05.013+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:05.013+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 4s
2023-11-02T13:11:05.015+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:09.017+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=2} for segment: Segment[0/0]
2023-11-02T13:11:09.056+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 10 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:09.057+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:09.057+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:09.057+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:09.057+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 8s
2023-11-02T13:11:09.059+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:17.061+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=3} for segment: Segment[0/0]

As you can see, each retry fetched an incremented token: globalIndex went from 0 to 3 across the four fetches, ending with Fetched token: IndexTrackingToken{globalIndex=3}.
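
A small helper can make this pattern easier to spot in longer logs. The sketch below is hypothetical tooling, not part of Axon: it assumes the "Fetched token: IndexTrackingToken{globalIndex=N}" message format seen in the log above and only reports whether the fetched index ever increased from one fetch to the next. An increase is only suspicious while the processor is in retry mode, as it is here; during normal processing the token is expected to advance.

```kotlin
// Hypothetical log-inspection helper. The regex assumes the "Fetched token"
// message format shown in the log excerpt above.
val fetchedToken = Regex("""Fetched token: IndexTrackingToken\{globalIndex=(\d+)\}""")

/** Extracts the globalIndex of every "Fetched token" line, in log order. */
fun fetchedIndices(logLines: List<String>): List<Long> =
    logLines.mapNotNull { line ->
        fetchedToken.find(line)?.groupValues?.get(1)?.toLong()
    }

/** True if any fetch returned a higher index than the fetch before it. */
fun tokenAdvanced(indices: List<Long>): Boolean =
    indices.zipWithNext().any { (previous, next) -> next > previous }

fun main() {
    // Messages abridged from the log above (timestamps and thread names dropped).
    val lines = listOf(
        "Fetched token: IndexTrackingToken{globalIndex=0} for segment: Segment[0/0]",
        "Releasing claim on token and preparing for retry in 2s",
        "Fetched token: IndexTrackingToken{globalIndex=1} for segment: Segment[0/0]",
        "Releasing claim on token and preparing for retry in 4s",
        "Fetched token: IndexTrackingToken{globalIndex=2} for segment: Segment[0/0]",
        "Releasing claim on token and preparing for retry in 8s",
        "Fetched token: IndexTrackingToken{globalIndex=3} for segment: Segment[0/0]"
    )
    val indices = fetchedIndices(lines)
    println(indices)                // [0, 1, 2, 3]
    println(tokenAdvanced(indices)) // true
}
```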

Once I fixed the authorization issue and restarted the app, it didn't republish the failed events.

gklijs commented 11 months ago

Are you sure you are using 4.9.0 of the extension? I want to be sure before diving into this, as I don't understand how this could happen with the new code. Although, judging from the line numbers in the stack trace, you are. Maybe the exception triggers after calling commitTransaction()? In that case, we should not support using Kafka transactions in this extension. What do you think?

Also, please create a new issue when there are new problems. Although related, this issue did solve some of the problems. So, instead of reopening this one, I'd rather have a new issue.

n3ziniuka5 commented 11 months ago

I am on 4.9.0. I've actually discovered a few additional issues:

Whether or not it makes sense to disable Kafka transactions, I can't really say. I think conceptually it makes sense to have transactions, because you should be able to roll back a transaction if anything else goes wrong, which is what you need for exactly-once publishing. In our case, we wanted to guarantee exactly-once publishing because at the moment our consumers are not idempotent.

Sorry for not opening a separate issue, leaving a comment was just quicker. For the time being, we managed to work around the issues and are monitoring for errors in case we need to replay events manually.

gklijs commented 11 months ago

The problem is that it will never become exactly-once, because there are two different systems. If it's working now, it's fine, I guess.
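
The "two different systems" point can be illustrated with a toy simulation. This is a sketch under simplifying assumptions, not a model of the extension: CommitOrder and deliveriesAfterCrash are hypothetical names, and the broker commit and the token store commit are treated as two independent steps with a possible crash in between. Whichever step goes first, a crash between the two leaves either a duplicate or a lost event.

```kotlin
// Toy model: all names are hypothetical, not Axon or Kafka APIs.
enum class CommitOrder { BROKER_FIRST, TOKEN_FIRST }

/**
 * Handles one event using two independent commits, optionally "crashing"
 * between them, then simulates a restart. After the restart the event is
 * handled again only if the token was never stored.
 * Returns how many times the event ended up on the broker.
 */
fun deliveriesAfterCrash(order: CommitOrder, crashBetweenCommits: Boolean): Int {
    var published = 0
    var tokenStored = false

    fun attempt(crash: Boolean) {
        when (order) {
            CommitOrder.BROKER_FIRST -> {
                published++          // broker commit succeeds
                if (crash) return    // crash before the token is stored
                tokenStored = true
            }
            CommitOrder.TOKEN_FIRST -> {
                tokenStored = true   // token commit succeeds
                if (crash) return    // crash before the broker commit
                published++
            }
        }
    }

    attempt(crashBetweenCommits)
    // Restart: the processor resumes from whatever the token store holds.
    if (!tokenStored) attempt(crash = false)
    return published
}

fun main() {
    println(deliveriesAfterCrash(CommitOrder.BROKER_FIRST, crashBetweenCommits = true))  // 2 (duplicate)
    println(deliveriesAfterCrash(CommitOrder.TOKEN_FIRST, crashBetweenCommits = true))   // 0 (lost)
    println(deliveriesAfterCrash(CommitOrder.BROKER_FIRST, crashBetweenCommits = false)) // 1
    println(deliveriesAfterCrash(CommitOrder.TOKEN_FIRST, crashBetweenCommits = false))  // 1
}
```

In this model, committing to the broker first gives at-least-once delivery, which idempotent consumers can absorb, while storing the token first gives at-most-once delivery and can drop events.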

smcvb commented 10 months ago

Hi @n3ziniuka5, just chipping in for this remark you made:

when I added the extension, it didn't publish any events. I am guessing it's related to https://github.com/AxonFramework/AxonFramework/pull/2778. It only started publishing events after I manually changed the token type in the event store from ReplayToken to GlobalSequenceTrackingToken and set the token position to 0.

This is indeed why handling didn't do anything: an unforeseen side effect of making that adjustment in Axon Framework. Right now, the KafkaEventPublisher of this extension, the component in charge of giving the AF events to a Kafka Producer, is not allowed to handle events during a replay.

Although a small enhancement, it may be practical to provide a property to switch that behavior. That said, in general, the default of not publishing old events to Kafka makes sense to me.

@gklijs, what are your thoughts on this pointer? It would of course merit a new issue, but it seemed fair to me to hold the discussion here, as @n3ziniuka5 already dropped it here :-)