spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Intermittent `TopicAuthorizationException` only while rebalancing #1058

Closed (UgiR closed this issue 3 years ago)

UgiR commented 3 years ago

Background:

Below is the gist of my configuration:

I have two topics, my-topic-main_suffix and my-topic-profile_suffix. When myprofile is active, only my-topic-profile_suffix should be used.

application.yml

my-suffix: -main_suffix

spring:
  cloud:
    stream:
      bindings:
        my-events-in:
          destination: my-topic${my-suffix}
          group: my-group

---
spring:
  profiles: myprofile

my-suffix: -profile_suffix
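To make the expected resolution concrete, here is a plain-Java sketch of what the configuration above should produce. It is not Spring's property-resolution code; it only models the assumption that the myprofile document is applied after the default one, so its my-suffix value wins and ${my-suffix} expands to -profile_suffix. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: models "profile document overrides the default document"
// plus "${...}" placeholder expansion. It is NOT Spring's implementation.
final class SuffixResolutionSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Merge the default document with the profile document; profile values win when active.
    static Map<String, String> effectiveProperties(Map<String, String> defaults,
                                                   Map<String, String> profileDoc,
                                                   boolean profileActive) {
        Map<String, String> merged = new HashMap<>(defaults);
        if (profileActive) {
            merged.putAll(profileDoc);
        }
        return merged;
    }

    // Expand each ${key} in the template using the merged properties.
    static String resolve(String template, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = props.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Unresolvable placeholder: " + m.group());
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of("my-suffix", "-main_suffix");
        Map<String, String> myprofile = Map.of("my-suffix", "-profile_suffix");
        String template = "my-topic${my-suffix}";

        System.out.println(resolve(template, effectiveProperties(defaults, myprofile, true)));  // my-topic-profile_suffix
        System.out.println(resolve(template, effectiveProperties(defaults, myprofile, false))); // my-topic-main_suffix
    }
}
```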

This works as expected: I am able to consume messages from the correct topic, my-topic-profile_suffix. However, I intermittently run into this issue (roughly once or twice a day):

 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] Attempt to heartbeat failed since group is rebalancing
 o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] Revoke previously assigned partitions my-topic-profile_suffix-12, my-topic-profile_suffix-13, my-topic-profile_suffix-14, my-topic-profile_suffix-15, my-topic-profile_suffix-16, my-topic-profile_suffix-17, my-topic-profile_suffix-18, my-topic-profile_suffix-19, my-topic-profile_suffix-0, my-topic-profile_suffix-1, my-topic-profile_suffix-2, my-topic-profile_suffix-3, my-topic-profile_suffix-4, my-topic-profile_suffix-5, my-topic-profile_suffix-6, my-topic-profile_suffix-7, my-topic-profile_suffix-8, my-topic-profile_suffix-9, my-topic-profile_suffix-10, my-topic-profile_suffix-11
 o.s.c.s.b.k.KafkaMessageChannelBinder$1  : my-group: partitions revoked: [my-topic-profile_suffix-12, my-topic-profile_suffix-13, my-topic-profile_suffix-14, my-topic-profile_suffix-15, my-topic-profile_suffix-16, my-topic-profile_suffix-17, my-topic-profile_suffix-18, my-topic-profile_suffix-19, my-topic-profile_suffix-0, my-topic-profile_suffix-1, my-topic-profile_suffix-2, my-topic-profile_suffix-3, my-topic-profile_suffix-4, my-topic-profile_suffix-5, my-topic-profile_suffix-6, my-topic-profile_suffix-7, my-topic-profile_suffix-8, my-topic-profile_suffix-9, my-topic-profile_suffix-10, my-topic-profile_suffix-11]
 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] (Re-)joining group
 org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-2, groupId=my-group] Error while fetching metadata with correlation id 160121 : {my-topic-main_suffix=TOPIC_AUTHORIZATION_FAILED}
 org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-my-group-2, groupId=my-group] Topic authorization failed for topics [my-topic-main_suffix]
 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] Join group failed with org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic-main_suffix]
 essageListenerContainer$ListenerConsumer : Authorization Exception and no authorizationExceptionRetryInterval set
 org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic-main_suffix]
 essageListenerContainer$ListenerConsumer : Fatal consumer exception; stopping container
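The last two container log lines describe a decision the listener container makes on an authorization error: with no retry interval configured, the error is treated as fatal and the container stops. The stdlib-only sketch below models just that decision as the logs describe it; AuthExceptionPolicySketch, Outcome, and onAuthorizationException are hypothetical names, not Spring Kafka's code.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical, simplified model of the container's reaction to an authorization
// failure, inferred only from the log messages above. Not Spring Kafka's code.
final class AuthExceptionPolicySketch {

    enum Outcome { STOP_CONTAINER, RETRY_AFTER_INTERVAL }

    // No retry interval configured: the failure is fatal and the container stops.
    static Outcome onAuthorizationException(Optional<Duration> retryInterval) {
        return retryInterval.isPresent() ? Outcome.RETRY_AFTER_INTERVAL : Outcome.STOP_CONTAINER;
    }

    public static void main(String[] args) {
        System.out.println(onAuthorizationException(Optional.empty()));                     // STOP_CONTAINER
        System.out.println(onAuthorizationException(Optional.of(Duration.ofSeconds(10)))); // RETRY_AFTER_INTERVAL
    }
}
```

Setting the interval named in the log (authorizationExceptionRetryInterval) would likely keep the container alive across such an error, but it would not explain why the consumer is asking about my-topic-main_suffix in the first place.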

What I am trying to understand is why it revokes the partitions I expect (for example, my-topic-profile_suffix-12), but when it tries to rejoin, the logs indicate that it is using the topic from outside the myprofile configuration: {my-topic-main_suffix=TOPIC_AUTHORIZATION_FAILED}.

To confirm, I also pulled this from the startup logs, which seems to indicate that we are bound to the expected topic:

k.p.SpringCloudStreamBindingsProperties : Topic binding found in application.yml = SpringCloudStreamBindingsProperties.TopicBinding(topicBindingName=my-events-in, destination=my-topic-profile_suffix, group=my-group, binder=default)

I am not sure why I am seeing this TopicAuthorizationException when the active profile shouldn't even be 'aware' of the my-topic-main_suffix topic.

Any insights would be appreciated!

garyrussell commented 3 years ago

I don't see how that can be possible. Even if the environment is somehow getting messed up, the listener container's topic(s) are held in a final field of the container properties, initialized when the container is created.
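A stdlib-only illustration of that point, assuming a simplified container whose properties capture the resolved destination once at construction. ContainerTopicsSketch, ContainerPropsSketch, and createContainerProps are hypothetical stand-ins, not Spring Kafka classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not Spring Kafka's code): the destination is resolved
// once, when the container is created, and then held in a final field.
final class ContainerTopicsSketch {

    // Stand-in for the listener container's properties (hypothetical class).
    static final class ContainerPropsSketch {
        private final List<String> topics; // fixed at construction time

        ContainerPropsSketch(String... topics) {
            this.topics = List.of(topics); // immutable copy of the resolved names
        }

        List<String> getTopics() {
            return topics;
        }
    }

    // Resolve the destination from the environment at "container creation" time.
    static ContainerPropsSketch createContainerProps(Map<String, String> environment) {
        return new ContainerPropsSketch("my-topic" + environment.get("my-suffix"));
    }

    public static void main(String[] args) {
        Map<String, String> environment = new HashMap<>();
        environment.put("my-suffix", "-profile_suffix");
        ContainerPropsSketch props = createContainerProps(environment);

        // A later change to the environment does not reach the already-created container.
        environment.put("my-suffix", "-main_suffix");
        System.out.println(props.getTopics()); // [my-topic-profile_suffix]
    }
}
```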

I don't see any such behavior with this:

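import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;

@SpringBootApplication
public class Kbgh1058Application {

    private static final Logger log = LoggerFactory.getLogger(Kbgh1058Application.class);

    public static void main(String[] args) {
        // Activate the "bar" profile so its "suffix" value overrides the default.
        SpringApplication.run(Kbgh1058Application.class, "--spring.profiles.active=bar");
    }

    // Functional consumer; its input binding is named input-in-0.
    @Bean
    Consumer<String> input() {
        return System.out::println;
    }

    // Log the resolved "suffix" property at startup to confirm which value is active.
    @Bean
    public ApplicationRunner runner(Environment env) {
        return args -> {
            log.info(env.getProperty("suffix"));
        };
    }

}
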
spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: kbgh1058${suffix}
          group: kbgh1058
suffix: -foo

---

spring:
  profiles: bar
suffix: -baz

Can you change your logging configuration to show the thread name and re-post?

UgiR commented 3 years ago

All of the log lines are on the same thread, container-0-C-1.

[container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] Attempt to heartbeat failed since group is rebalancing
[container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] Revoke previously assigned partitions my-topic-profile_suffix-12, my-topic-profile_suffix-13, my-topic-profile_suffix-14, my-topic-profile_suffix-15, my-topic-profile_suffix-16, my-topic-profile_suffix-17, my-topic-profile_suffix-18, my-topic-profile_suffix-19, my-topic-profile_suffix-0, my-topic-profile_suffix-1, my-topic-profile_suffix-2, my-topic-profile_suffix-3, my-topic-profile_suffix-4, my-topic-profile_suffix-5, my-topic-profile_suffix-6, my-topic-profile_suffix-7, my-topic-profile_suffix-8, my-topic-profile_suffix-9, my-topic-profile_suffix-10, my-topic-profile_suffix-11
[container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : my-group: partitions revoked: [my-topic-profile_suffix-12, my-topic-profile_suffix-13, my-topic-profile_suffix-14, my-topic-profile_suffix-15, my-topic-profile_suffix-16, my-topic-profile_suffix-17, my-topic-profile_suffix-18, my-topic-profile_suffix-19, my-topic-profile_suffix-0, my-topic-profile_suffix-1, my-topic-profile_suffix-2, my-topic-profile_suffix-3, my-topic-profile_suffix-4, my-topic-profile_suffix-5, my-topic-profile_suffix-6, my-topic-profile_suffix-7, my-topic-profile_suffix-8, my-topic-profile_suffix-9, my-topic-profile_suffix-10, my-topic-profile_suffix-11]
[container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] (Re-)joining group
[container-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-2, groupId=my-group] Error while fetching metadata with correlation id 160121 : {my-topic-main_suffix=TOPIC_AUTHORIZATION_FAILED}
[container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-my-group-2, groupId=my-group] Topic authorization failed for topics [my-topic-main_suffix]
[container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-my-group-2, groupId=my-group] Join group failed with org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic-main_suffix]
[container-0-C-1] essageListenerContainer$ListenerConsumer : Authorization Exception and no authorizationExceptionRetryInterval set
 org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic-main_suffix]
[container-0-C-1] essageListenerContainer$ListenerConsumer : Fatal consumer exception; stopping container

Agreed, I don't think the configuration is causing it either.

One observation: using a different consumer group name for each topic solves the issue, though I don't understand why. So is the only way the app can 'know' about this unauthorized topic that the cluster tells it about the topic during the metadata update?
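One explanation consistent with these observations, offered as an assumption since the thread does not confirm it: another instance of the application runs without myprofile (and therefore subscribes to my-topic-main_suffix) in the same group, my-group. In Kafka's consumer group protocol, the member elected group leader computes the partition assignment for every member, so it needs metadata for the union of all members' subscriptions, not only its own. If this instance becomes leader and is not authorized for my-topic-main_suffix, the join fails. That would also fit the error being intermittent (only when this instance is elected leader) and disappearing with separate group names. The stdlib-only sketch below models this; GroupLeaderSketch, its methods, and the member names are hypothetical, not the Kafka client's code.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a rebalance: the leader gathers every member's subscription
// and needs metadata for all of those topics. Not the Kafka client's code.
final class GroupLeaderSketch {

    // Topics the leader must fetch metadata for: the union of all members' subscriptions.
    static Set<String> topicsNeededByLeader(Map<String, List<String>> subscriptionsByMember) {
        Set<String> union = new TreeSet<>();
        subscriptionsByMember.values().forEach(union::addAll);
        return union;
    }

    // Topics the leader is not authorized for; a non-empty result means the join fails.
    static Set<String> unauthorizedTopics(Map<String, List<String>> subscriptionsByMember,
                                          Set<String> leaderAuthorizedTopics) {
        Set<String> missing = new TreeSet<>(topicsNeededByLeader(subscriptionsByMember));
        missing.removeAll(leaderAuthorizedTopics);
        return missing;
    }

    public static void main(String[] args) {
        // Assumed setup: instance A runs with myprofile, instance B without it, same group.
        Map<String, List<String>> myGroup = Map.of(
                "instance-A (myprofile)", List.of("my-topic-profile_suffix"),
                "instance-B (default)", List.of("my-topic-main_suffix"));
        Set<String> authorizedForA = Set.of("my-topic-profile_suffix");

        // If A is elected leader, it also needs metadata for B's topic.
        System.out.println(unauthorizedTopics(myGroup, authorizedForA)); // [my-topic-main_suffix]

        // With a separate group per topic, A's group only contains A's subscription.
        Map<String, List<String>> separateGroup = Map.of(
                "instance-A (myprofile)", List.of("my-topic-profile_suffix"));
        System.out.println(unauthorizedTopics(separateGroup, authorizedForA)); // []
    }
}
```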