eventuate-tram / eventuate-tram-core

Transactional messaging for microservices

Command Dispatcher terminates Consumer in case of wrong Command type #172

Open ilsid opened 3 years ago

ilsid commented 3 years ago

Hi @cer, @dartartem,

I'm working on a PoC using Tram Sagas.

tram.core.version: 0.29.0.RELEASE
tram.sagas.version: 0.18.0.RELEASE

I ran into a case where CommandDispatcher terminates the consumer when a message carries a command_type header that no handler expects. The exception and the Kafka consumer shutdown log are included below.

Is there a way to define a handler or filter for such "wrong" messages? Please let me know if more details are needed.

java.lang.RuntimeException: No method for io.eventuate.tram.messaging.common.MessageImpl@413fc4d5[payload={"customerName":"Customer 001"},headers={command_saga_id=0000017d51eda2c6-ee33f02345210000, DATE=Wed, 24 Nov 2021 12:31:08 GMT, command_type=com.eventuate.poc.saga.CreateBillingAccountCommand2, command_reply_to=com.eventuate.poc.saga.RegisterCustomerSaga-reply, DESTINATION=billingService, command_saga_type=com.eventuate.poc.saga.RegisterCustomerSaga, command__destination=billingService, ID=0000017d51eda302-ee33f02345210000}]
    at io.eventuate.tram.commands.consumer.CommandDispatcher.messageHandler(CommandDispatcher.java:58) ~[eventuate-tram-commands-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.sagas.participant.SagaCommandDispatcher.messageHandler(SagaCommandDispatcher.java:42) ~[eventuate-tram-sagas-participant-0.18.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.DecoratedMessageHandlerFactory.lambda$decorate$0(DecoratedMessageHandlerFactory.java:33) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.PrePostHandlerMessageHandlerDecorator.accept(PrePostHandlerMessageHandlerDecorator.java:25) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.PrePostHandlerMessageHandlerDecorator.accept(PrePostHandlerMessageHandlerDecorator.java:10) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.DuplicateDetectingMessageHandlerDecorator.lambda$accept$0(DuplicateDetectingMessageHandlerDecorator.java:13) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.jdbc.SqlTableBasedDuplicateMessageDetector.lambda$doWithMessage$0(SqlTableBasedDuplicateMessageDetector.java:52) ~[eventuate-tram-consumer-jdbc-0.29.0.RELEASE.jar:na]
    at io.eventuate.common.common.spring.jdbc.EventuateSpringTransactionTemplate.lambda$executeInTransaction$0(EventuateSpringTransactionTemplate.java:18) ~[eventuate-common-common-spring-jdbc-0.14.0.RELEASE.jar:na]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.12.jar:5.3.12]
    at io.eventuate.common.common.spring.jdbc.EventuateSpringTransactionTemplate.executeInTransaction(EventuateSpringTransactionTemplate.java:18) ~[eventuate-common-common-spring-jdbc-0.14.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.jdbc.SqlTableBasedDuplicateMessageDetector.doWithMessage(SqlTableBasedDuplicateMessageDetector.java:50) ~[eventuate-tram-consumer-jdbc-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.DuplicateDetectingMessageHandlerDecorator.accept(DuplicateDetectingMessageHandlerDecorator.java:13) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.DuplicateDetectingMessageHandlerDecorator.accept(DuplicateDetectingMessageHandlerDecorator.java:3) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.PrePostReceiveMessageHandlerDecorator.accept(PrePostReceiveMessageHandlerDecorator.java:24) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.PrePostReceiveMessageHandlerDecorator.accept(PrePostReceiveMessageHandlerDecorator.java:10) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.MessageHandlerDecoratorChainBuilder.lambda$buildChain$0(MessageHandlerDecoratorChainBuilder.java:38) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.common.MessageConsumerImpl.lambda$subscribe$0(MessageConsumerImpl.java:39) ~[eventuate-tram-consumer-common-0.29.0.RELEASE.jar:na]
    at io.eventuate.tram.consumer.kafka.EventuateTramKafkaMessageConsumer.lambda$subscribe$0(EventuateTramKafkaMessageConsumer.java:29) ~[eventuate-tram-consumer-kafka-0.29.0.RELEASE.jar:na]
    at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.handle(MessageConsumerKafkaImpl.java:74) ~[eventuate-messaging-kafka-consumer-0.14.0.RELEASE.jar:na]
    at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.lambda$null$0(MessageConsumerKafkaImpl.java:44) ~[eventuate-messaging-kafka-consumer-0.14.0.RELEASE.jar:na]
    at io.eventuate.messaging.kafka.consumer.SwimlaneDispatcher.processQueuedMessage(SwimlaneDispatcher.java:72) ~[eventuate-messaging-kafka-consumer-0.14.0.RELEASE.jar:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

2021-11-24 14:50:25.726  INFO 1575851 --- [mmandDispatcher] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-billingCommandDispatcher-1, groupId=billingCommandDispatcher] Revoke previously assigned partitions billingService-0
2021-11-24 14:50:25.727  INFO 1575851 --- [mmandDispatcher] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-billingCommandDispatcher-1, groupId=billingCommandDispatcher] Member consumer-billingCommandDispatcher-1-7ee83dd0-dee5-45db-9e5a-fdf70a836418 sending LeaveGroup request to coordinator localhost:9092 (id: 2147482646 rack: null) due to the consumer is being closed
2021-11-24 14:50:25.735  INFO 1575851 --- [mmandDispatcher] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2021-11-24 14:50:25.735  INFO 1575851 --- [mmandDispatcher] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-11-24 14:50:25.736  INFO 1575851 --- [mmandDispatcher] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2021-11-24 14:50:25.746  INFO 1575851 --- [mmandDispatcher] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for consumer-billingCommandDispatcher-1 unregistered
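
To make the question concrete, here is a rough sketch of the kind of filter I have in mind. It uses stand-in types only: `SimpleMessage`, `strictDispatcher` and `filtering` are hypothetical names, not Tram classes, and the registered handler key `CreateBillingAccountCommand` is an assumption (only `CreateBillingAccountCommand2` appears in the trace above). Whether Tram offers an equivalent hook is exactly what I am asking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

public class UnknownCommandFilterSketch {

    // Stand-in for a message: headers plus a JSON payload (hypothetical type, not the Tram Message).
    static final class SimpleMessage {
        final Map<String, String> headers;
        final String payload;

        SimpleMessage(Map<String, String> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Mimics the reported behaviour: throws when no handler matches the command_type header.
    static Consumer<SimpleMessage> strictDispatcher(Map<String, Consumer<SimpleMessage>> handlers) {
        return message -> {
            String type = message.headers.get("command_type");
            Consumer<SimpleMessage> handler = handlers.get(type);
            if (handler == null) {
                throw new RuntimeException("No method for command_type=" + type);
            }
            handler.accept(message);
        };
    }

    // Wraps a dispatcher so that unknown command types are routed to a fallback
    // (log, dead-letter, reply with failure) and never reach the throwing dispatcher.
    static Consumer<SimpleMessage> filtering(Set<String> knownTypes,
                                             Consumer<SimpleMessage> delegate,
                                             Consumer<SimpleMessage> onUnknown) {
        return message -> {
            if (knownTypes.contains(message.headers.get("command_type"))) {
                delegate.accept(message);
            } else {
                onUnknown.accept(message);
            }
        };
    }

    // Builds a message carrying only the command_type header and the payload from the trace.
    static SimpleMessage command(String type) {
        Map<String, String> headers = new HashMap<>();
        headers.put("command_type", type);
        return new SimpleMessage(headers, "{\"customerName\":\"Customer 001\"}");
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        List<String> rejected = new ArrayList<>();

        Map<String, Consumer<SimpleMessage>> handlers = new HashMap<>();
        // Assumed name of the command the billing service actually handles.
        handlers.put("com.eventuate.poc.saga.CreateBillingAccountCommand",
                m -> handled.add(m.payload));

        Consumer<SimpleMessage> safe = filtering(
                handlers.keySet(),
                strictDispatcher(handlers),
                m -> rejected.add(m.headers.get("command_type")));

        safe.accept(command("com.eventuate.poc.saga.CreateBillingAccountCommand"));
        safe.accept(command("com.eventuate.poc.saga.CreateBillingAccountCommand2"));

        System.out.println("handled=" + handled.size() + " rejected=" + rejected);
    }
}
```

With something like this in front of the dispatcher, the unexpected message would be recorded and skipped, and the consumer would keep its partition assignment instead of closing.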