spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

kafkaAdmin auto configuration when creating KafkaTemplate #2780

Open ChangguHan opened 1 year ago

ChangguHan commented 1 year ago

Expected Behavior

When using multiple KafkaTemplates with observations, the automatically created KafkaAdmin should be able to connect to Kafka over SSL.

Current Behavior

When using multiple KafkaTemplates with observations, the SASL, security, and SSL properties are not applied to the KafkaAdmin. This can be a problem when I set bootstrap.servers to the SSL port.

    // KafkaTemplate.java
    @Override
    public void afterSingletonsInstantiated() {
        if (this.observationEnabled && this.applicationContext != null) {
            this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
                    .getIfUnique(() -> this.observationRegistry);
            if (this.kafkaAdmin == null) {
                this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
                if (this.kafkaAdmin != null) {
                    Object producerServers = this.producerFactory.getConfigurationProperties()
                            .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
                    String adminServers = this.kafkaAdmin.getBootstrapServers();
                    if (!producerServers.equals(adminServers)) {
                        Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());

                        // Only bootstrap.servers is overridden here
                        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
                        int opTo = this.kafkaAdmin.getOperationTimeout();
                        this.kafkaAdmin = new KafkaAdmin(props);
                        this.kafkaAdmin.setOperationTimeout(opTo);
                    }
                }
            }
        }
        else if (this.micrometerEnabled) {
            this.micrometerHolder = obtainMicrometerHolder();
        }
    }

Context

I would like to suggest two things.

  1. When creating the KafkaAdmin at line 489, also add the properties for SASL, security, and SSL.
  2. Instead of getting a unique KafkaAdmin bean at line 480, guide users to create the KafkaTemplate with a KafkaAdmin or to use the setter. Then we could simplify the KafkaAdmin creation logic by making a new KafkaAdmin from the producerFactory's config when kafkaAdmin is null.

The sample code would be like this.

    @Override
    public void afterSingletonsInstantiated() {
        if (this.observationEnabled && this.applicationContext != null) {
            this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
                    .getIfUnique(() -> this.observationRegistry);
            if (this.kafkaAdmin == null) {
                this.kafkaAdmin = new KafkaAdmin(this.producerFactory.getConfigurationProperties());
            }
        }
        else if (this.micrometerEnabled) {
            this.micrometerHolder = obtainMicrometerHolder();
        }
    }

garyrussell commented 1 year ago

@ChangguHan This is a reasonable request, but the workaround is to set an admin instance yourself...

https://github.com/spring-projects/spring-kafka/blob/156d09662a7acda86d89fe4250abbf15a13d0179/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java#L465-L472

ChangguHan commented 1 year ago

@garyrussell Thank you for your comment.

I understand your point. However, the SSL settings still need some attention if the intent is to create a new instance when the bootstrap servers of the original bean and the producerFactory differ. https://github.com/spring-projects/spring-kafka/blob/701ed82e6493f813a8e30aa0d29cd116a7fe5c73/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java#L485-L491

Wzy19930507 commented 1 year ago

Hi @garyrussell, may I pick this up?

I have some compatibility questions; please give me a hint. The properties in org.apache.kafka.common.config.SslConfigs have changed across versions: some were added and others were dropped.

Plan A

Refer to KafkaProperties: https://github.com/spring-projects/spring-boot/blob/8f2ec227389391fdd173db0ab64f26abd2752f20/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java#L251-L253

Plan B

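    // generating a clientId is different from a common admin client
    Object securityProtocol = this.producerFactory.getConfigurationProperties()
            .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
    if (securityProtocol != null) {
        // copy the protocol so the admin connects the same way as the producer
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    }
    // copy every SSL-related producer property into the admin properties
    this.producerFactory.getConfigurationProperties().forEach((key, value) -> {
        if (key.startsWith("ssl.")) {
            props.put(key, value);
        }
    });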

Plan B looks more flexible, but it feels a little odd.
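To make the Plan B idea concrete, here is a minimal, dependency-free sketch of copying security-related entries by key prefix. It uses plain maps instead of the real KafkaAdmin and ProducerFactory; the class name `AdminPropsSketch`, the method `mergeAdminProps`, and the choice of prefixes (`security.`, `ssl.`, `sasl.`) are assumptions for illustration, not spring-kafka code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of spring-kafka.
class AdminPropsSketch {

    // Prefixes treated as security-related; this list is an assumption of the sketch.
    private static final List<String> SECURITY_PREFIXES = List.of("security.", "ssl.", "sasl.");

    /**
     * Starts from the admin's own properties, then overlays bootstrap.servers and
     * every security-related entry found in the producer configuration.
     */
    static Map<String, Object> mergeAdminProps(Map<String, Object> adminProps,
            Map<String, Object> producerProps) {
        Map<String, Object> merged = new HashMap<>(adminProps);
        producerProps.forEach((key, value) -> {
            boolean securityRelated = SECURITY_PREFIXES.stream().anyMatch(key::startsWith);
            if (key.equals("bootstrap.servers") || securityRelated) {
                merged.put(key, value);
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> admin = Map.of("bootstrap.servers", "broker:9092");
        Map<String, Object> producer = Map.of(
                "bootstrap.servers", "broker:9093",
                "security.protocol", "SSL",
                "ssl.truststore.location", "/path/to/truststore.jks",
                "acks", "all");
        // Producer-only settings such as "acks" are not copied to the admin.
        System.out.println(mergeAdminProps(admin, producer));
    }
}
```

Matching on prefixes avoids hard-coding individual SslConfigs keys, which sidesteps the version-compatibility concern above, at the cost of also copying keys the admin client may not use.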

PPrydorozhnyi commented 11 months ago

I think I have an issue kinda related to this request. When I want to publish events in parallel and Kafka Admin is not initialized yet, I see several Kafka admin creations/initializations in the logs, and my publishing just stops. As a result, no events are published, and the producing process gets stuck.

I mean something like this:

    val executor = Executors.newVirtualThreadPerTaskExecutor()
    val n = 1000

    for (i in 1..n) {
        CompletableFuture.runAsync(
            { template.send(inTopic, i.toString(), "value") },
            executor)
    }

Creating the Kafka Admin beforehand and setting it on the template solves the issue, but I am wondering whether this is the correct behavior.

artembilan commented 11 months ago

@PPrydorozhnyi ,

that is not possible if your template is a singleton bean in the application context. The logic there is like this:

    public void afterSingletonsInstantiated() {
        if (this.observationEnabled && this.applicationContext != null) {
            this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
                    .getIfUnique(() -> this.observationRegistry);
            if (this.kafkaAdmin == null) {
                this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();

And this afterSingletonsInstantiated() is called only once, when the application context is ready.

PPrydorozhnyi commented 11 months ago

Hi @artembilan. Thanks for the quick response.

Unfortunately, I'm able to reproduce it without any custom bean scopes or manual bean creation.

I created a small project so you can try it yourself: Reproduce example

Could you please check? Thanks in advance.

artembilan commented 11 months ago

    o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:

But that's correct, because AdminClient is not a KafkaAdmin. We are talking about different objects. Yes, KafkaAdmin uses AdminClient, and it does so this way:

            try (AdminClient client = createAdmin()) {
                this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
                if (this.clusterId == null) {
                    this.clusterId = "null";
                }
            }

So, it is not a surprise to see several instances of the AdminClient created on the fly.

I'm not sure, though, why there have to be so many of them, since the KafkaTemplate logic is like this:

    private String clusterId() {
        if (this.kafkaAdmin != null && this.clusterId == null) {
            this.clusterId = this.kafkaAdmin.clusterId();
        }
        return this.clusterId;
    }

The clusterId is resolved only once.

I'll run your application after lunch.

artembilan commented 11 months ago

@PPrydorozhnyi ,

can you please update your sample project with the build tool files? It is not clear which dependencies you use there. According to the README it is supposed to be Gradle, so just add those artifacts to the repo.

PPrydorozhnyi commented 11 months ago

@artembilan

yeap, sorry. added

artembilan commented 11 months ago

@PPrydorozhnyi ,

I see what is going on. That KafkaTemplate.clusterId() is really the guilty one. When we call send() with observation concurrently, all those threads hit the this.clusterId == null condition, and therefore all of them call this.kafkaAdmin.clusterId() 😄

Probably not related to this issue, but still looks like a bug 🤷
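The race described above, and one common way to close it, can be sketched without any Kafka dependency. This is only an illustration under assumptions: `ClusterIdCacheSketch` and `countLookups` are hypothetical names, the `Supplier` stands in for `kafkaAdmin.clusterId()`, and the double-checked locking shown here is not claimed to be what spring-kafka itself does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the clusterId caching logic; not spring-kafka code.
class ClusterIdCacheSketch {

    private final Supplier<String> lookup; // stands in for kafkaAdmin.clusterId()
    private final Object lock = new Object();
    private volatile String clusterId;

    ClusterIdCacheSketch(Supplier<String> lookup) {
        this.lookup = lookup;
    }

    String clusterId() {
        String id = this.clusterId;
        if (id == null) {
            synchronized (this.lock) {
                // Re-check under the lock so only the first thread performs the lookup.
                id = this.clusterId;
                if (id == null) {
                    id = this.lookup.get();
                    this.clusterId = id;
                }
            }
        }
        return id;
    }

    /** Calls clusterId() from many threads at once and returns how many lookups ran. */
    static int countLookups(int threads) {
        AtomicInteger lookups = new AtomicInteger();
        ClusterIdCacheSketch cache = new ClusterIdCacheSketch(() -> {
            lookups.incrementAndGet();
            return "cluster-1";
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all threads together to provoke the race
                    cache.clusterId();
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return lookups.get();
    }

    public static void main(String[] args) {
        System.out.println("lookups: " + countLookups(16));
    }
}
```

Without the lock and the second null check, every thread that observes `clusterId == null` would run its own lookup, which matches the many AdminClient instances seen in the logs.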

artembilan commented 11 months ago

See the fix: https://github.com/spring-projects/spring-kafka/pull/2944

PPrydorozhnyi commented 11 months ago

@artembilan nice, thanks a lot!