linkedin / Burrow

Kafka Consumer Lag Checking
Apache License 2.0

Prometheus Metrics not showing all consumer groups #637

Open jbresciani opened 4 years ago

jbresciani commented 4 years ago

I'm running Burrow (built from master as of June 3rd, 2020) against Kafka 2.4.1. Prometheus metrics show up at /metrics, but burrow_kafka_consumer_current_offset and burrow_kafka_consumer_partition_lag only include some of the consumer groups.

The missing consumer groups do appear in the /v3/kafka//consumer/ endpoint, but I've noticed that those groups have null for some partition offsets, e.g.:

{
"offsets": [
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
],
"owner": "/10.0.0.1",
"client_id": "consumer-8",
"current-lag": 0
},

The corresponding /v3/kafka//consumer//lag endpoint shows entries like:

{
"topic": "myTopic",
"partition": 2,
"owner": "/10.0.0.1",
"client_id": "consumer-8",
"status": "OK",
"start": null,
"end": null,
"current_lag": 0,
"complete": 0.06666667
}

I'm currently assuming the nulls are due to this cluster being in our TST environment: our messages are produced keyed, so it's not unusual for some partitions to see no traffic for days or weeks at a time, which means it's possible that some offsets are past the retention period.
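To make the keyed-traffic point concrete, here's a toy sketch of how hashed keys map to partitions (fnv is used purely for illustration, Kafka's Java client defaults to murmur2, and the key names are made up): with only a few distinct keys, most of the 15 partitions never receive a message.

package main

import (
	"fmt"
	"hash/fnv"
)

func main() {
	// Toy model of keyed production: each key always hashes to the
	// same partition, so a small key set leaves many partitions idle.
	const numPartitions = 15
	keys := []string{"tenant-a", "tenant-b", "tenant-c"}

	hits := make(map[uint32][]string)
	for _, k := range keys {
		h := fnv.New32a()
		h.Write([]byte(k))
		p := h.Sum32() % numPartitions
		hits[p] = append(hits[p], k)
	}

	for p := uint32(0); p < numPartitions; p++ {
		fmt.Printf("partition %2d: %v\n", p, hits[p])
	}
}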

millern commented 4 years ago

I'm running into the same issue. Consumer groups are excluded when any partition offset is null. In my case, consumer groups consume multiple topics, so a null partition offset for one of many topics causes the entire consumer group to be excluded from metrics.

I think the issue is on this line: https://github.com/linkedin/Burrow/pull/628/files#diff-0f9ca663563a0c68cf314325f84c48b8R67. Consumer group completeness is an average of partition completeness, so if any partition has completeness < 1.0, the entire group is excluded.
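To illustrate the math (a toy sketch with made-up completeness values, not the Burrow source):

package main

import "fmt"

func main() {
	// If a group's Complete value is the average of its partitions'
	// completeness, a single sparse partition drags the whole group
	// under the `< 1.0` gate and it is dropped from /metrics.
	partitionComplete := []float64{1.0, 1.0, 1.0, 0.0667}

	var sum float64
	for _, c := range partitionComplete {
		sum += c
	}
	groupComplete := sum / float64(len(partitionComplete))

	if groupComplete < 1.0 {
		fmt.Printf("group completeness %.4f < 1.0 -> entire group excluded\n", groupComplete)
	}
}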

jbresciani commented 4 years ago

I've been playing with that file locally, and I can confirm that's part of it.

There is also an issue with line 91 in the same file: https://github.com/linkedin/Burrow/pull/628/files#diff-0f9ca663563a0c68cf314325f84c48b8R91

If you remove the completeness check and let it run, then when it hits a partition.End.Offset where End has the value nil, you get a panic: runtime error: invalid memory address or nil pointer dereference. I don't understand Go well enough to handle that error or to check for nil on what Go linting tells me should be an int64 value.

millern commented 4 years ago

Does that happen when the partition.Complete < 1.0 check is removed as well as the consumerStatus.Complete < 1.0 check?

Looking at this file, https://github.com/linkedin/Burrow/blob/master/core/internal/evaluator/caching.go#L270, where I think partition.End is set, it seems like partition.Complete should be < 1.0 whenever End is nil. So, as long as the partition.Complete < 1.0 check remains, there shouldn't be an error.
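In other words, the guard can be the completeness gate, an explicit pointer check, or both. A standalone sketch of the two options (the types here are simplified stand-ins, not Burrow's actual structs):

package main

import "fmt"

// Simplified stand-ins: End is a pointer, and dereferencing it while
// nil is what produces the panic mentioned above.
type ConsumerOffset struct{ Offset int64 }

type Partition struct {
	Complete float32
	End      *ConsumerOffset
}

func emitCurrentOffset(p Partition) {
	// Option 1: keep the completeness gate; if the evaluator only ever
	// leaves End == nil when Complete < 1.0, this alone avoids the panic.
	if p.Complete < 1.0 {
		fmt.Println("incomplete partition, skipping current-offset metric")
		return
	}
	// Option 2 (belt and braces): check the pointer itself, since the
	// nil lives in End, not in the int64 Offset field.
	if p.End == nil {
		fmt.Println("End is nil, skipping current-offset metric")
		return
	}
	fmt.Println("end offset:", p.End.Offset)
}

func main() {
	emitCurrentOffset(Partition{Complete: 0.0667, End: nil})
	emitCurrentOffset(Partition{Complete: 1.0, End: &ConsumerOffset{Offset: 42}})
}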

This has fixed it for me so far: https://github.com/millern/Burrow/commit/258fb1902b9c359f8e78066dab1efef67a8c90a5

jbresciani commented 4 years ago

diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go
index 92456f5..2a9248f 100644
--- a/core/internal/httpserver/prometheus.go
+++ b/core/internal/httpserver/prometheus.go
@@ -63,33 +63,29 @@ func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc {
                                consumerStatus := getFullConsumerStatus(hc.App, cluster, consumer)

                                if consumerStatus == nil ||
-                                       consumerStatus.Status == protocol.StatusNotFound ||
-                                       consumerStatus.Complete < 1.0 {
+                                       consumerStatus.Status == protocol.StatusNotFound {
                                        continue
                                }
-
                                labels := map[string]string{
                                        "cluster":        cluster,
                                        "consumer_group": consumer,
                                }
-
-                               consumerTotalLagGauge.With(labels).Set(float64(consumerStatus.TotalLag))
-                               consumerStatusGauge.With(labels).Set(float64(consumerStatus.Status))
+                               if consumerStatus.Complete >= 1.0 {
+                                       consumerTotalLagGauge.With(labels).Set(float64(consumerStatus.TotalLag))
+                                       consumerStatusGauge.With(labels).Set(float64(consumerStatus.Status))
+                               }

                                for _, partition := range consumerStatus.Partitions {
-                                       if partition.Complete < 1.0 {
-                                               continue
-                                       }
-
                                        labels := map[string]string{
                                                "cluster":        cluster,
                                                "consumer_group": consumer,
                                                "topic":          partition.Topic,
                                                "partition":      strconv.FormatInt(int64(partition.Partition), 10),
                                        }
-
-                                       consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset))
                                        consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag))
+                                       if partition.Complete >= 1.0 {
+                                               consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset))
+                                       }
                                }
                        }

@@ -104,7 +100,6 @@ func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc {
                                }
                        }
                }
-
                promHandler.ServeHTTP(resp, req)
        })
 }

Yup, the above change works for me. Net effect of the patch: the partition lag gauge is always emitted, while the current-offset gauge and the group-level gauges are still gated on completeness.

james-bjss commented 3 years ago

@fr0stbyte Sorry to bump this, but is your fix still progressing? I noticed there were some feedback comments on the PR you created, and it seems to have stalled.

Like the others in this thread, we're hitting this in our test environments and it's causing big problems. We're trying to work out whether to fork the repo and apply the fix ourselves or hold on for the PR to be merged.

james-bjss commented 3 years ago

Sorry to hijack this, but we have been running off a fork for some time to address this issue. A few of us are keen to see this fix make its way into the main code, so I have addressed the issues mentioned above on the original PR.

fr0stbyte commented 3 years ago

@james-bjss no problem. I pushed some updates on Apr 8th, but there was no response from the reviewer who requested the changes. I'm hoping one of us will get this merged at some point.

jbresciani commented 2 years ago

With the merge of #638 and #700, I believe this is now fixed and the issue can be closed.