spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0
2.19k stars 1.57k forks source link

KafkaAdmin inside KafkaTemplate only gets the BOOTSTRAP_SERVER_CONFIG from producerFactory #3503

Closed hperez-lab closed 2 months ago

hperez-lab commented 2 months ago

I encountered this issue in version 3.2.3, and it is also present in version 3.3.0-M3.

The afterSingletonsInstantiated method in KafkaTemplate retrieves the default KafkaAdmin instance (if it has not been overridden) and configures it using the producerFactory property BOOTSTRAP_SERVERS_CONFIG. However, we experienced an error when trying to send messages to Kafka due to the SECURITY_PROTOCOL_CONFIG, which defaults to PLAINTEXT instead of SSL, as required by our configuration. Below is the error we encountered while attempting to send a message:


"stack_trace":"java.util.concurrent.TimeoutException: null
  java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
  java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
  org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
  org.springframework.kafka.core.KafkaAdmin.clusterId(KafkaAdmin.java:351)
  org.springframework.kafka.core.KafkaTemplate.clusterId(KafkaTemplate.java:530)
  org.springframework.kafka.support.micrometer.KafkaRecordSenderContext.<init>(KafkaRecordSenderContext.java:48)
  org.springframework.kafka.core.KafkaTemplate.lambda$observeSend$3(KafkaTemplate.java:788)
  io.micrometer.observation.Observation.createNotStarted(Observation.java:172)
  io.micrometer.observation.docs.ObservationDocumentation.observation(ObservationDocumentation.java:188)
  org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:786)
  org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:596)

Steps to Reproduce

  1. Configure the KafkaTemplate bean using the KafkaProducerFactory and set ObservationEnabled to true.
  2. Attempt to send a message to Kafka.

Expected Behavior

The SECURITY_PROTOCOL_CONFIG should be retrieved from the producerFactory configuration, just like BOOTSTRAP_SERVERS_CONFIG.

Workaround

We resolved this issue by overriding the default KafkaAdmin with our custom configuration. However, since this bean is used internally by KafkaTemplate, we believe that SECURITY_PROTOCOL_CONFIG should be handled in the same way as BOOTSTRAP_SERVERS_CONFIG within the KafkaTemplate configuration.

hperez-lab commented 2 months ago

Closed due to duplicated https://github.com/spring-projects/spring-kafka/issues/2780