seglo / kafka-lag-exporter

Monitor Kafka Consumer Group Latency with Kafka Lag Exporter
Apache License 2.0

Future.filter predicate is not satisfied #106

Open henhiskan opened 5 years ago

henhiskan commented 5 years ago

I installed v0.5.5 with Helm, but it isn't working. I get the error message "Supervisor RestartSupervisor saw failure: A failure occurred while retrieving offsets", the same as in https://github.com/lightbend/kafka-lag-exporter/issues/98, but with a different cause:

Caused by: java.util.NoSuchElementException: Future.filter predicate is not satisfied

This is how I installed the Helm chart:

helm install https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.5.5/kafka-lag-exporter-0.5.5.tgz --name kafka-lag-exporter --namespace data --set image.pullPolicy=Always --set logLevel=DEBUG --set clusters\[0\].name=dev --set clusters\[0\].bootstrapBrokers=kafka-kafka:9092 --debug

I'm using Confluent Kafka 2.3.0 without authentication, with the log message format and inter-broker protocol versions set to 2.3.

Log:


2019-11-25 16:31:42,969 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka version: 2.3.0
2019-11-25 16:31:42,969 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka commitId: fc1aaa116b661c8a
2019-11-25 16:31:42,969 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1574699502969
2019-11-25 16:31:42,977 INFO  c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-dev - Collecting offsets
2019-11-25 16:31:43,971 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Cluster ID: qk7qfN-oSpipaWLuTpLlsw
2019-11-25 16:32:03,854 ERROR c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-dev - Supervisor RestartSupervisor saw failure: A failure occurred while retrieving offsets.  Shutting down. java.lang.Exception: A failure occurred while retrieving offsets.  Shutting down.
    at com.lightbend.kafkalagexporter.ConsumerGroupCollector$CollectorBehavior.$anonfun$collector$1(ConsumerGroupCollector.scala:188)
    at akka.actor.typed.internal.BehaviorImpl$ReceiveBehavior.receive(BehaviorImpl.scala:37)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
    at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:52)
    at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:248)
    at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:79)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
    at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:121)
    at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:102)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke(ActorCell.scala:581)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.NoSuchElementException: Future.filter predicate is not satisfied
    at scala.concurrent.Future.$anonfun$filter$1(Future.scala:340)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:92)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:92)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
    ... 4 common frames omitted

seglo commented 5 years ago

Thanks for reporting. This is a little tricky to troubleshoot because the stack trace doesn't show which user code is associated with the exception. I wonder if it's caused by the Future wrangling in the Collect handler of ConsumerGroupCollector.
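
For context on the message itself: in Scala, "Future.filter predicate is not satisfied" is the NoSuchElementException that Future.filter produces when its predicate returns false. Future.withFilter behaves the same way, and an if guard inside a for-comprehension over Futures desugars to withFilter (as does a refutable pattern on the left of <- in Scala 2). The frames Future.$anonfun$filter$1 and Success.map in the trace above are consistent with that path. The sketch below reproduces the exception with a hypothetical fetchOffsets function that returns an empty result. It is not the exporter's code, and this thread does not establish which guard or pattern in the Collect handler actually fails. It also shows one alternative: represent the empty case as an Option instead of a failed Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureFilterDemo {
  // Hypothetical lookup standing in for an offsets request that came back empty.
  def fetchOffsets(): Future[Map[String, Long]] = Future.successful(Map.empty)

  // The `if` guard desugars to fetchOffsets().withFilter(_.nonEmpty).map(...),
  // so an empty map fails the Future with
  // NoSuchElementException("Future.filter predicate is not satisfied").
  def guarded(): Future[Map[String, Long]] =
    for {
      offsets <- fetchOffsets()
      if offsets.nonEmpty
    } yield offsets

  // Alternative: model "nothing returned" as a value instead of a failure.
  def explicit(): Future[Option[Map[String, Long]]] =
    fetchOffsets().map(offsets => if (offsets.nonEmpty) Some(offsets) else None)

  def main(args: Array[String]): Unit = {
    Try(Await.result(guarded(), 5.seconds)).failed
      .foreach(e => println(s"guarded failed: ${e.getMessage}"))
    println(s"explicit: ${Await.result(explicit(), 5.seconds)}")
  }
}
```

Running main prints "guarded failed: Future.filter predicate is not satisfied" followed by "explicit: None".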

Can you enable Kafka DEBUG logging when you install the chart and include the log? The Helm value is kafkaLogLevel:

https://github.com/lightbend/kafka-lag-exporter/blob/master/charts/kafka-lag-exporter/values.yaml#L66

henhiskan commented 5 years ago

I got a bunch of FENCED_LEADER_EPOCH failures right before the AdminClient started closing the connection.

2019-11-25 19:46:11,607 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={p999105.analytic-fail-faust-fail-groupby-5={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[183]}}, isolationLevel=READ_UNCOMMITTED) to broker kafka-bkr-7.data:9092 (id: 7 rack: null)
2019-11-25 19:46:11,607 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={p999105.analytic-windows-audit-faust-win-audit-filter-3={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[90]}, p999105.analytic-windows-audit-faust-win-audit-filter-6={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[94]}, p999105.analytic-vpnx-faust-vpnx-groupby-0={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[128]}, p999105.analytic-vpnx-faust-vpnx-groupby-1={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[148]}, p999105-enriched-url-7={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[392]}, p999105.analytic-vpnx-faust-vpnx-groupby-11={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[136]}, p999105.analytic-vpnx-faust-vpnx-groupby-12={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[150]}}, isolationLevel=READ_UNCOMMITTED) to broker kafka-bkr-0.data:9092 (id: 0 rack: null)
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-windows-audit-faust-win-audit-filter-3 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-windows-audit-faust-win-audit-filter-6 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-vpnx-faust-vpnx-groupby-0 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-vpnx-faust-vpnx-groupby-1 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105-enriched-url-7 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-vpnx-faust-vpnx-groupby-11 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-vpnx-faust-vpnx-groupby-12 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:11,608 DEBUG o.a.k.c.consumer.internals.Fetcher  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Attempt to fetch offsets for partition p999105.analytic-fail-faust-fail-groupby-5 failed due to FENCED_LEADER_EPOCH, retrying.
2019-11-25 19:46:12,516 DEBUG o.a.k.clients.admin.KafkaAdminClient  - [AdminClient clientId=adminclient-1] Initiating close operation.
2019-11-25 19:46:12,516 DEBUG o.a.k.clients.admin.KafkaAdminClient  - [AdminClient clientId=adminclient-1] Waiting for the I/O thread to exit. Hard shutdown in 10000 ms.
2019-11-25 19:46:12,519 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name connections-closed:
2019-11-25 19:46:12,519 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name connections-created:
2019-11-25 19:46:12,519 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name successful-authentication:
2019-11-25 19:46:12,519 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name successful-reauthentication:
2019-11-25 19:46:12,519 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name successful-authentication-no-reauth:
2019-11-25 19:46:12,520 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name failed-authentication:
2019-11-25 19:46:12,520 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name failed-reauthentication:
2019-11-25 19:46:12,520 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name reauthentication-latency:
2019-11-25 19:46:12,520 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent-received:
2019-11-25 19:46:12,520 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent:
2019-11-25 19:46:12,521 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name bytes-received:
2019-11-25 19:46:12,521 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name select-time:
2019-11-25 19:46:12,521 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name io-time:
2019-11-25 19:46:12,522 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-sent
2019-11-25 19:46:12,522 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-received
2019-11-25 19:46:12,522 DEBUG o.a.kafka.common.metrics.Metrics  - Removed sensor with name node--1.latency
[...]
2019-11-25 19:46:12,528 DEBUG o.a.k.clients.admin.KafkaAdminClient  - [AdminClient clientId=adminclient-1] Exiting AdminClientRunnable thread.
2019-11-25 19:46:12,528 DEBUG o.a.k.clients.admin.KafkaAdminClient  - [AdminClient clientId=adminclient-1] Kafka admin client closed.
[...]
2019-11-25 19:46:12,536 DEBUG o.a.k.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-1, groupId=kafkalagexporter] Kafka consumer has been closed
2019-11-25 19:46:12,544 ERROR c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-dev - Supervisor RestartSupervisor saw failure: A failure occurred while retrieving offsets.  Shutting down. java.lang.Exception: A failure occurred while retrieving offsets.  Shutting down.
        at com.lightbend.kafkalagexporter.ConsumerGroupCollector$CollectorBehavior.$anonfun$collector$1(ConsumerGroupCollector.scala:188)
        at akka.actor.typed.internal.BehaviorImpl$ReceiveBehavior.receive(BehaviorImpl.scala:37)
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:52)
        at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:248)
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:79)
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
        at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:121)
        at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:102)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
        at akka.actor.ActorCell.invoke(ActorCell.scala:581)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
        at akka.dispatch.Mailbox.run(Mailbox.scala:229)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.NoSuchElementException: Future.filter predicate is not satisfied
        at scala.concurrent.Future.$anonfun$filter$1(Future.scala:340)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:92)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:92)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
        ... 4 common frames omitted

pimpelsang commented 4 years ago

I hit this problem after upgrading lightbend/kafka-lag-exporter from 0.5.5 to 0.6.2 and connecting to a Kafka cluster running 1.1.

seglo commented 4 years ago

I'm still unable to reproduce this. I ran the integration tests against Kafka 1.1, and they passed. I suspect it's caused by a combination of app configuration and Kafka API state.

@pimpelsang Can you reproduce this issue reliably? With DEBUG logging enabled, do you ever see the log line "Received Offsets Snapshot"? I think this exception is thrown before that line is logged, but if not, can you provide a log dump so I can attempt to reproduce the state locally?

seglo commented 4 years ago

Please provide your configuration as well.

plakat commented 3 years ago

Hi! I'm seeing this error in my installations as well. The stack trace is preceded by:

[AdminClient clientId=clj_internal] Initiating close operation.
[AdminClient clientId=clj_internal] Waiting for the I/O thread to exit. Hard shutdown in 10000 ms.
[AdminClient clientId=clj_internal] Exiting AdminClientRunnable thread.
[AdminClient clientId=clj_internal] Kafka admin client closed.
[Consumer clientId=clj_internal, groupId=kafkalagexporter] Executing onLeavePrepare with generation Generation{generationId=-1, memberId='', protocol='null'} and memberId
[Consumer clientId=clj_internal, groupId=kafkalagexporter] Resetting generation due to consumer pro-actively leaving the group
[Consumer clientId=clj_internal, groupId=kafkalagexporter] Kafka consumer has been closed
Clearing all metrics before shutdown
Supervisor RestartSupervisor saw failure: A failure occurred while retrieving offsets. Shutting down. java.lang.Exception: A failure occurred while retrieving offsets. Shutting down.

I've tried setting whitelist filters for both topics and consumer groups (although there are no unusual topic or group names), and I've tried increasing the JVM heap, with no change. The AdminClient shuts down every 2-4 seconds after initially running a bit longer (20-30 seconds).

This is on a standalone installation of version 0.6.7 running against an Apache Kafka 2.7.0 cluster.

plakat commented 3 years ago

In the meantime we upgraded to Kafka 2.8.0; unfortunately, the problem persists.

arnoldyahad commented 2 years ago

Hey guys, I've run into the same issue using kafka-lag-exporter 0.6.4 and 0.6.8 (the latest available at the moment). Kafka is on version 2.5.0.

Issue: kafka-lag-exporter stopped providing lag metrics.

Using the docker logs command, I saw this:

2022-04-05 09:55:18,587 ERROR c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-kafka-mediation - Supervisor RestartSupervisor saw failure: A failure occurred while retrieving offsets.  Shutting down. java.lang.Exception: A failure occurred while retrieving offsets.  Shutting down.

And this:

Caused by: java.util.NoSuchElementException: Future.filter predicate is not satisfied

Reading server.log on one of the Kafka brokers, I noticed this error:

[2022-04-05 09:24:28,487] ERROR [GroupMetadataManager brokerId=1016] Error loading offsets from __consumer_offsets-47 (kafka.coordinator.group.GroupMetadataManager)
java.lang.IllegalStateException: Unexpected unload of active group ConsumerGroup4 while loading partition __consumer_offsets-47
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$31(GroupMetadataManager.scala:710)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$31$adapted(GroupMetadataManager.scala:704)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at kafka.coordinator.group.GroupMetadataManager.loadGroupsAndOffsets(GroupMetadataManager.scala:704)
    at kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleLoadGroupAndOffsets$2(GroupMetadataManager.scala:524)
    at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Looking at the source code that throws the exception (https://github.com/apache/kafka/blob/2.5.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L704-L713), the broker raises this error if its cache still contains a group that should have been removed. That means a deleted consumer group is somehow still cached.

Solution: running this command on the broker solved it:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group ConsumerGroup4

So if you're encountering this issue, try grepping for ERROR entries in the brokers' server.log files to see whether you need to clean up a consumer group.

But the real question is: why would this cause kafka-lag-exporter to stop providing lag metrics for all topics? Even when whitelisting only one topic, it still doesn't work.
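
This thread does not answer that question. One plausible mechanism, offered only as an assumption about the collector's internals, is all-or-nothing composition: if per-group requests are combined into a single Future with something like Future.sequence, one failing group fails the whole poll, and the healthy groups' results are discarded with it. The sketch below illustrates that general Scala behavior. groupLag, allOrNothing, and isolated are hypothetical names, and ConsumerGroup4 simply plays the broken group from the broker log above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceFailureDemo {
  type GroupResult = Either[Throwable, (String, Long)]

  val groups: List[String] = List("group-a", "group-b", "ConsumerGroup4")

  // Hypothetical per-group lookup; ConsumerGroup4 plays the broken group.
  def groupLag(group: String): Future[(String, Long)] =
    if (group == "ConsumerGroup4")
      Future.failed(new IllegalStateException(s"could not load offsets for $group"))
    else
      Future.successful(group -> 0L)

  // All-or-nothing: Future.sequence fails if any element fails, so the
  // results for the healthy groups are lost along with the broken one.
  def allOrNothing(): Future[List[(String, Long)]] =
    Future.sequence(groups.map(groupLag))

  // Isolated: turn each failure into a Left first, so one bad group
  // cannot hide the others.
  def isolated(): Future[List[GroupResult]] =
    Future.sequence(groups.map { g =>
      groupLag(g)
        .map[GroupResult](Right(_))
        .recover { case e => Left(e) }
    })

  def main(args: Array[String]): Unit = {
    println(Await.ready(allOrNothing(), 5.seconds).value)
    Await.result(isolated(), 5.seconds).foreach(println)
  }
}
```

The isolated variant trades a single failed poll for partial results, which is usually what a metrics exporter wants; whether kafka-lag-exporter composes its requests either way is not confirmed here.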

seglo commented 2 years ago

Thanks for the datapoint. That should be helpful.

ngpadp12 commented 2 years ago

Hello @seglo,

I'm getting an issue similar to this one, with the error "Future.filter predicate is not satisfied".

When I enable debug logging, I get log messages like:

Leader for partition x is unknown for fetching offset -2
Exiting AdminClientRunnable thread

and

ERROR a.a.t.internal.adapter.ActorAdapter akka://kafka-lag-exporter/user/consumer-group-collector-x - KafkaConsumer is not safe for multi-threaded access
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

I am not able to fetch lag metrics for the clusters where I see this issue.

However, when I whitelist one consumer group, the exporter works fine.
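
KafkaConsumer is documented as not thread-safe, and it throws this ConcurrentModificationException when it detects a call from a second thread while another thread is using it. A common remedy is to confine every call on such a client to one dedicated thread. The sketch below shows that technique under stated assumptions: SingleThreadedClient is a hypothetical stand-in that mimics the overlap check, not KafkaConsumer itself, and nothing in this thread confirms how the exporter schedules its consumer calls.

```scala
import java.util.concurrent.{ConcurrentModificationException, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a client such as KafkaConsumer that throws when
// a second call arrives while another call is still in progress.
final class SingleThreadedClient {
  private val inUse = new AtomicBoolean(false)

  def fetch(key: String): Long = {
    if (!inUse.compareAndSet(false, true))
      throw new ConcurrentModificationException("client is not safe for multi-threaded access")
    try {
      Thread.sleep(5) // simulate a blocking network call
      key.length.toLong
    } finally inUse.set(false)
  }
}

object ConfinementDemo {
  // One dedicated daemon thread; every client call is scheduled on it,
  // so calls run one after another and never overlap.
  private val clientThread = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "client-thread")
      t.setDaemon(true)
      t
    }
  })
  private implicit val clientEc: ExecutionContext =
    ExecutionContext.fromExecutorService(clientThread)

  def fetchAll(client: SingleThreadedClient, keys: Seq[String]): Future[Seq[Long]] =
    Future.traverse(keys)(k => Future(client.fetch(k)))

  def main(args: Array[String]): Unit = {
    val client = new SingleThreadedClient
    println(Await.result(fetchAll(client, Seq("topic-a", "topic-bb")), 5.seconds))
  }
}
```

Scheduling the same calls on a multi-threaded pool such as ExecutionContext.global instead can let them overlap and trip the check; confining them to one thread serializes them at the cost of throughput.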

Versions: Kafka = 2.11, lag-exporter = 0.6.8

Could you please help me with this?

Thanks!