eventuate-tram / eventuate-tram-core

Transactional messaging for microservices

UnexpectedRollbackException while consuming messages #118

nrsina closed this issue 4 years ago

nrsina commented 4 years ago

Hello,

My app threw an exception while processing messages received from another microservice. There were around 30 messages in the Kafka topic, and the app wasn't consuming them because an exception had caused the consumer to stop. After a restart, it processed around 20 messages before throwing this exception:

2020-08-18 10:36:44.821 ERROR [collateral,,,] 1 --- [pool-4-thread-1] i.e.m.kafka.consumer.SwimlaneDispatcher : Exception handling message - terminating
org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:633)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:386)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:118)
    at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:153)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
    at io.eventuate.tram.spring.optimisticlocking.OptimisticLockingDecorator$$EnhancerBySpringCGLIB$$be630e66.accept()
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38)
    at io.eventuate.tram.consumer.common.PrePostReceiveMessageHandlerDecorator.accept(PrePostReceiveMessageHandlerDecorator.java:24)
    at io.eventuate.tram.consumer.common.PrePostReceiveMessageHandlerDecorator.accept(PrePostReceiveMessageHandlerDecorator.java:10)
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38)
    at io.eventuate.tram.consumer.common.MessageConsumerImpl.lambda$subscribe$0(MessageConsumerImpl.java:39)
    at io.eventuate.tram.consumer.kafka.EventuateTramKafkaMessageConsumer.lambda$subscribe$0(EventuateTramKafkaMessageConsumer.java:29)
    at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.handle(MessageConsumerKafkaImpl.java:74)
    at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.lambda$null$0(MessageConsumerKafkaImpl.java:44)
    at io.eventuate.messaging.kafka.consumer.SwimlaneDispatcher.processQueuedMessage(SwimlaneDispatcher.java:72)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-08-18 10:36:44.825 TRACE [collateral,,,] 1 --- [iber-Collateral] i.e.m.k.b.c.EventuateKafkaConsumer : Terminating since KafkaMessageProcessorFailedException
2020-08-18 10:36:44.849 TRACE [collateral,,,] 1 --- [iber-Collateral] i.e.m.k.b.c.EventuateKafkaConsumer : Stopped in state MESSAGE_HANDLING_FAILED

I have also imported OptimisticLockingDecoratorConfiguration into my Configuration bean. My database is Oracle (I've added Oracle support to Eventuate CDC, which needed many code changes). Any help would be appreciated. This is a very critical app and it must not lose even one message. I think it's better to make "stopping consumer after an exception" configurable.

Best Regards, Sina

cer commented 4 years ago

Eventuate Tram will stop processing messages if a message handler throws an exception - no messages will be lost. The assumption is that an exception indicates a bug (or some other inability to process messages) that must be fixed before message processing can continue.

Also, code invoked by message handlers shouldn't use Spring's @Transactional since that interferes with Eventuate Tram's transaction management. The stack trace suggests that @Transactional is being used.
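One common way this exception arises in Spring is that an inner transactional scope joins an existing transaction, fails, and marks it rollback-only. The failure is then swallowed, and the outer scope tries to commit. The thread does not establish that this is exactly what happened in the reporter's code. The sketch below is a toy model in plain Java: ToyTransaction, innerScope, and outerScope are hypothetical names, not Spring or Eventuate Tram APIs.

```java
// Toy model of one shared transaction. Hypothetical class, not Spring's API.
class ToyTransaction {
    private boolean rollbackOnly;

    void setRollbackOnly() {
        rollbackOnly = true;
    }

    // Commit fails if any participating scope marked the transaction rollback-only.
    void commit() {
        if (rollbackOnly) {
            throw new IllegalStateException(
                "Transaction silently rolled back because it has been marked as rollback-only");
        }
    }
}

class ToyTransactionDemo {
    // Models an inner scope that joins the existing transaction:
    // on failure it marks the shared transaction rollback-only and rethrows.
    static void innerScope(ToyTransaction tx, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            tx.setRollbackOnly();
            throw e;
        }
    }

    // Models the outer scope that owns the transaction and commits at the end.
    static String outerScope(Runnable handlerBody) {
        ToyTransaction tx = new ToyTransaction();
        try {
            innerScope(tx, handlerBody);
        } catch (RuntimeException swallowed) {
            // The failure is swallowed, so the outer scope proceeds to commit.
        }
        try {
            tx.commit();
            return "committed";
        } catch (IllegalStateException e) {
            return "unexpected rollback";
        }
    }

    public static void main(String[] args) {
        System.out.println(outerScope(() -> { }));
        System.out.println(outerScope(() -> { throw new RuntimeException("boom"); }));
    }
}
```

In this model the outer scope never sees the original failure, only the refused commit, which is why the error surfaces at commit time in the stack trace rather than at the point where the business logic failed.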

Re: making "stopping consumer after an exception" configurable: what behavior do you want?

cer commented 4 years ago

I'd like to know more about the CDC code changes required to support Oracle. If you are just supporting polling, I would have expected that you only needed to define an EventuateSqlDialect. I created this issue: https://github.com/eventuate-foundation/eventuate-cdc/issues/64

nrsina commented 4 years ago

Thanks for your detailed answer.

Yes, I was using @Transactional, so that is the problem. Thank you 👍

Regarding the CDC code changes required to support Oracle (eventuate-foundation/eventuate-cdc#64): sure. I don't have access to the code right now, but I will describe the changes I needed to make to CDC in that issue tomorrow. As I recall, creating a new EventuateSqlDialect was not enough for Oracle (polling). I also made some small hacks to make it work, but hopefully I can improve it with your help.

Kind Regards

isfong commented 3 years ago

Hello! If code invoked by message handlers can't use @Transactional, does that mean such a method belongs exclusively to the message handler? Callers elsewhere with the same business intent would not be able to share it, because no transaction is opened for them.

cer commented 3 years ago

You would need two methods:

  1. A method that implements the actual business logic and does not use @Transactional, e.g. bizLogicNotTransactional() - it's called by the message handler.
  2. A method that is @Transactional, bizLogicTransactional(), which calls bizLogicNotTransactional()
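A minimal sketch of the two-method split described above. To keep it compilable without Spring on the classpath, the @Transactional annotation here is a local stand-in for Spring's org.springframework.transaction.annotation.Transactional. CollateralService and CollateralMessageHandler are hypothetical names used only for illustration. Only bizLogicNotTransactional() and bizLogicTransactional() come from the thread.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Spring's @Transactional so the sketch compiles on its own.
// In a real application, import Spring's annotation instead.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Transactional {}

// Hypothetical service; in a real application this would be a Spring bean.
class CollateralService {
    final List<String> processed = new ArrayList<>();

    // Actual business logic, deliberately without @Transactional.
    // The message handler calls this and relies on Eventuate Tram's
    // transaction management.
    void bizLogicNotTransactional(String payload) {
        processed.add(payload);
    }

    // Entry point for all other callers (for example a REST controller):
    // opens the transaction, then delegates to the shared business logic.
    @Transactional
    void bizLogicTransactional(String payload) {
        bizLogicNotTransactional(payload);
    }
}

// Hypothetical message handler: it only ever calls the non-transactional method.
class CollateralMessageHandler {
    private final CollateralService service;

    CollateralMessageHandler(CollateralService service) {
        this.service = service;
    }

    void handle(String payload) {
        service.bizLogicNotTransactional(payload);
    }
}

class TransactionalSplitDemo {
    public static void main(String[] args) {
        CollateralService service = new CollateralService();
        new CollateralMessageHandler(service).handle("from-kafka");
        service.bizLogicTransactional("from-rest");
        System.out.println(service.processed);
    }
}
```

Both entry points share the same business logic, so nothing is duplicated. The only difference is who is responsible for the transaction boundary.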