reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io
613 stars 227 forks source link

Kafka producer instance throws exception when multiple concurrent calls are made transactionally. #248

Open burimkrasniqi opened 2 years ago

burimkrasniqi commented 2 years ago

The same producer kafka instance can not be used at the same time for more than one request. The reason in the producer options we specify the transactionId - ProducerConfig.TRANSACTIONAL_ID_CONFIG

protected Map<String, Object> producerOptions(boolean transactional) { val props = new HashMap<String, Object>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.ACKS_CONFIG, "all"); if (transactional) { props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); } props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); if (tClass.equals(String.class)) // props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); else // props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

return props;

}

A work around would be like this: on each request create a new kafka producer with a different transactionId. I tried this work around it does not work properly beside the inefficiency. The problem is that each time we use a new kafka producer once we finish with it we need to close it otherwise it causes memory leak. The problem here is that the function to close a producer like producer.close() is blocking and it makes it impossible to use this as a work around.

The best option would be to not specify this transactionId by us but to be generated by library internally and being able to use the same producer multiple time by multiple requests in parallel.

The error is like this:

org.apache.kafka.common.KafkaException: TransactionalId my-transaction-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1078) ~[kafka-clients-2.7.1.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1071) ~[kafka-clients-2.7.1.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:357) ~[kafka-clients-2.7.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:620) ~[kafka-clients-2.7.1.jar:na] at reactor.kafka.sender.internals.DefaultTransactionManager.lambda$null$0(DefaultTransactionManager.java:43) ~[reactor-kafka-1.3.5.jar:1.3.5] at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) ~[reactor-core-3.4.9.jar:3.4.9] at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) ~[reactor-core-3.4.9.jar:3.4.9] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:139) ~[reactor-core-3.4.9.jar:3.4.9] at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181) ~[reactor-core-3.4.9.jar:3.4.9] at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.9.jar:3.4.9] at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.9.jar:3.4.9] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na] at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

garyrussell commented 2 years ago

In spring-kafka, we cache transactional producers.

There is currently no caching producer factory in reactor-kafka.

You could maintain your own pool of senders with different transactional.ids.

kunallawand5007 commented 2 years ago

is this issue resolved?

garyrussell commented 2 years ago

It is not; you need to maintain a pool of producers in your code. Only one transaction can be in a process at a time.

kunallawand5007 commented 2 years ago

Hi Gary ,

Thanks for Help !!

As per your suggestion I have maintain Pool of Kafka Sender .It works well in functional testing but when it comes to load test ,It's started failing with below exception :

org.apache.kafka.common.KafkException: Cannot execute transactional method because we are in error state

Code snippet:

image image image

Due to some security constraints not able to attach code that's why added code snippet , Can you please check what would be the possible root cause for this exception.

Thanks, Kunal Lawand

garyrussell commented 2 years ago

If you get any exceptions (e.g. on beginTransaction(), commitTransaction() or abortTransaction() you must close the producer and remove it from the pool.

qavid commented 2 years ago

In spring-kafka, we cache transactional producers.

There is currently no caching producer factory in reactor-kafka.

You could maintain your own pool of senders with different transactional.ids.

Hi @garyrussell is there any plan to support caching producer factory? Thanks.

garyrussell commented 2 years ago

No immediate plans, but contributions are always welcome.

Manju050 commented 4 months ago

Hi @garyrussell Does Kafka support multiple on-going transactions at a time? I am expecting that if kafka supports multiple on-going transactions concurrently then consumer should receive messages based on commit order.Is this actually happens?

garyrussell commented 4 months ago

I am no longer involved with this project.

No; the KafkaProducer does not support multiple concurrent transactions; you need to maintain a pool.

https://github.com/reactor/reactor-kafka/issues/248#issuecomment-941181586