reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

JoinGroup failed: The coordinator is not aware of this member error during startup - Reactive Kafka consumer subscribe #324

Open rguntu opened 1 year ago

rguntu commented 1 year ago

I have a Reactive Kafka consumer and producer implementation in a Spring Boot project. I am seeing the error below in the logs as soon as I start the Spring Boot application.

Errors seen in the log: JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation

Expected Behavior

Reactive Kafka consumer should be able to join consumer group successfully.

Actual Behavior

23:15:00.825 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1676531700825
23:15:00.829 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Subscribed to topic(s): abc
23:15:01.472 [reactive-kafka-reactivekafkagroupid-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Resetting the last seen epoch of partition abc-0 to 20 since the associated topicId changed from null to ImfebTfnRHKMNhIlb0pPow
23:15:01.474 [reactive-kafka-reactivekafkagroupid-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Cluster ID: TQfWwyIUSBWvEe1gjJRhgQ
23:15:01.475 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Discovered group coordinator qtvr-akf103.tlv.lpnet.com:9092 (id: 2147483495 rack: null)
23:15:01.476 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] (Re-)joining group
23:15:02.113 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Request joining group due to: need to re-join with the given member-id
23:15:02.114 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] (Re-)joining group
23:15:02.330 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-reactivekafkagroupid-1-b650b53a-9b33-48a6-9aa1-cf6823923290', protocol='null'}
23:15:02.330 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response
23:15:02.331 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response
23:15:02.331 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] (Re-)joining group

Steps to Reproduce

Below is my Reactive Kafka consumer code. I have tried updating various Kafka configs, but I keep getting the errors shown in the log above during Spring Boot application startup, the moment reactiveKafkaConsumerTemplate.subscribe() is called. Any help is appreciated. Kafka config:

spring.kafka.bootstrap-servers=localhost:29092,localhost:39092

spring.kafka.consumer.properties[max.poll.interval.ms]=500
spring.kafka.consumer.properties[session.timeout.ms]=15000
spring.kafka.consumer.max-poll-records=100
spring.kafka.consumer.heartbeat-interval=3000

spring.kafka.properties[session.timeout.ms]=8000

spring.kafka.properties[heartbeat.interval.ms]=3000

 @Override
    public void run(String... args) {
        if (kafkaAdminClient.verifyConnection()) {
            log.info("CommandLineRunner - consumeFakeConsumerDTO called");
            consumeEvents();
        }
    }

    private void consumeEvents() {
        reactiveKafkaConsumerTemplate
                .receive()
                .doOnError(e -> log.error("Predicate: {}", e.getMessage()))
                .doOnNext(record -> {
                    log.info("Processing: {}@{}", record.value(), record.offset());
                    record.receiverOffset().acknowledge();
                })
                .flatMap(this::processRecordMonoDefer)
                .retryWhen(Retry.backoff(retryTimes, Duration.ofSeconds(1)).transientErrors(true))
                .onErrorResume(error -> {
                    log.error("Something bad happened while consuming: {}", error.getMessage(), error);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }


garyrussell commented 1 year ago

reactor-kafka does not use the spring.kafka... properties; those are Spring Boot properties for the non-reactive consumer and producer factories.

Please show your ReceiverOptions used to create the template.
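
For reference, the typical wiring looks roughly like this: all consumer properties go directly into the ReceiverOptions rather than into spring.kafka.* (a minimal sketch; the property values and bean names are illustrative, not the reporter's actual configuration):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import reactor.kafka.receiver.ReceiverOptions;

    // All consumer configuration is supplied here; Spring Boot's spring.kafka.* properties
    // are not applied to reactor-kafka's ReceiverOptions automatically.
    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactivekafkagroupid");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singletonList("abc"));
    }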

The current reactor-kafka version is 1.3.16.

rguntu commented 1 year ago

@garyrussell Here is a screenshot of the ReceiverOptions (image attached); I am still seeing the same issue.

garyrussell commented 1 year ago

Don't post screenshots; show the actual code that builds the options. That way I can try it locally and see what's wrong.

rguntu commented 1 year ago

@garyrussell Thank you so much; here is the code. I am seeing the issue only with the non-local Kafka bootstrap servers; there is no issue with the local Kafka bootstrap servers and the same config. I have marked this in the code.

private void consumeEvents() {
        reactiveKafkaConsumerTemplate
                .receive()
                .doOnError(e -> log.error("Predicate: {}", e.getMessage()))
                .doOnNext(record -> {
                    log.info("Processing: {}@{}", record.value(), record.offset());
                    record.receiverOffset().acknowledge();
                })
                .flatMap(this::processRecordMonoDefer)
                .retryWhen(Retry.backoff(retryTimes, Duration.ofSeconds(1)).transientErrors(true))
                .onErrorResume(error -> {
                    log.error("Something bad happened while consuming: {}", error.getMessage(), error);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }
  @Bean
    public ReceiverOptions<String, Event> kafkaReceiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:39092"); //local kafka bootstrap servers
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "nonlocalkafka101:9092,nonlocalkafka102:9092"); //network(non-local) kafka bootstrap servers

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "500");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "AMHHHH");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Deserializer messagingDeserializer = new MessagingDeserializer<>();
        messagingDeserializer.configure(Map.of("value.deserializer.class", Event.class.getName()), false);

        ReceiverOptions<String, Event> basicReceiverOptions = ReceiverOptions.create(props)
                .withValueDeserializer(messagingDeserializer);

        return basicReceiverOptions.subscription(Collections.singletonList("abc1"));
    }
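
For completeness, the template that consumes this options bean would typically be declared roughly as follows (a minimal sketch, assuming spring-kafka's ReactiveKafkaConsumerTemplate and the kafkaReceiverOptions bean shown above):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

    // The template only wraps the ReceiverOptions; every consumer property,
    // including group id and the timeout settings, comes from the options bean above.
    @Bean
    public ReactiveKafkaConsumerTemplate<String, Event> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, Event> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
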
garyrussell commented 1 year ago

Can you connect to those remote servers using the command line tools, e.g. kafka-console-consumer.sh?

If not, then my best guess is some problem with the broker/cluster configuration (e.g. advertised listeners).

Either way, you should look for errors in the server logs.

If it works fine with local brokers, I suggest you ask the wider Apache Kafka community, e.g. on Stack Overflow or on one of their mailing lists.

Zane-XY commented 1 year ago

The issue does not look like it is caused by reactor-kafka. As @garyrussell mentioned, you could verify this by using the same consumer properties with another client or with the command-line tools. Other causes of this error may include an invalid group id, security settings, or the Kafka client library version.
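
As a concrete way to run that check from Java, the same bootstrap servers, group id, and topic from the code above can be fed to a plain (non-reactive) KafkaConsumer; a minimal sketch (the class name and the use of StringDeserializer for the value are illustrative, since only the group join is being tested):

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupJoinCheck {

    public static void main(String[] args) {
        // Reuse the broker list and group id from the reactive setup so that a JoinGroup
        // failure here points at the broker/cluster rather than at reactor-kafka.
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "nonlocalkafka101:9092,nonlocalkafka102:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "sample-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("abc1"));
            // poll() performs the group join; any coordinator problem surfaces here as well.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            System.out.println("Joined group; fetched " + records.count() + " record(s)");
        }
    }
}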

dipali1234567 commented 1 year ago

@rguntu was your issue resolved? If yes, could you please post the solution?