spring-attic / spring-integration-kafka

Apache License 2.0

A Spring Boot application should start up regardless of whether the Apache Kafka server is available or not #65

Closed ghost closed 8 years ago

ghost commented 8 years ago

I have a Spring Boot application that I need to run regardless of whether the Apache Kafka server is available or not. At present, if the Apache Kafka server is down, I am not able to start my Spring Boot application, and I cannot find a way to make the application start up without the Kafka server being available.

Basically, I need a way to override KafkaMessageDrivenChannelAdapter.doStart(). I am using spring-integration-kafka with spring-integration-java-dsl.

garyrussell commented 8 years ago

Set autoStartup to false and start()/stop() it yourself in a try/catch block.
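A minimal sketch of that suggestion (the field and method names are illustrative, not from this thread; it assumes the adapter bean is defined elsewhere with autoStartup set to false):

```java
// Sketch only: assumes a KafkaMessageDrivenChannelAdapter bean
// registered elsewhere with autoStartup(false).
@Autowired
private KafkaMessageDrivenChannelAdapter kafkaAdapter;

public void startAdapterIfPossible() {
    try {
        kafkaAdapter.start(); // connects to Kafka now, not during context refresh
    }
    catch (Exception e) {
        // Kafka is unavailable; log and carry on - the application stays up
    }
}
```

With autoStartup disabled, the application context refreshes even when the broker is unreachable, and start() can be retried later, e.g. from a scheduled task.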

ghost commented 8 years ago

Thanks a ton !!

I am trying to achieve this but it's giving me an error:

@Bean(name = "testKafkaMessageDrivenChannelAdapterListenerContainerSpec")
    @DependsOn({ "connectionFactory", "mediaConsumerOffsetManager" })
    public KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec getKafkaMessageDrivenChannelAdapterListenerContainerSpec() {
        KafkaMessageDrivenChannelAdapterListenerContainerSpec kafkaMessageDrivenChannelAdapterListenerContainerSpec = Kafka.messageDriverChannelAdapter(connectionFactory(), "topic1");

        return kafkaMessageDrivenChannelAdapterListenerContainerSpec;
    }

This configuration gives me:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot instantiate 'IntegrationConfigurationInitializer': org.springframework.integration.dsl.config.DslIntegrationConfigurationInitializer
    at org.springframework.integration.config.IntegrationConfigurationBeanFactoryPostProcessor.postProcessBeanFactory(IntegrationConfigurationBeanFactoryPostProcessor.java:51)
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:265)
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:177)
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:606)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:462)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:118)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:686)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:320)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:957)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:946)
    at com.bcgdv.kaishi.media.Application.main(Application.java:18)
Caused by: org.springframework.beans.factory.BeanCreationException: 'IntegrationComponentSpec' beans: '[testKafkaMessageDrivenChannelAdapterListenerContainerSpec]' must be populated to target objects via 'get()' method call. It is important for @Autowired injections.
    at org.springframework.integration.dsl.config.DslIntegrationConfigurationInitializer.checkSpecBeans(DslIntegrationConfigurationInitializer.java:65)
    at org.springframework.integration.dsl.config.DslIntegrationConfigurationInitializer.initialize(DslIntegrationConfigurationInitializer.java:52)
    at org.springframework.integration.config.IntegrationConfigurationBeanFactoryPostProcessor.postProcessBeanFactory(IntegrationConfigurationBeanFactoryPostProcessor.java:48)
    ... 10 more

Is this not the right way to do it?

garyrussell commented 8 years ago

You need to show your DSL snippet where you are wiring the adapter into your flow.

You shouldn't be creating a ...Spec @Bean yourself.

You should be using IntegrationFlows.from(myKafkaAdapter()) where myKafkaAdapter() is a @Bean of type KafkaMessageDrivenChannelAdapter.
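A sketch of that arrangement, assuming the 1.x KafkaMessageListenerContainer(ConnectionFactory, String...) constructor; the topic name and handler are illustrative:

```java
@Bean
public KafkaMessageListenerContainer container() {
    // Listener container built from the existing connectionFactory() bean
    return new KafkaMessageListenerContainer(connectionFactory(), "topic1");
}

@Bean
public KafkaMessageDrivenChannelAdapter myKafkaAdapter() {
    // The adapter itself - not a ...Spec - is the @Bean
    KafkaMessageDrivenChannelAdapter adapter =
            new KafkaMessageDrivenChannelAdapter(container());
    adapter.setAutoStartup(false); // start it manually later
    return adapter;
}

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from(myKafkaAdapter())
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
```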

ghost commented 8 years ago

This is the configuration that I am using:

@Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory() {
        ZookeeperConnect zookeeperConnect = new ZookeeperConnect();
        zookeeperConnect.setZkConnect(zookeeperAddress);

        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperConnect);
        return new DefaultConnectionFactory(zookeeperConfiguration);
    }

    @Bean(name = "testConsumerOffsetManager")
    @DependsOn("connectionFactory")
    public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
        MetadataStoreOffsetManager offsetManager = new MetadataStoreOffsetManager(connectionFactory);
        offsetManager.setConsumerId("testConsumerGroupId");
        offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
        return offsetManager;
    }

    @Bean(name = "testKafkaMessageDrivenChannelAdapter")
    @DependsOn({ "connectionFactory", "testConsumerOffsetManager" })
    public KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec getKafkaMessageDrivenChannelAdapter() {
        return Kafka.messageDriverChannelAdapter(connectionFactory(), mediaConsumerTopic)
                .autoCommitOffset(false)
                .payloadDecoder(String::new)
                .keyDecoder(b -> Integer.valueOf(new String(b)))
                .configureListenerContainer(c -> c.offsetManager(offsetManager(connectionFactory()))
                        .maxFetch(100));
    }

    /*
     * Configuration for fetching the messages from the Kafka server.
     */
    @Bean(name = "testConsumerIntegrationFlow")
    public IntegrationFlow mediaConsumerIntegrationFlow() {
        return IntegrationFlows.from(getKafkaMessageDrivenChannelAdapter())
                .channel(c -> c.queue(mediaConsumerListeningFromKafkaResults))
                .handle(getConsumerService(), consumeMessageMethodName)
                .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

ghost commented 8 years ago

The configuration works if testKafkaMessageDrivenChannelAdapter is not created as a separate bean and is just part of the 'testConsumerIntegrationFlow' bean. However, since I want to start/stop the adapter myself, I tried to split it out as a separate bean.

artembilan commented 8 years ago

Your getKafkaMessageDrivenChannelAdapter() is really bad.

If you'd like to get access to the bean by its name using IntegrationFlow definition, you should do something like this:

@Bean(name = "testConsumerIntegrationFlow")
    public IntegrationFlow mediaConsumerIntegrationFlow() {
        return IntegrationFlows
                .from(Kafka.messageDriverChannelAdapter(connectionFactory(), mediaConsumerTopic)
                        .id("testKafkaMessageDrivenChannelAdapter")
                        .autoCommitOffset(false)
                        .payloadDecoder(String::new)
                        .keyDecoder(b -> Integer.valueOf(new String(b)))
                        .configureListenerContainer(c -> c.offsetManager(offsetManager(connectionFactory()))
                                .maxFetch(100)))
                .channel(c -> c.queue(mediaConsumerListeningFromKafkaResults))
                .handle(getConsumerService(), consumeMessageMethodName)
                .get();
    }

Pay attention to the .id() definition, and follow the recommendation in the stack trace! :smile:

artembilan commented 8 years ago

No problem! It is definitely worth explaining one more time.

The IntegrationFlow definition (bean) is just a configuration-level container which tells the Spring Integration infrastructure to pick up components from it and treat them as beans.

So, in your case you should just add the .id() method to the Kafka.messageDriverChannelAdapter() definition, which will become a KafkaMessageDrivenChannelAdapter bean with that id. Actually, it is always a bean, just with a generated id by default.

Your next step is to @Autowired that KafkaMessageDrivenChannelAdapter bean into your custom service and start() it on demand!

There is no reason to register one more bean or introduce more complex logic: just follow the existing Spring Integration Java DSL procedure: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference.

Please read it and let us know whether it is clear for you. Any other questions are welcome!

Thank you for your attention!

ghost commented 8 years ago

Thanks a ton!! That was helpful :)

ghost commented 8 years ago

Deleted the comment by mistake.

Thanks, Artem, for providing further information, but this is still not working for me.

artembilan commented 8 years ago

Ok, no problem! Please share the stack trace, or anything else about what's going on.

ghost commented 8 years ago

Finally I got through it.

To summarize, my initial problem was that when I start my application while the Kafka server is down, my application will not come up.

The solution was to set autoStartup to false ('testMessageDrivenChannelAdapterListenerContainerSpec.autoStartup(false)') and start/stop the adapter on your own.

The next problem I faced was how to get the 'KafkaMessageDrivenChannelAdapter' bean so that I can start/stop the adapter on my own.

This required giving the adapter a bean id and then autowiring it.

Kafka.messageDriverChannelAdapter(connectionFactory(),consumerTopic).id("kafkaMessageDrivenChannelAdapter");

@Autowired protected KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter;

Make sure you autowire it properly; in my case I had a circular-dependency issue, so I had to move this dependency to another bean.
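The "move it to another bean" workaround can be sketched like this (the holder class and its names are hypothetical; the thread does not show the actual fix):

```java
// A small holder bean that autowires the adapter by type, keeping the
// lifecycle logic out of the @Configuration class that defines the flow,
// which is where a circular dependency can arise.
@Component
public class KafkaAdapterLifecycle {

    @Autowired
    private KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter;

    public void start() {
        try {
            this.kafkaMessageDrivenChannelAdapter.start();
        }
        catch (Exception e) {
            // broker unreachable; keep the application running
        }
    }

    public void stop() {
        this.kafkaMessageDrivenChannelAdapter.stop();
    }
}
```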

Once again thanks Artem and Gary.