spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Make `NewTopic` bean support multiple Kafka clusters #2759

Closed isbee closed 1 year ago

isbee commented 1 year ago

Expected Behavior When a spring-kafka application connects to multiple Kafka clusters, apply a NewTopic bean only to a specific target cluster.

Current Behavior Currently a NewTopic bean is applied to every KafkaAdmin, so every Kafka cluster is affected by all NewTopic beans on kafkaAdmin.initialize().

Context If an application wants to consume events from cluster A but produce to cluster B, this feature would be useful.

Currently I work around this issue by not declaring NewTopic 'beans'; instead I create plain instances and explicitly call adminClient.createTopics(topics) with the target Kafka cluster's KafkaAdmin.
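The workaround described above might be sketched as follows. This is an illustrative sketch, not the library's recommended approach; the class, method, bean, and topic names are hypothetical, and it assumes a KafkaAdmin bean configured for the target cluster.

```java
import java.util.List;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaAdmin;

// Hypothetical helper: provisions topics on one cluster only.
class ClusterBTopicProvisioner {

    void provision(KafkaAdmin clusterBAdmin) throws Exception {
        // Plain instances, not beans, so other KafkaAdmin instances never see them
        List<NewTopic> topics = List.of(
                new NewTopic("orders", 3, (short) 1),     // illustrative topic
                new NewTopic("payments", 3, (short) 1));  // illustrative topic
        // Build an AdminClient from the target cluster's configuration only
        try (AdminClient client = AdminClient.create(
                clusterBAdmin.getConfigurationProperties())) {
            client.createTopics(topics).all().get();
        }
    }
}
```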

garyrussell commented 1 year ago

There is already a new feature (since 3.0.9 and 2.9.10) to filter which topics will be provisioned by each admin.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics

Starting with versions 2.9.10, 3.0.9, you can provide a Predicate<NewTopic> which can be used to determine whether a particular NewTopic bean should be considered for creation or modification. This is useful, for example, if you have multiple KafkaAdmin instances pointing to different clusters and you wish to select those topics that should be created or modified by each admin.

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));

https://github.com/spring-projects/spring-kafka/pull/2721

https://github.com/spring-projects/spring-kafka/issues/2726

isbee commented 1 year ago

@garyrussell I didn't know this existed because I was using an older version, thanks.

But I still don't quite understand setCreateOrModifyTopic(), because KafkaAdmin's createOrModifyTopic field is a single Predicate<NewTopic>, not something like a Map<String, Predicate<NewTopic>> similar to the local variable newTopicsMap in newTopics().

In this case, if I want to filter multiple topics I need to compose the predicate with multiple and/or calls, which is kind of verbose. I think something like the following would be better:

// (1)
admin.setTargetTopic("a")
admin.setTargetTopic("b")

// or, (2)
admin.setTargetTopics(targetTopicList)
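The verbosity concern above can be illustrated with plain java.util.function.Predicate composition. This is a self-contained sketch using String in place of NewTopic; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.function.Predicate;

public class TopicPredicates {

    // Verbose form: chaining per-topic equality checks with Predicate.or(...)
    static Predicate<String> chained() {
        Predicate<String> isA = name -> name.equals("a");
        return isA.or(name -> name.equals("b"));
    }

    // Compact form: membership in an explicit collection, essentially what
    // the proposed setTargetTopics(...) would do under the hood
    static Predicate<String> byCollection() {
        Set<String> targets = Set.of("a", "b");
        return targets::contains;
    }

    public static void main(String[] args) {
        System.out.println(chained().test("a"));       // prints true
        System.out.println(byCollection().test("c"));  // prints false
    }
}
```

Either form can be passed to setCreateOrModifyTopic(...) once adapted to Predicate<NewTopic>, e.g. by testing nt.name().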
garyrussell commented 1 year ago

Set<NewTopic> topics1 = Set.of(topicA, topicB);

Set<NewTopic> topics2 = Set.of(topicX, topicY);

@Bean
KafkaAdmin admin1() {
    KafkaAdmin admin = new KafkaAdmin(this.cluster1Props);
    admin.setCreateOrModifyTopic(nt -> topics1.contains(nt));
    return admin;
}

@Bean
KafkaAdmin admin2() {
    KafkaAdmin admin = new KafkaAdmin(this.cluster2Props);
    admin.setCreateOrModifyTopic(nt -> topics2.contains(nt));
    return admin;
}
isbee commented 1 year ago

@garyrussell The functional approach with lambda capture might be good, but I still think it would be better to pass an explicit collection as a parameter.

Anyway, thanks to your answer, the issue has been resolved.

rishiraj88 commented 1 year ago

Quite interesting.

garyrussell commented 1 year ago

...but I just think it is better...

If you prefer to do that, it is easy enough to create a subclass...

class MyKafkaAdmin extends KafkaAdmin {

    private final Set<NewTopic> topics = new HashSet<>();

    public MyKafkaAdmin(Map<String, Object> config) {
        super(config);
        setCreateOrModifyTopic(nt -> this.topics.contains(nt));
    }

    public void setTargetTopics(Set<NewTopic> nts) {
        this.topics.addAll(nts);
    }

}

and

@Bean
KafkaAdmin admin() {
    MyKafkaAdmin admin = new MyKafkaAdmin(this.props);
    admin.setTargetTopics(Set.of(this.topicA, this.topicB));
    return admin;
}