reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Auto-commit of offsets is not happening and offset commits are failing with different errors #289

Open VijayIndia opened 2 years ago

VijayIndia commented 2 years ago

I want my application to process the existing messages before consuming the next batch. My Kafka topic receives about 1 million events/sec, and my application keeps polling so many messages that I frequently see offset commits fail and trigger a rebalance, or heap out-of-space errors because back-pressure is not being applied.

I am using the following code in my project:

    Properties props = new Properties();
    props.put("max.poll.records", 5);
    props.put("max.poll.interval.ms", 45000);
    props.put("request.timeout.ms", 300000);
    props.put("session.timeout.ms", 360000);
    props.put("heartbeat.interval.ms", 30000);
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "earliest");
    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    props.put("client.id", "client-1");
    props.put("group.id", "group-id-1");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(props)
            .subscription(Collections.singleton("Topic1"))
            .addAssignListener(partitions -> LOGGER.info("onPartitionAssigned {}", partitions))
            .addRevokeListener(partitions -> LOGGER.info("onPartitionRevoked {}", partitions))
            .commitRetryInterval(Duration.ofMillis(3000))
            .commitBatchSize(5)
            .commitInterval(Duration.ofMillis(500))
            .maxDelayRebalance(Duration.ofSeconds(30));

    Flux<ReceiverRecord<String, String>> inboundFlux = KafkaReceiver.create(receiverOptions)
            .receive()
            .onErrorContinue((throwable, o) -> LOGGER.error("Error while consuming", throwable))
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(60)).transientErrors(true))
            .repeat();

    inboundFlux
            .publishOn(Schedulers.newSingle("PublishedThread"))
            .doOnNext(k -> performOperation(k))
            .subscribe();

    public void performOperation(ReceiverRecord<String, String> kafkaEvent) {
        // Business logic
        kafkaEvent.receiverOffset().acknowledge();
    }
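What I am ultimately aiming for is roughly the following: fully process each record before taking the next one, and only then commit its offset. A minimal sketch of that idea (assumptions: auto-commit disabled, and processRecord is a placeholder for my business logic returning Mono<Void>):

    // Sketch only: manual commits with strictly sequential processing.
    ReceiverOptions<String, String> manualCommitOptions = receiverOptions
            .consumerProperty("enable.auto.commit", "false"); // commit explicitly below

    KafkaReceiver.create(manualCommitOptions)
            .receive()
            .publishOn(Schedulers.newSingle("ProcessingThread"))
            .concatMap(record -> processRecord(record)            // placeholder business logic, returns Mono<Void>
                    .then(record.receiverOffset().commit()))      // commit only after the record is processed
            .subscribe();

The intent of concatMap here is that records are handled one at a time, so processing of a record completes and its offset is committed before the next one is taken.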

Getting the following exceptions:

Error 1: Offset commit failed on partition. The coordinator is not aware of this member.

Error 2: Commit failed exception org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.

Error 3: ConsumerEventLoop - Commit failed org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request.

Error 4: Member client-1-09aed8d4-2413-44d2-afec-95b2a1f63263 sending LeaveGroup request to coordinator BrokerName due to the consumer is being closed

Full stack trace for Error 3:

    ConsumerEventLoop - Commit failed
    org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1292) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1187) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1184) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1159) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297) [kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) [kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [kafka-clients-3.0.1.jar:?]
        at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:331) [reactor-kafka-1.3.12.jar:1.3.12]
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.18.jar:3.4.18]
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.18.jar:3.4.18]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]

The same commit failure is also reported through onErrorDropped (Operators - Operator called default onErrorDropped) while the consumer is being closed:

    ConsumerEventLoop - Commit failed
    Operators - Operator called default onErrorDropped
    org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1292) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1187) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1184) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1159) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297) [kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) [kafka-clients-3.0.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [kafka-clients-3.0.1.jar:?]
        at reactor.kafka.receiver.internals.ConsumerEventLoop$CommitEvent.waitFor(ConsumerEventLoop.java:502) [reactor-kafka-1.3.12.jar:1.3.12]
        at reactor.kafka.receiver.internals.ConsumerEventLoop$CommitEvent.access$600(ConsumerEventLoop.java:379) [reactor-kafka-1.3.12.jar:1.3.12]
        at reactor.kafka.receiver.internals.ConsumerEventLoop$CloseEvent.run(ConsumerEventLoop.java:534) [reactor-kafka-1.3.12.jar:1.3.12]
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) [reactor-core-3.4.18.jar:3.4.18]
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) [reactor-core-3.4.18.jar:3.4.18]
        at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:227) [reactor-core-3.4.18.jar:3.4.18]
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.18.jar:3.4.18]
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.18.jar:3.4.18]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]

garyrussell commented 2 years ago

Please upgrade to 1.3.13; I believe this is corrected by https://github.com/reactor/reactor-kafka/pull/281

VijayIndia commented 2 years ago

@garyrussell I see 1.3.12 is the latest release build, and I have already upgraded to 1.3.12. Can you advise on this?

garyrussell commented 2 years ago

My mistake; sorry.

You should see back pressure behavior with DEBUG logging.
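For example, assuming Logback is the SLF4J backend, the receiver's event-loop logger can be raised to DEBUG programmatically (an equivalent logback.xml entry for the same logger works just as well; this is only a sketch):

    import ch.qos.logback.classic.Level;
    import ch.qos.logback.classic.Logger;
    import org.slf4j.LoggerFactory;

    // Make reactor-kafka's pause/resume (back-pressure) decisions visible in the logs.
    Logger eventLoopLogger = (Logger) LoggerFactory
            .getLogger("reactor.kafka.receiver.internals.ConsumerEventLoop");
    eventLoopLogger.setLevel(Level.DEBUG);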

If that doesn't help, please provide an MCRE that exhibits the behavior.

VijayIndia commented 2 years ago

@garyrussell Back-pressure is behaving correctly, but there is a thread named 'reactive-kafka-group-id' which stops working after my retry attempts are exhausted (The server experienced an unexpected error when processing the request). A new thread, 'reactive-kafka-group-id-1', is then created, and so on. Is this the thread that does the polling? I will try to provide an MCRE asap.
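Is the expected pattern here to recreate the receiver when retrying, so that each attempt starts a fresh consumer (and therefore a fresh polling thread)? A rough sketch of what I mean:

    // Sketch only: wrap receiver creation in defer so every retry builds a new
    // KafkaReceiver, and hence a new consumer event loop / polling thread.
    Flux<ReceiverRecord<String, String>> retryingFlux = Flux.defer(() ->
                    KafkaReceiver.create(receiverOptions).receive())
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(60)).transientErrors(true));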

fragaLY commented 1 year ago

So what was the problem?