spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

As a Spring Kafka User, I want to trace messages even without clusterId #3458

Closed iamgollapalli closed 1 month ago

iamgollapalli commented 1 month ago

Expected Behavior

I want to use Spring Kafka Observation without the Kafka clusterId. Expose some method in KafkaTemplate and KafkaMessageListenerContainer to accept the clusterId from a properties file, so that I can skip fetching the clusterId value with KafkaAdmin.

Current Behavior

Currently, KafkaTemplate and KafkaMessageListenerContainer expect a cluster id derived from KafkaAdmin when creating the observation context. Spring Kafka Observation depends heavily on the Kafka Admin client for getting the cluster id, but sometimes KafkaAdmin does not work as expected (maybe due to the environment), and Kafka performance is impacted.

Context

I am deploying a Kafka application in an environment where KafkaAdmin might not work. Because of this, if observation is enabled, KafkaAdmin tries to fetch the clusterId on every message publish/consume (it might be an issue with the environment), and Kafka performance is low, especially on the consumer side.

Anyway, KafkaRecordReceiverContext and KafkaRecordSenderContext use the clusterId value just to set the RemoteServiceName, so it would be good if the user could set the remote service name through YAML properties.

artembilan commented 1 month ago

The clusterId can be set on the KafkaAdmin:

    /**
     * Set the cluster id. Use this to prevent attempting to fetch the cluster id
     * from the broker, perhaps if the user does not have admin permissions.
     * @param clusterId the clusterId to set
     * @since 3.1
     */
    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

Not sure why you talk about KafkaAdminClient when both KafkaTemplate and KafkaMessageListenerContainer rely on the KafkaAdmin instead. In fact, a KafkaAdmin might not be injected into them, in which case the clusterId won't be evaluated for those traces.

If you are asking about exposing that clusterId for the KafkaAdmin auto-configuration, then the issue has to be raised in the Spring Boot project: https://github.com/spring-projects/spring-boot.

Closed as Works as Designed.

iamgollapalli commented 1 month ago

@artembilan I meant KafkaAdmin only, not KafkaAdminClient. Yes, KafkaAdmin exposes a setter method, but is it really necessary to rely on the KafkaAdmin response for getting clusterId? Especially on the consumer side, we have observed a memory issue because of this.

artembilan commented 1 month ago

Well, not sure how. The logic there is like this:

        @Nullable
        private String clusterId() {
            if (this.kafkaAdmin != null && this.clusterId == null) {
                obtainClusterId();
            }
            return this.clusterId;
        }

        private void obtainClusterId() {
            if (this.kafkaAdmin != null) {
                this.clusterId = this.kafkaAdmin.clusterId();
            }
        }

And further to the KafkaAdmin:

    @Override
    @Nullable
    public String clusterId() {
        if (this.clusterId == null) {
            try (AdminClient client = createAdmin()) {
                this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
                if (this.clusterId == null) {
                    this.clusterId = "null";
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            catch (Exception ex) {
                LOGGER.error(ex, "Could not obtain cluster info");
            }
        }
        return this.clusterId;
    }

So, if you set that clusterId explicitly, no AdminClient is going to be created.
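The caching behavior in the two excerpts above can be illustrated with a small standalone sketch. It is not the real KafkaAdmin: `AdminStandIn`, `brokerLookup`, and `lookupsFor` are hypothetical names, and the broker call is replaced by a callable that always fails, to mimic an environment where describing the cluster is not permitted. Under those assumptions it shows that a failed lookup leaves the field null, so every later call tries again, while an explicitly set id means the lookup is never attempted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class ClusterIdCachingSketch {

    /** Simplified stand-in for the caching in KafkaAdmin.clusterId(); not the real class. */
    static class AdminStandIn {

        // Hypothetical replacement for the describeCluster() round trip.
        private final Callable<String> brokerLookup;

        private volatile String clusterId;

        AdminStandIn(Callable<String> brokerLookup) {
            this.brokerLookup = brokerLookup;
        }

        void setClusterId(String clusterId) {
            this.clusterId = clusterId;
        }

        String clusterId() {
            if (this.clusterId == null) {
                try {
                    this.clusterId = this.brokerLookup.call();
                }
                catch (Exception ex) {
                    // As in the quoted code, the failure is swallowed and the
                    // field stays null, so the next call repeats the lookup.
                }
            }
            return this.clusterId;
        }
    }

    /** Returns how many broker lookups happen over the given number of clusterId() calls. */
    static int lookupsFor(String explicitId, int calls) {
        AtomicInteger lookups = new AtomicInteger();
        AdminStandIn admin = new AdminStandIn(() -> {
            lookups.incrementAndGet();
            throw new IllegalStateException("not authorized to describe cluster");
        });
        if (explicitId != null) {
            admin.setClusterId(explicitId);
        }
        for (int i = 0; i < calls; i++) {
            admin.clusterId();
        }
        return lookups.get();
    }

    public static void main(String[] args) {
        System.out.println("lookups without explicit id: " + lookupsFor(null, 5));      // 5
        System.out.println("lookups with explicit id: " + lookupsFor("my-cluster", 5)); // 0
    }
}
```

In the real class each of those lookups creates an AdminClient and waits up to the operation timeout, which is why setting the id up front matters when the lookup cannot succeed.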

It would be great to make sure that you are on the latest Spring for Apache Kafka version, and then to share a simple project that reproduces that memory leak.

> is it really necessary to rely on the KafkaAdmin response for getting clusterId?

I think that's correct. The clusterId is really neither a consumer nor a producer responsibility. That's why we introduced the KafkaAdmin abstraction to manage and handle cluster operations and the respective info. Yes, there is no need to connect to the cluster if the clusterId is explicitly set.

iamgollapalli commented 1 month ago

@artembilan depending on the environment, describeCluster may not work as expected; for example, we need the DescribeCluster permission in the case of AWS MSK. Since the application fails to get the clusterId, it tries the same operation every time (the cluster id value is still null).

Due to this, we are observing a memory issue, and consumer group (CG) lag is increasing.

I have created a ticket under the Spring Boot project: https://github.com/spring-projects/spring-boot/issues/42028

artembilan commented 1 month ago

> depending on the environment, describeCluster may not work as expected

I still don't understand how that is possible if you set clusterId explicitly into that KafkaAdmin:

    public String clusterId() {
        if (this.clusterId == null) {
            try (AdminClient client = createAdmin()) {
                this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);

You can have the auto-configured KafkaAdmin injected somewhere in your services and call its setClusterId(String) manually with the respective configuration property.
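A minimal sketch of that suggestion, assuming Spring Boot auto-configures the KafkaAdmin bean and the application is on Spring for Apache Kafka 3.1 or later (where the `setClusterId` setter quoted earlier is available). The class name `KafkaClusterIdConfig` and the property `app.kafka.cluster-id` are made up for illustration; they are not an existing Spring Boot property or class.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaClusterIdConfig {

    // Hypothetical property name; define it in application.yml or application.properties.
    @Autowired
    void configureClusterId(KafkaAdmin kafkaAdmin,
            @Value("${app.kafka.cluster-id}") String clusterId) {

        // With the id set up front, KafkaAdmin.clusterId() returns it directly
        // and does not create an AdminClient to call describeCluster().
        kafkaAdmin.setClusterId(clusterId);
    }
}
```

The value only feeds the observation context, so any stable identifier for the cluster should do if the real cluster id is not known.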