micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

Kafka transactional support fails after 1 week with gap in messages #542

Open msillence opened 2 years ago

msillence commented 2 years ago

Expected Behavior

Message is sent

Actual Behaviour

Sending fails. The key error message is: "The producer attempted to use a producer id which is not currently assigned to its transactional id"

Essentially it's this error: https://stackoverflow.com/a/59421077

A streaming application had no traffic for 7 days, and its producer metadata was deleted.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1112) ~[kafka-clients-2.8.0.jar:?]
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:356) ~[kafka-clients-2.8.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:619) ~[kafka-clients-2.8.0.jar:?]
    at com.fnz.kafka.figaro.valuation.sender.HoldingSender.send(HoldingSender.java:47) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.service.HoldService.processPartition(HoldService.java:54) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.service.$HoldService$Definition$Intercepted.$$access$$processPartition(Unknown Source) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.service.$HoldService$Definition$Exec.dispatch(Unknown Source) ~[classes/:?]
    at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378) ~[micronaut-inject-3.4.1.jar:3.4.1]
    at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:128) ~[micronaut-aop-3.4.1.jar:3.4.1]
    at io.micronaut.transaction.interceptor.TransactionalInterceptor.intercept(TransactionalInterceptor.java:196) ~[micronaut-data-tx-3.3.0.jar:3.3.0]
    at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137) ~[micronaut-aop-3.4.1.jar:3.4.1]
    at com.fnz.kafka.figaro.valuation.service.$HoldService$Definition$Intercepted.processPartition(Unknown Source) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.listeners.FigaroHoldListener.lambda$process$0(FigaroHoldListener.java:46) ~[classes/:?]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[?:?]
    at com.fnz.kafka.figaro.valuation.listeners.FigaroHoldListener.process(FigaroHoldListener.java:44) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Intercepted.$$access$$process(Unknown Source) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Exec.dispatch(Unknown Source) ~[classes/:?]
    at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378) ~[micronaut-inject-3.4.1.jar:3.4.1]
    at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:128) ~[micronaut-aop-3.4.1.jar:3.4.1]
    at io.micronaut.transaction.interceptor.TransactionalInterceptor.intercept(TransactionalInterceptor.java:196) ~[micronaut-data-tx-3.3.0.jar:3.3.0]
    at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137) ~[micronaut-aop-3.4.1.jar:3.4.1]
    at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Intercepted.process(Unknown Source) ~[classes/:?]
    at com.fnz.kafka.figaro.valuation.listeners.$FigaroHoldListener$Definition$Exec.dispatch(Unknown Source) ~[classes/:?]
    at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:378) ~[micronaut-inject-3.4.1.jar:3.4.1]
    at io.micronaut.inject.DelegatingExecutableMethod.invoke(DelegatingExecutableMethod.java:76) ~[micronaut-inject-3.4.1.jar:3.4.1]
    at io.micronaut.core.bind.DefaultExecutableBinder$1.invoke(DefaultExecutableBinder.java:109) ~[micronaut-core-3.4.1.jar:3.4.1]
    at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.processConsumerRecordsAsBatch(KafkaConsumerProcessor.java:604) ~[micronaut-kafka-4.2.0.jar:4.2.0]
    at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.createConsumerThreadPollLoop(KafkaConsumerProcessor.java:462) ~[micronaut-kafka-4.2.0.jar:4.2.0]
    at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$submitConsumerThread$7(KafkaConsumerProcessor.java:421) ~[micronaut-kafka-4.2.0.jar:4.2.0]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:79) ~[micrometer-core-1.8.3.jar:1.8.3]
    at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:160) ~[micrometer-core-1.8.3.jar:1.8.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

Steps To Reproduce

Add config to reduce the expiration from the default of 1 week to 10 seconds: set the environment variable KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS=10000 or the broker property transactional.id.expiration.ms=10000

I still find it takes significantly longer than 10 seconds; I'm testing with 45 minutes.
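
One possible explanation for the delay, offered as an assumption rather than something confirmed in this thread: the broker removes expired transactional ids in a periodic cleanup task (transaction.remove.expired.transaction.cleanup.interval.ms, believed to default to 1 hour), so an idle id would be dropped somewhere between the expiration time and the expiration time plus one cleanup interval. The sketch below only does that arithmetic; the class and method names are illustrative and not part of Kafka or Micronaut.

```java
import java.time.Duration;

// Sketch: bounds on when a broker would drop an idle transactional id,
// assuming expiry is enforced by a periodic cleanup task.
final class ExpiryWindow {
    // Earliest removal: the id must have been idle for at least the expiration time.
    static Duration earliest(Duration expiration) {
        return expiration;
    }

    // Latest removal: the cleanup task may have run just before the id expired,
    // so up to one further cleanup interval can pass.
    static Duration latest(Duration expiration, Duration cleanupInterval) {
        return expiration.plus(cleanupInterval);
    }

    public static void main(String[] args) {
        Duration expiration = Duration.ofMillis(10_000); // transactional.id.expiration.ms=10000
        Duration cleanup = Duration.ofHours(1);          // assumed broker default, check your broker config
        System.out.println("earliest removal after idle: " + earliest(expiration));
        System.out.println("latest removal after idle:   " + latest(expiration, cleanup));
    }
}
```

Under these assumptions, a 45 minute wait falls inside the window but is not guaranteed to trigger the error; lowering the cleanup interval on a test broker should make the reproduction faster and more predictable.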

using the code: https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaClientTx

Notably, I call producer.initTransactions() in the class initialisation, because calling it twice throws an exception: TransactionalId holding-tx: Invalid transition attempted from state READY to state INITIALIZING

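import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;

@Singleton
public class HoldingSender {
    @Value("${KAFKA_TOPIC_HOLDING}")
    public String holdingTopic;

    private final Producer<HoldingKey, HoldingValue> holdingProducer;

    public HoldingSender(@KafkaClient(id="holding", transactionalId="holding-sender") Producer<HoldingKey, HoldingValue> kafkaProducer) {
        this.holdingProducer = kafkaProducer;
        holdingProducer.initTransactions();
    }

    public synchronized List<Future<RecordMetadata>> send(List<ProducerRecord<HoldingKey, HoldingValue>> records) {
        try {
            holdingProducer.beginTransaction();
            List<Future<RecordMetadata>> futures = records.stream().map(record -> holdingProducer.send(record)).toList();
            holdingProducer.commitTransaction();
            return futures;
        } catch (KafkaException e) {
            // The report is cut off after the try body; this abort-and-rethrow handler is an assumed completion.
            holdingProducer.abortTransaction();
            throw e;
        }
    }
}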
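
The stack trace above fails inside beginTransaction() with "we are in an error state", which suggests the singleton producer stays unusable once the broker has dropped its transactional id. A minimal sketch of one possible workaround is to discard the producer after a failure and build a fresh one. Everything here is an assumption for illustration: TxProducer is a hypothetical stand-in for Kafka's Producer, RecreatingSender is not a Micronaut or Kafka class, and how to obtain a fresh producer from Micronaut is not covered.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical minimal view of a transactional producer (stands in for Kafka's Producer).
interface TxProducer<R> {
    void initTransactions();
    void beginTransaction();
    void send(R record);
    void commitTransaction();
    void close();
}

// Replaces the producer after a failure instead of reusing one stuck in an error state.
final class RecreatingSender<R> {
    private final Supplier<TxProducer<R>> factory;
    private TxProducer<R> producer;

    RecreatingSender(Supplier<TxProducer<R>> factory) {
        this.factory = factory;
        this.producer = create();
    }

    private TxProducer<R> create() {
        TxProducer<R> fresh = factory.get();
        fresh.initTransactions(); // called exactly once per producer instance
        return fresh;
    }

    synchronized void send(List<R> records) {
        try {
            producer.beginTransaction();
            records.forEach(producer::send);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            // Assumption: every failure is treated as fatal. Real code would
            // abort the transaction for errors the producer can recover from.
            producer.close();
            producer = create();
            throw e; // the caller decides whether to retry the batch
        }
    }
}
```

With this shape the first send after a long idle period still fails, but the next call runs against a newly initialised producer rather than the one in the error state.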

Environment Information

Running with both docker and confluent cloud docker versions:

Example Application

No response

Version

3.4.1

dstepanov commented 2 years ago

Can you please create a sample app with a test describing what is wrong? I don't really understand what is not correct.

msillence commented 2 years ago

Sorry, I've been a bit snowed under. There is also info here: https://stackoverflow.com/a/52304789. The core concept is to send a message, then wait (a week by default, or 45 minutes with the setting above).

I did have a test harness; I'll see if I can publish it.