spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Do not perform alter topics call if no changes required #991

Closed tdanylchuk closed 3 years ago

tdanylchuk commented 3 years ago

There are a few purposes of this change request:

Recently I tried to bump the Spring Cloud version to the latest and the deployment of my app failed. In tests I have a broker with all features enabled, whereas on the dev environment features like INCREMENTAL_ALTER_CONFIGS are disabled.

The application still started successfully, but the stream never came up due to:

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INCREMENTAL_ALTER_CONFIGS
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:339)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:224)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:190)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:86)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:403)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:91)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INCREMENTAL_ALTER_CONFIGS
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:404)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:349)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:326)
    ... 15 more
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INCREMENTAL_ALTER_CONFIGS
--

From this exception it is not clear which binder is actually causing the issue, so only debugging helps.
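To illustrate the idea in the title, here is a minimal sketch (not the code from the PR, which is not shown in this thread) of how a provisioner could decide whether an alter call is needed at all. The class and method names (TopicConfigDiff, changedEntries, alterRequired) are hypothetical. It assumes the topic's current configuration has already been fetched, for example via the Kafka AdminClient describeConfigs call, and flattened into a plain map:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, for illustration only; not part of the binder.
public class TopicConfigDiff {

    /**
     * Returns only those desired entries whose value is missing from,
     * or different in, the topic's current configuration.
     */
    public static Map<String, String> changedEntries(Map<String, String> current,
                                                     Map<String, String> desired) {
        Map<String, String> changes = new HashMap<>();
        for (Map.Entry<String, String> entry : desired.entrySet()) {
            String currentValue = current.get(entry.getKey());
            if (!Objects.equals(currentValue, entry.getValue())) {
                changes.put(entry.getKey(), entry.getValue());
            }
        }
        return changes;
    }

    /** True only if at least one desired entry differs from the current state. */
    public static boolean alterRequired(Map<String, String> current,
                                        Map<String, String> desired) {
        return !changedEntries(current, desired).isEmpty();
    }
}
```

With a check like this, the incrementalAlterConfigs request would only be sent when alterRequired returns true, so a broker without INCREMENTAL_ALTER_CONFIGS support would not fail provisioning for topics that already match the desired configuration.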

Also, I have a question about this case: isn't the health check for the binder supposed to be DOWN in this case?

sobychacko commented 3 years ago

@tdanylchuk Thanks for the PR. It is now merged upstream and backported to 3.0.x. Looking forward to more PR contributions.

tdanylchuk commented 3 years ago

@sobychacko @garyrussell sorry for raising this topic once again, but I am still interested in my earlier question: "isn't the health check for the binder supposed to be DOWN in this case?" Can the health check also check the kafkaConsumerContainer status, and not only topic and broker connectivity?

tdanylchuk commented 3 years ago

Sorry for tagging you @sobychacko @garyrussell, but I just want to dust off this topic regarding my question in the previous post. A couple of times we have faced a situation where the service was unable to consume records but was still reported as healthy, so we had to spend some time on the investigation, since we only have alerts on the service health status.

sobychacko commented 3 years ago

@tdanylchuk Could you start a new issue on this matter? It is easy to miss updates when commenting on closed issues. I didn't really fully understand the context, so it might be better to start a new thread/issue.