spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Suggestion for a solution when an error is received during createPartition #2542

Open omercelikceng opened 2 years ago

omercelikceng commented 2 years ago

Hello, I am getting the error below. It may not be a real error, because the bind method is called again and the error may not be thrown on the next attempt. That's why I would like to make a suggestion about it.

2022-10-19 15:23:50.341 [main] ERROR org.springframework.cloud.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
        org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for sloth-palette-service-update-symbology-to-materiel-topic-local-323; nested exception is java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic already has 8 partitions.
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:347) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:162) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:86) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:233) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:92) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:323) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:288) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:297) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:301) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:142) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:?]
        at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.13.jar:5.3.13]
        at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.13.jar:5.3.13]
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.13.jar:5.3.13]
        at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.13.jar:5.3.13]
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.13.jar:5.3.13]
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.13.jar:5.3.13]
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.13.jar:5.3.13]
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:144) ~[spring-boot-2.4.13.jar:2.4.13]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:771) ~[spring-boot-2.4.13.jar:2.4.13]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:763) ~[spring-boot-2.4.13.jar:2.4.13]
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:438) ~[spring-boot-2.4.13.jar:2.4.13]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:339) ~[spring-boot-2.4.13.jar:2.4.13]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329) ~[spring-boot-2.4.13.jar:2.4.13]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318) ~[spring-boot-2.4.13.jar:2.4.13]
        at tr.com.omer.sloth.cloud.framework.service.palette.app.PaletteServiceApplication.main(PaletteServiceApplication.java:13) ~[classes/:?]
        Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic already has 8 partitions.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[kafka-clients-2.6.3.jar:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[kafka-clients-2.6.3.jar:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) ~[kafka-clients-2.6.3.jar:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272) ~[kafka-clients-2.6.3.jar:?]
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:409) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:357) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:334) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
        ... 30 more
        Caused by: org.apache.kafka.common.errors.InvalidPartitionsException: Topic already has 8 partitions.

In my project, I know this error occurs because Kafka's metadata has not been updated yet. Although the topic already has 8 partitions, the binder calls createPartitions(8), and that call fails. The exception is thrown from the code block below.

[Image: createPartitions]
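The race described above can be reproduced in isolation. The sketch below is a hypothetical, dependency-free model, not the binder's actual code: `FakeBroker` stands in for the Kafka cluster and applies the same rule the broker applies to a createPartitions request (the new total must be larger than the current count), while `ensurePartitions` mimics a provisioning check that trusts a possibly stale partition count.

```java
public class StalePartitionCheckDemo {

    /** Hypothetical stand-in for the broker; it always knows the real partition count. */
    public static class FakeBroker {
        private int actualPartitions;

        public FakeBroker(int actualPartitions) {
            this.actualPartitions = actualPartitions;
        }

        public int actualPartitions() {
            return actualPartitions;
        }

        /** Like Kafka, reject a request whose new total is not larger than the current count. */
        public void createPartitions(int newTotal) {
            if (newTotal <= actualPartitions) {
                throw new IllegalStateException(
                        "Topic already has " + actualPartitions + " partitions.");
            }
            actualPartitions = newTotal;
        }
    }

    /**
     * Simplified provisioning check: if the observed (possibly stale) partition
     * count is lower than the requested one, ask the broker for more partitions.
     */
    public static void ensurePartitions(FakeBroker broker, int observedPartitions, int requestedPartitions) {
        if (observedPartitions < requestedPartitions) {
            broker.createPartitions(requestedPartitions);
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker(8);
        try {
            // Stale metadata reports 1 partition although the topic really has 8.
            ensurePartitions(broker, 1, 8);
        } catch (IllegalStateException e) {
            System.out.println("Stale metadata: " + e.getMessage());
        }
        // With up-to-date metadata (8 partitions), no request is sent at all.
        ensurePartitions(broker, 8, 8);
        System.out.println("Fresh metadata: no createPartitions call");
    }
}
```

The point of the model is that the failure depends only on which partition count is observed, not on the topic's real state, which is why a later attempt with fresh metadata succeeds.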

The exception is then caught by the code shown in the next images, which tries to bind the producer again. When it is retried, everything completes successfully. There is no need to log an error here; I think this error log misleads the developer.

[Images: reBindProducer2, reBindProducer]
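The retry behaviour visible in the log ("Failed to create producer binding; retrying in 30 seconds") can be pictured with the minimal sketch below. It is a simplified model under stated assumptions, not BindingService's real code: `bindWithRetry` is a hypothetical helper that logs each failure and reschedules the bind on a `ScheduledExecutorService` until it succeeds. In Spring Cloud Stream the real interval comes from `spring.cloud.stream.bindingRetryInterval` (30 seconds by default; 0 makes such failures fatal).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class RebindSketch {

    /**
     * Tries to bind immediately; on failure, logs the error and schedules another
     * attempt after delayMillis. The returned future completes on the first success.
     */
    public static <T> CompletableFuture<T> bindWithRetry(Supplier<T> bind,
                                                         ScheduledExecutorService scheduler,
                                                         long delayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(bind, scheduler, delayMillis, result);
        return result;
    }

    private static <T> void attempt(Supplier<T> bind,
                                    ScheduledExecutorService scheduler,
                                    long delayMillis,
                                    CompletableFuture<T> result) {
        try {
            result.complete(bind.get());
        } catch (RuntimeException e) {
            // This corresponds to the ERROR line discussed in this issue: it is
            // printed even though the next attempt may well succeed.
            System.err.println("Failed to create producer binding; retrying in "
                    + delayMillis + " ms: " + e.getMessage());
            scheduler.schedule(() -> attempt(bind, scheduler, delayMillis, result),
                    delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        int[] calls = {0};
        // Hypothetical bind: fails once (stale metadata), then succeeds.
        Supplier<String> bind = () -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("Topic already has 8 partitions.");
            }
            return "producer-binding";
        };
        String binding = bindWithRetry(bind, scheduler, 100).get(5, TimeUnit.SECONDS);
        System.out.println("Bound " + binding + " after " + calls[0] + " attempts");
        scheduler.shutdown();
    }
}
```

Returning a future instead of blocking mirrors the idea that startup continues while the binding is retried in the background.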

I have a few suggestions.

To briefly summarize the problem again: the topic actually has 8 partitions, but the partition count read from the metadata is 1. As a result, createPartitions is called and Kafka throws InvalidPartitionsException. That exception is propagated, and the catch block then calls the bind producer method again.

My first suggestion: remove this error log. Developers will assume it is a bug, even though the error is likely to go away on the next attempt.
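One way to act on this suggestion without hiding real failures is to choose the log level based on whether a retry will follow. The sketch below is hypothetical: `levelFor` and `hasCause` are illustrative helpers, `java.util.logging` levels stand in for ERROR/WARN/DEBUG (SEVERE/WARNING/FINE), and a local stand-in class replaces Kafka's InvalidPartitionsException so the example has no Kafka dependency (matching on the simple class name is only for that reason; real code would use `instanceof`).

```java
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BindFailureLogLevel {

    /** Hypothetical stand-in for Kafka's InvalidPartitionsException. */
    public static class InvalidPartitionsException extends RuntimeException {
        public InvalidPartitionsException(String message) {
            super(message);
        }
    }

    /** Returns true if any exception in the cause chain has the given simple class name. */
    public static boolean hasCause(Throwable failure, String simpleName) {
        int depth = 0;
        // The depth limit guards against pathological cause cycles.
        for (Throwable t = failure; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t.getClass().getSimpleName().equals(simpleName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * SEVERE (ERROR) only when no retry will follow; a partition-count race that a
     * retry is expected to resolve is logged at FINE (DEBUG), anything else at WARNING.
     */
    public static Level levelFor(Throwable failure, boolean retryScheduled) {
        if (!retryScheduled) {
            return Level.SEVERE;
        }
        return hasCause(failure, "InvalidPartitionsException") ? Level.FINE : Level.WARNING;
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("binding");
        // Same shape as the trace above: provisioning error -> ExecutionException -> InvalidPartitionsException.
        Throwable failure = new RuntimeException("Provisioning exception encountered",
                new ExecutionException(new InvalidPartitionsException("Topic already has 8 partitions.")));
        // Logged at FINE, so it is hidden under the default INFO threshold.
        log.log(levelFor(failure, true), "Failed to create producer binding; retrying", failure);
    }
}
```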

My second suggestion: add a retry mechanism to the createPartitions code below, and throw the exception only if all retries fail.

[Image: createPartitions]
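A rough sketch of the second suggestion follows. It assumes a hypothetical `retry` helper with linear backoff that rethrows only the last failure after all attempts are exhausted; the `main` method simulates the createPartitions step with a made-up sequence of metadata reads (first stale, then fresh). The key design point is that the partition count is re-read on every attempt, so a retry can observe the updated metadata instead of repeating the same failing request.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;

public class CreatePartitionsRetry {

    /**
     * Runs the action up to maxAttempts times, sleeping backoffMillis * attempt between
     * tries (linear backoff). Only the last failure is rethrown, after all attempts fail.
     */
    public static <T> T retry(Callable<T> action, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int actualPartitions = 8;
        int requested = 8;
        // Hypothetical metadata reads: the first is stale (1), later ones are fresh (8).
        Iterator<Integer> metadataReads = List.of(1, 8, 8).iterator();

        Integer observedCount = retry(() -> {
            // Re-read the partition count on every attempt.
            int observed = metadataReads.next();
            if (observed < requested && requested <= actualPartitions) {
                // Simulates the broker rejecting createPartitions(requested).
                throw new IllegalStateException(
                        "Topic already has " + actualPartitions + " partitions.");
            }
            return observed;
        }, 3, 50);
        System.out.println("Partition count seen on the successful attempt: " + observedCount);
    }
}
```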

If one of my suggestions is accepted, I would like to send a pull request.

sobychacko commented 1 year ago

@omercelikceng Is this issue still valid for your use case? I wonder if there is any more info that you can add.

omercelikceng commented 1 year ago

@sobychacko I don't remember the details exactly, but I think the problem still exists. When I have some free time, I will review it and open a pull request. I'm busy these days, though, so I need some time.

sobychacko commented 1 year ago

@omercelikceng No problem. Please let us know how you want to continue.