Open sunil31925 opened 4 years ago
Hi, thanks for opening the interesting issue!
Kafka Listener never load the Kafka Consumer Config and never trigger the @kafkaListener
because @EnableKafka
and @KafkaListener
are registered differently in Spring
, processed by KafkaListenerAnnotationBeanPostProcessor:
Bean post-processor that registers methods annotated with {@link KafkaListener} to be invoked by a Kafka message listener container created under the covers by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory} according to the parameters of the annotation. Annotated methods can use flexible arguments as defined by {@link KafkaListener}. This post-processor is automatically registered by Spring's {@link EnableKafka} annotation. ...
this repository contains an example in which controllers
created dynamically
, in UserDynamicControllerRegister. registerUserController()
method which annotated with @PostConstruct
, while the ApplicationContext has already been created, and this works because Spring
allows to register registerMapping
when ApplicationContext is already created, using RequestMappingHandlerMapping
to register dynamic config
and@KafkaListener
you can use BeanFactoryPostProcessor to register dynamic config
and@KafkaListener
before KafkaListenerAnnotationBeanPostProcessor
invocation, simple example:
MyBeanFactoryPostProcessor
implementation with dynamic config
and@KafkaListener
:
package com.example;
import com.example.MyPayload;
import java.lang.reflect.Modifier;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.asm.MemberAttributeExtension;
import net.bytebuddy.description.annotation.AnnotationDescription;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.Argument;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;
import static net.bytebuddy.matcher.ElementMatchers.named;
@Component
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
try {
// registers dynamicKafkaListenerConfig bean
Class<?> dynamicKafkaListenerConfig = generateDynamicKafkaListenerConfig();
AbstractBeanDefinition dynamicKafkaListenerConfigBeanDefinition = BeanDefinitionBuilder
.rootBeanDefinition(dynamicKafkaListenerConfig)
.getBeanDefinition();
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
"dynamicKafkaListenerConfig", dynamicKafkaListenerConfigBeanDefinition);
// registers dynamicKafkaListener bean
Class<?> dynamicKafkaListener = generateDynamicKafkaListener();
AbstractBeanDefinition dynamicKafkaListenerBeanDefinition = BeanDefinitionBuilder
.rootBeanDefinition(dynamicKafkaListener)
.getBeanDefinition();
((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
"dynamicKafkaListener", dynamicKafkaListenerBeanDefinition);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// generates dynamicKafkaListenerConfig class
private Class<?> generateDynamicKafkaListenerConfig() throws Exception {
return new ByteBuddy()
.subclass(Object.class)
.name("DynamicKafkaListenerConfig")
.annotateType(AnnotationDescription.Builder.ofType(Configuration.class)
.build(), AnnotationDescription.Builder.ofType(EnableKafka.class)
.build())
.make()
.load(getClass().getClassLoader())
.getLoaded();
}
// generates dynamicKafkaListener class
private Class<?> generateDynamicKafkaListener() throws Exception {
return new ByteBuddy()
.subclass(Object.class)
.name("DynamicKafkaListener")
.annotateType(AnnotationDescription.Builder.ofType(Component.class)
.build())
.defineMethod("consume", void.class, Modifier.PUBLIC)
.withParameter(MyPayload.class, "myPayload")
.annotateParameter(AnnotationDescription.Builder.ofType(Payload.class)
.build())
.intercept(MethodDelegation.to(DynamicKafkaConsumerIntercept.class))
.visit(new MemberAttributeExtension.ForMethod()
.annotateMethod(AnnotationDescription.Builder.ofType(KafkaListener.class)
.defineArray("topics", new String[]{"my-topic"})
.define("groupId", "my-groupId")
.build())
.on(named("consume")))
.make()
.load(getClass().getClassLoader())
.getLoaded();
}
// simple interceptor for DynamicKafkaConsumer
public static class DynamicKafkaConsumerIntercept {
public static void consume(@Argument(0) MyPayload myPayload) {
System.out.println("Received myPayload: " + myPayload);
}
}
}
generated DynamicKafkaListenerConfig.class
result:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@Configuration("")
@EnableKafka
public class DynamicKafkaListenerConfig {
public DynamicKafkaListenerConfig() {
}
}
generated DynamicKafkaListener.class
result:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
import com.example.MyPayload;
import io.tpd.kafkaexample.MyBeanFactoryAware.DynamicKafkaConsumerIntercept;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component("")
public class DynamicKafkaListener {
@KafkaListener(
concurrency = "",
topics = {"my-topic"},
topicPartitions = {},
errorHandler = "",
autoStartup = "",
topicPattern = "",
id = "",
groupId = "my-groupId",
beanRef = "__listener",
clientIdPrefix = "",
containerGroup = "",
containerFactory = "",
idIsGroup = true
)
public void consume(@Payload(required = true,expression = "",value = "") MyPayload myPayload) {
DynamicKafkaConsumerIntercept.consume(var1);
}
public DynamicKafkaListener() {
}
}
in this case, dynamic config
and@KafkaListener
will work as it will be processed by KafkaListenerAnnotationBeanPostProcessor
does this solution solve your issue?
MyPayload what does this class contains?
MyPayload
is simple POJO, based on PracticalAdvice, tested with @Payload PracticalAdvice payload - I removed all listeners and generated the first one with the MyBeanFactoryPostProcessor
.
P.S. I've updated the previous answer - added @Configuration
to the DynamicKafkaListenerConfig.class
Kindly share MyPayload pojo class if possible, This is great work for me now.
Thank you
MyPayload
class:
import com.fasterxml.jackson.annotation.JsonProperty;
public class MyPayload {
private final String message;
private final int identifier;
public MyPayload(@JsonProperty("message") final String message,
@JsonProperty("identifier") final int identifier) {
this.message = message;
this.identifier = identifier;
}
public String getMessage() {
return message;
}
public int getIdentifier() {
return identifier;
}
@Override
public String toString() {
return "MyPayload{" +
"message='" + message + '\'' +
", identifier=" + identifier +
'}';
}
}
you are welcome
This will not work our message of Type org.springframework.messagsing.Message @ payload Message
I generated @KafkaListener
method with the following parameter and annotation:
public void consume(@Payload MyPayload myPayload){
...
}
in your case you have to generate method with Message parameter without @Payload
annotation:
public void consume(Message<?> message){
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
...
}
since payload is already in the Message
, this should work
does it work in your case?
How to deal with Database call FROM BeanFactoryPostProcessor . I want to read metadata from database and create many consumer base on desfine in database
if you are using SQL
database then you can just use JdbcTemplate, and configure manually (postgresql
example with org.postgresql:postgresql:42.2.16
dependency):
DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
dataSourceBuilder.driverClassName("org.postgresql.Driver");
dataSourceBuilder.url("jdbc:postgresql://127.0.0.1:5432/myDb");
dataSourceBuilder.username("myUser");
dataSourceBuilder.password("MyPassword");
DataSource dataSource = dataSourceBuilder.build();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from my_table");
_... do something for your case..._
to get property values for DataSourceBuilder
you can use Environment
:
Environment environment = beanFactory.getBean(Environment.class);
String url = environment.getProperty("db-url");
String username = environment.getProperty("db-username");
String password = environment.getProperty("db-password");
...