eventuate-tram / eventuate-tram-sagas-quarkus

Build 16 failed - message handlers invoked before transaction that sends message commits #6

Open dartartem opened 3 years ago

dartartem commented 3 years ago

See: https://app.circleci.com/pipelines/github/eventuate-tram/eventuate-tram-sagas-quarkus/6/workflows/0a2077f3-c149-43e8-9db2-13f70fcb3264/jobs/16

shouldRejectOrder() - io.eventuate.examples.tram.sagas.ordersandcustomers.integrationtests.quarkus.OrdersAndCustomersInMemoryIntegrationTest

failed with


Caused by: java.lang.AssertionError: expected:<REJECTED> but was:<PENDING>
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:834)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at 

In the logs:

  <system-err><![CDATA[Exception in thread "pool-7-thread-1" java.lang.RuntimeException: Cannot find saga instance io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSaga 00000177b5ff81d3-0242d574ae5a0000
    at io.eventuate.tram.sagas.orchestration.SagaInstanceRepositoryJdbc.lambda$find$2(SagaInstanceRepositoryJdbc.java:119)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at io.eventuate.tram.sagas.orchestration.SagaInstanceRepositoryJdbc.find(SagaInstanceRepositoryJdbc.java:119)
    at io.eventuate.tram.sagas.orchestration.SagaManagerImpl.handleReply(SagaManagerImpl.java:172)
    at io.eventuate.tram.sagas.orchestration.SagaManagerImpl.handleMessage(SagaManagerImpl.java:155)
    at io.eventuate.tram.consumer.common.DecoratedMessageHandlerFactory.lambda$decorate$0(DecoratedMessageHandlerFactory.java:33)
    at io.eventuate.tram.consumer.common.PrePostHandlerMessageHandlerDecorator.accept(PrePostHandlerMessageHandlerDecorator.java:25)
    at io.eventuate.tram.consumer.common.PrePostHandlerMessageHandlerDecorator.accept(PrePostHandlerMessageHandlerDecorator.java:10)
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38)
    at io.eventuate.tram.consumer.common.DuplicateDetectingMessageHandlerDecorator.lambda$accept$0(DuplicateDetectingMessageHandlerDecorator.java:13)
    at io.eventuate.tram.consumer.jdbc.SqlTableBasedDuplicateMessageDetector.lambda$doWithMessage$0(SqlTableBasedDuplicateMessageDetector.java:52)
    at io.eventuate.common.quarkus.jdbc.EventuateQuarkusTransactionTemplate.executeInTransaction(EventuateQuarkusTransactionTemplate.java:12)
    at io.eventuate.common.quarkus.jdbc.EventuateQuarkusTransactionTemplate_Subclass.executeInTransaction$$superaccessor1(EventuateQuarkusTransactionTemplate_Subclass.zig:210)
    at io.eventuate.common.quarkus.jdbc.EventuateQuarkusTransactionTemplate_Subclass$$function$$1.apply(EventuateQuarkusTransactionTemplate_Subclass$$function$$1.zig:33)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:127)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:100)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.doIntercept(TransactionalInterceptorRequired.java:32)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.intercept(TransactionalInterceptorBase.java:53)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:26)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(TransactionalInterceptorRequired_Bean.zig:340)
    at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
    at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
    at io.eventuate.common.quarkus.jdbc.EventuateQuarkusTransactionTemplate_Subclass.executeInTransaction(EventuateQuarkusTransactionTemplate_Subclass.zig:168)
    at io.eventuate.tram.consumer.jdbc.SqlTableBasedDuplicateMessageDetector.doWithMessage(SqlTableBasedDuplicateMessageDetector.java:50)
    at io.eventuate.tram.consumer.common.DuplicateDetectingMessageHandlerDecorator.accept(DuplicateDetectingMessageHandlerDecorator.java:13)
    at io.eventuate.tram.consumer.common.DuplicateDetectingMessageHandlerDecorator.accept(DuplicateDetectingMessageHandlerDecorator.java:3)
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38)
    at io.eventuate.tram.consumer.common.PrePostReceiveMessageHandlerDecorator.accept(PrePostReceiveMessageHandlerDecorator.java:24)
    at io.eventuate.tram.consumer.common.PrePostReceiveMessageHandlerDecorator.accept(PrePostReceiveMessageHandlerDecorator.java:10)
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38)
    at io.eventuate.tram.consumer.common.MessageConsumerImpl.lambda$subscribe$0(MessageConsumerImpl.java:39)
    at io.eventuate.tram.inmemory.InMemoryMessageConsumer.lambda$dispatchMessageToHandlers$0(InMemoryMessageConsumer.java:44)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

The failure is not reproducible locally.

Problem:

I missed that InMemoryMessageProducer uses Spring's PlatformTransactionManager:

https://github.com/eventuate-tram/eventuate-tram-core/blob/wip-db-id-gen/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/InMemoryMessageProducer.java#L26

and that does not work with Quarkus.

Instead of executing the callback passed to the withContext method after the transaction commits, it executes it immediately.

withContext is used by https://github.com/eventuate-tram/eventuate-tram-core/blob/wip-db-id-gen/eventuate-tram-messaging-producer-common/src/main/java/io/eventuate/tram/messaging/producer/common/MessageProducerImpl.java#L36 for sending messages.

In the case of InMemoryMessageProducer, that means executing the in-memory consumers in a separate thread:

https://github.com/eventuate-tram/eventuate-tram-core/blob/wip-db-id-gen/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/InMemoryMessageConsumer.java#L44
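
To illustrate the difference between the intended and the observed behaviour, here is a minimal, single-threaded sketch. It does not use the Eventuate or Spring APIs: SketchTransaction, runAfterCommit and WithContextSketch are hypothetical names invented for this example, and the event strings are only labels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transaction that supports after-commit callbacks.
class SketchTransaction {
    private final List<Runnable> afterCommit = new ArrayList<>();
    private final List<String> events;

    SketchTransaction(List<String> events) {
        this.events = events;
    }

    // Intended behaviour: remember the callback and run it only after commit.
    void runAfterCommit(Runnable callback) {
        afterCommit.add(callback);
    }

    void commit() {
        events.add("commit");
        afterCommit.forEach(Runnable::run);
    }
}

class WithContextSketch {
    // deferred == true models the intended behaviour,
    // deferred == false models the callback being executed immediately.
    static List<String> send(boolean deferred) {
        List<String> events = new ArrayList<>();
        SketchTransaction tx = new SketchTransaction(events);

        events.add("write to database");
        Runnable dispatch = () -> events.add("dispatch message");
        if (deferred) {
            tx.runAfterCommit(dispatch);
        } else {
            dispatch.run();
        }
        tx.commit();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(send(true));  // [write to database, commit, dispatch message]
        System.out.println(send(false)); // [write to database, dispatch message, commit]
    }
}
```

In the second ordering the consumers are already running before the commit, which is the situation described above.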

How this relates to the saga:

The order is created inside a transaction:

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/orders-and-customers/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/orders/service/OrderService.java#L41

The saga is saved:

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L87

Then the message is dispatched here:

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L105

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L116

Instead of being dispatched after the transaction commits, the message is dispatched while the transaction is still in progress. As a result, the saved saga may not be visible to other threads if they access the database before the transaction commits:

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L143

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L155

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L172

To reproduce it locally, I added Thread.sleep here:

https://github.com/eventuate-tram/eventuate-tram-sagas/blob/wip-db-id-gen/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java#L106
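
The visibility problem can be sketched without a database. This is only a model under stated assumptions: two maps stand in for committed and uncommitted rows, a single-thread executor stands in for the in-memory consumer thread, and waiting on the future plays the role of the Thread.sleep (it forces the handler to run before the commit). SagaVisibilitySketch and lookupSeenByConsumer are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SagaVisibilitySketch {

    // Returns what the consumer thread sees when it looks up the saga instance.
    static Optional<String> lookupSeenByConsumer(String sagaId, boolean dispatchAfterCommit) {
        Map<String, String> committed = new ConcurrentHashMap<>(); // visible to all threads
        Map<String, String> uncommitted = new HashMap<>();         // private to the "transaction"
        ExecutorService consumer = Executors.newSingleThreadExecutor();

        // Stand-in for the reply handler looking up the saga instance.
        Callable<Optional<String>> handler = () -> Optional.ofNullable(committed.get(sagaId));
        try {
            uncommitted.put(sagaId, "CreateOrderSaga"); // saga saved inside the transaction

            if (dispatchAfterCommit) {
                committed.putAll(uncommitted);          // commit first
                return consumer.submit(handler).get();  // then dispatch
            }
            // Dispatch first and wait for the handler, then commit.
            Optional<String> seen = consumer.submit(handler).get();
            committed.putAll(uncommitted);
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupSeenByConsumer("saga-1", false).isPresent()); // false
        System.out.println(lookupSeenByConsumer("saga-1", true).isPresent());  // true
    }
}
```

The empty result in the first case corresponds to the "Cannot find saga instance" exception in the logs.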

dartartem commented 3 years ago

To solve this for Quarkus, I propose using TransactionManager: https://quarkus.io/guides/transaction

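  @Inject
  TransactionManager transactionManager;

  // Called from code that runs inside the transaction:
  transactionManager.getTransaction().registerSynchronization(new Synchronization() {
    @Override
    public void beforeCompletion() {
      // nothing to do before the transaction completes
    }

    @Override
    public void afterCompletion(int status) {
      // the deferred callback (sending the message) would run here
    }
  });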

If there is no active transaction, transactionManager.getTransaction() returns null (tested).
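
A possible shape for the null-safe version of this idea is sketched below. To keep it compilable without a JTA dependency, the Synchronization and Transaction interfaces here are local stand-ins that only mirror the method shapes of the JTA types; in real code they would be the JTA types and the tx argument would come from transactionManager.getTransaction(). AfterCommitSketch, runAfterCommit and FakeTransaction are hypothetical names, and skipping the callback when the transaction does not commit is an assumption, not something decided in this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the shape of the JTA interfaces (assumption: real code uses JTA).
interface Synchronization {
    void beforeCompletion();
    void afterCompletion(int status);
}

interface Transaction {
    void registerSynchronization(Synchronization sync);
}

class AfterCommitSketch {
    // Same value as javax.transaction.Status.STATUS_COMMITTED.
    static final int STATUS_COMMITTED = 3;

    // tx may be null when no transaction is active; then the callback runs immediately.
    static void runAfterCommit(Transaction tx, Runnable callback) {
        if (tx == null) {
            callback.run();
            return;
        }
        tx.registerSynchronization(new Synchronization() {
            @Override
            public void beforeCompletion() {
                // nothing to do
            }

            @Override
            public void afterCompletion(int status) {
                // Assumption: only run the callback if the transaction committed.
                if (status == STATUS_COMMITTED) {
                    callback.run();
                }
            }
        });
    }
}

// Fake transaction used only to exercise the sketch.
class FakeTransaction implements Transaction {
    private final List<Synchronization> syncs = new ArrayList<>();

    @Override
    public void registerSynchronization(Synchronization sync) {
        syncs.add(sync);
    }

    // Notifies the registered synchronizations of the final status.
    void complete(int status) {
        syncs.forEach(s -> s.afterCompletion(status));
    }
}
```

With a FakeTransaction, the callback runs only once complete(AfterCommitSketch.STATUS_COMMITTED) is called; with a null transaction it runs straight away.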