microservices-patterns / ftgo-application

Example code for the book Microservice patterns

How to define a KafkaConsumerFactory for Eventuate Tram saga #125

Closed channaitfac closed 3 years ago

channaitfac commented 3 years ago

I am trying to deploy the Eventuate Tram saga example application from Git with only OrderService and ConsumerService.

While the application is deploying, I get an exception and an error:

Exception:

2020-09-30 16:21:50.303  INFO 13612 --- [  restartedMain] i.e.t.e.s.DomainEventDispatcher          : Initialized domain event dispatcher
2020-09-30 16:21:50.415 ERROR 13612 --- [erServiceEvents] i.e.m.k.b.c.EventuateKafkaConsumer       : Got exception:

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1208) ~[kafka-clients-2.3.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.0.jar:na]
        at io.eventuate.messaging.kafka.basic.consumer.DefaultKafkaMessageConsumer.poll(DefaultKafkaMessageConsumer.java:61) ~[eventuate-messaging-kafka-basic-consumer-0.10.0.RELEASE.jar:na]
        at io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer.runPollingLoop(EventuateKafkaConsumer.java:148) ~[eventuate-messaging-kafka-basic-consumer-0.10.0.RELEASE.jar:na]
        at io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer.lambda$start$0(EventuateKafkaConsumer.java:106) ~[eventuate-messaging-kafka-basic-consumer-0.10.0.RELEASE.jar:na]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_112]

2020-09-30 16:21:50.499  WARN 13612 --- [  restartedMain] c.n.c.sources.URLConfigurationSource     : No URLs will be polled as dynamic configuration sources.
Exception in thread "Eventuate-subscriber-orderServiceEvents" 2020-09-30 16:21:50.500  INFO 13612 --- [  restartedMain] c.n.c.sources.URLConfigurationSource     : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
java.lang.RuntimeException: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer.lambda$start$0(EventuateKafkaConsumer.java:125)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1208)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
        at io.eventuate.messaging.kafka.basic.consumer.DefaultKafkaMessageConsumer.poll(DefaultKafkaMessageConsumer.java:61)
        at io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer.runPollingLoop(EventuateKafkaConsumer.java:148)
        at io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer.lambda$start$0(EventuateKafkaConsumer.java:106)
        ... 1 more
2020-09-30 16:21:50.947  INFO 13612 --- [er-orderService] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=orderService] Discovered group coordinator ChannaX240.mshome.net:9092 (id: 2147483647 rack: null)
2020-09-30 16:21:50.957  INFO 13612 --- [er-orderService] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=orderService] Revoking previously assigned partitions []
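
The IllegalStateException in the log above is the one Kafka's consumer raises when poll is called before any topic subscription or partition assignment exists. As a rough illustration of that precondition only, here is a minimal sketch built on a hypothetical stand-in class (`SketchConsumer` and the topic name `example-topic` are made up for this example; neither comes from kafka-clients or Eventuate). It does not show how Eventuate wires its consumer or why the subscription was missing in this deployment.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class PollGuardDemo {
    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        try {
            consumer.poll(); // no subscription yet, so this throws
        } catch (IllegalStateException e) {
            System.out.println("poll failed: " + e.getMessage());
        }
        // "example-topic" is a hypothetical topic name used only for this sketch.
        consumer.subscribe(Collections.singleton("example-topic"));
        System.out.println("records after subscribing: " + consumer.poll());
    }
}

// Hypothetical stand-in, NOT the real org.apache.kafka.clients.consumer.KafkaConsumer.
class SketchConsumer {
    private final Set<String> subscribedTopics = new HashSet<>();

    // Replaces the current subscription with the given topics.
    void subscribe(Set<String> topics) {
        subscribedTopics.clear();
        subscribedTopics.addAll(topics);
    }

    // Refuses to poll without a subscription, using the same message as the log above.
    // Returns the number of records fetched, which is always 0 in this sketch.
    int poll() {
        if (subscribedTopics.isEmpty()) {
            throw new IllegalStateException(
                "Consumer is not subscribed to any topics or assigned any partitions");
        }
        return 0;
    }
}
```

In the sketch, the first poll fails and the second succeeds, which matches the general rule that a subscribe or assign call has to happen before polling.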

Error:

***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 2 of method messageConsumerKafka in io.eventuate.messaging.kafka.spring.consumer.MessageConsumerKafkaConfiguration required a bean of type 'io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory' that could not be found.

Action: 
Consider defining a bean of type 'io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory' in your configuration.

My OrderServiceConfiguration class is as follows:

@Configuration
@Import({ TramEventsPublisherConfiguration.class, SagaOrchestratorConfiguration.class, CommonConfiguration.class })
public class OrderServiceConfiguration {

    @Bean
    public OrderService orderService(
            SagaInstanceFactory sagaInstanceFactory,
            OrderRepository orderRepository,
            OrderDomainEventPublisher orderAggregateEventPublisher,
            CreateOrderSaga createOrderSaga) {
        return new OrderService(sagaInstanceFactory, orderRepository, orderAggregateEventPublisher, createOrderSaga);
    }

    @Bean
    public CreateOrderSaga createOrderSaga(
            OrderServiceProxy orderService, 
            ConsumerServiceProxy consumerService,
            KitchenServiceProxy kitchenServiceProxy, 
            AccountingServiceProxy accountingService){
        return new CreateOrderSaga(orderService, consumerService, kitchenServiceProxy, accountingService);
    }

    @Bean
    public OrderServiceProxy orderServiceProxy() {
        return new OrderServiceProxy();
    }

    @Bean
    public ConsumerServiceProxy consumerServiceProxy() {
        return new ConsumerServiceProxy();
    }

    @Bean
    public KitchenServiceProxy kitchenServiceProxy() {
        return new KitchenServiceProxy();
    }

    @Bean
    public AccountingServiceProxy accountingServiceProxy() {
        return new AccountingServiceProxy();
    }

    @Bean
    public OrderDomainEventPublisher orderAggregateEventPublisher(DomainEventPublisher eventPublisher) {
        return new OrderDomainEventPublisher(eventPublisher);
    }

    @Bean
    public MeterRegistryCustomizer meterRegistryCustomizer(@Value("${spring.application.name}") String serviceName) {
        return registry -> registry.config().commonTags("service", serviceName);
    }
}

The error goes away (but the exception is still there) only when I add the following code to OrderServiceConfiguration:

@Bean
public KafkaConsumerFactory kafkaConsumerFactory() {
  return new DefaultKafkaConsumerFactory();
}

With this fix (even though the exception is still there), the two microservices (OrderService and ConsumerService) deploy successfully, and I can create Orders with OrderService and get Consumers with ConsumerService via REST API calls.

But the issue is that once I create an Order via a REST API call, OrderService does not talk to ConsumerService (OrderService does not send the related Kafka message to ConsumerService via the consumerService topic).

That means OrderService fails to complete the first step of CreateOrderSaga:

.step().invokeParticipant(consumerServiceProxy.validateOrder, CreateOrderSagaState::makeValidateOrderByConsumerCommand)

But when I debug, I can see that the makeValidateOrderByConsumerCommand method executes correctly and successfully creates and returns a ValidateOrderByConsumer object.

ValidateOrderByConsumer makeValidateOrderByConsumerCommand() {
  ValidateOrderByConsumer x = new ValidateOrderByConsumer();
  x.setConsumerId(getOrderDetails().getConsumerId());
  x.setOrderId(getOrderId());
  x.setOrderTotal(getOrderDetails().getOrderTotal());
  return x;
} 

Also, the created messages have been inserted into the message table of the eventuate database: eventuate_message_table

Please help me fix this issue with the Kafka topic (consumerService) between OrderService and ConsumerService.
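
The two observations above (the command object is built, and a row appears in the message table) are consistent with a transactional-outbox style of messaging, where recording a message and publishing it to the broker are separate steps. The following is a minimal in-memory sketch of that separation, not Eventuate Tram's implementation: `OutboxSketch`, `OutboxRow`, `send`, and `relayPending` are hypothetical names, and the lists merely stand in for the database table and the Kafka topic. It is not a diagnosis of this particular issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// All names here are hypothetical; none are Eventuate Tram classes.
class OutboxSketch {
    // One row in an in-memory stand-in for the message table.
    static final class OutboxRow {
        final String destination;
        final String payload;
        boolean published = false;

        OutboxRow(String destination, String payload) {
            this.destination = destination;
            this.payload = payload;
        }
    }

    final List<OutboxRow> messageTable = new ArrayList<>(); // stand-in for the database table
    final List<String> broker = new ArrayList<>();          // stand-in for the Kafka topic

    // Step 1: build the command from saga state and record it as a row.
    <S> void send(String destination, S state, Function<S, String> commandFactory) {
        messageTable.add(new OutboxRow(destination, commandFactory.apply(state)));
    }

    // Step 2: a separate relay publishes rows that have not been published yet
    // and returns how many it published.
    int relayPending() {
        int count = 0;
        for (OutboxRow row : messageTable) {
            if (!row.published) {
                broker.add(row.destination + ":" + row.payload);
                row.published = true;
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        OutboxSketch sketch = new OutboxSketch();
        sketch.send("consumerService", 42L,
                orderId -> "ValidateOrderByConsumer(orderId=" + orderId + ")");
        // The row exists, but nothing has reached the broker yet.
        System.out.println("rows=" + sketch.messageTable.size() + " delivered=" + sketch.broker.size());
        sketch.relayPending();
        System.out.println("rows=" + sketch.messageTable.size() + " delivered=" + sketch.broker.size());
    }
}
```

In this sketch a row can sit in the table indefinitely if the relay step never runs or is misconfigured, so a populated table alone does not show that the participant received the command.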

channaitfac commented 3 years ago

This happened because I had added some unwanted (extra) "io.eventuate.tram" dependencies to the POM file.