open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

[receiver/kafka] otelcol_kafka_receiver_offset_lag not calculated correctly #36093

Open dyanneo opened 3 weeks ago

dyanneo commented 3 weeks ago

Component(s)

receiver/kafka

What happened?

We wanted to collect, graph, and alert on lag for the Kafka receiver, but the last values of otelcol_kafka_receiver_offset_lag did not match the values reported by Kafka's consumer-groups utility.

Description

The last value of the otelcol_kafka_receiver_offset_lag measurement does not appear to be calculated correctly. For context, we are also seeing the OTel collector keep emitting lag metrics for partitions it is no longer consuming.

Steps to Reproduce

Expected Result

Per this screenshot, partition 4's lag as reported by Kafka's consumer-groups.sh utility changes over time and stays in the low hundreds or smaller:

Screenshot 2024-10-30 at 10 19 19 AM

Actual Result

Per this screenshot, the reported value of partition 4's lag does not match the true lag shown by Kafka's tools:

Screenshot 2024-10-30 at 10 31 03 AM

Query in Grafana query builder:

Screenshot 2024-10-30 at 10 21 27 AM

Query as Grafana is running it:

Screenshot 2024-10-30 at 10 22 00 AM

Collector version

otelcol_version: 0.109.0

Environment information

Environment

OS: linux container

OpenTelemetry Collector configuration

---
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:PORT
  prometheus:
    config:
      scrape_configs:
        - job_name: otel-collector-metrics
          scrape_interval: 30s
          static_configs:
            - targets: ["127.0.0.1:PORT"]

processors:
  memory_limiter:
    check_interval: 1ms
    limit_percentage: 70
    spike_limit_percentage: 15

  resource:
    attributes:
      - key: application
        from_attribute: service.name
        action: insert

  resource/internal:
    attributes:
      - key: labels.application
        value: ${env:CLOUD_APPLICATION}
        action: upsert
      - key: application
        value: ${env:CLOUD_APPLICATION}
        action: upsert

  filter/application:
    error_mode: ignore
    traces:
      span:
        - 'resource.attributes["application"] == nil'
    metrics:
      metric:
        - 'resource.attributes["application"] == nil'

  filter/tracingsampler:
    error_mode: ignore
    traces:
      span:
        - 'attributes["sampler.parent"] == nil'

  filter/summarytype:
    error_mode: ignore
    metrics:
      metric:
        - "type == METRIC_DATA_TYPE_SUMMARY"

  metricstransform/version_correction:
    transforms:
      - match_type: strict
        include: tgt.otel.extension.heartbeat
        action: update
        operations:
          - action: update_label
            label: otel.extension.version
            value_actions:
              - value: "v0.8.0\n"
                new_value: "v0.8.0"
          - action: update_label
            label: otel.scope.version
            value_actions:
              - value: "v0.8.0\n"
                new_value: "v0.8.0"

  batch/metrics:
    timeout: 5s
    send_batch_size: 100
    send_batch_max_size: 101

  batch/traces:
    timeout: 5s
    send_batch_size: 33
    send_batch_max_size: 34

exporters:
  tgtkafkaexporter:
    brokers:
      - BROKER_REDACTED
    topic: TOPIC_REDACTED
    protocol_version: 2.7.0
    encoding_extension: tgtinflux_encoding
    producer:
      compression: snappy
      max_message_bytes: 2000000

  kafka:
    brokers:
      - BROKER_REDACTED
    topic: TOPIC_REDACTED
    protocol_version: 2.7.0
    producer:
      compression: snappy
      max_message_bytes: 2000000

service:
  pipelines:
    metrics:
      receivers:
        - otlp
      processors:
        - memory_limiter
        - resource
        - filter/application
        - filter/summarytype
        - metricstransform/version_correction
        - batch/metrics
      exporters:
        - tgtkafkaexporter

    metrics/internal:
      receivers:
        - prometheus
      processors:
        - resource
        - resource/internal
        - filter/application
        - filter/summarytype
        - metricstransform/version_correction
        - batch/metrics
      exporters:
        - tgtkafkaexporter

    traces:
      receivers:
        - otlp
      processors:
        - memory_limiter
        - resource
        - filter/application
        - filter/tracingsampler
        - batch/traces
      exporters:
        - kafka

  telemetry:
    logs:
      level: "info"
    metrics:
      address: 0.0.0.0:PORT

Log output

***WIP***

Additional context

We think the data is incorrect because the gauge remains in OTel's registry after a rebalance, while the metric for the reassigned partition no longer receives updates.
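To make the suspected mechanism concrete, here is a minimal sketch using only the Go standard library. The lagGauge type and its Record, Forget, and Export methods are hypothetical stand-ins for a metrics registry, not the OpenTelemetry SDK API; whether and how the real SDK allows a series to be dropped is not covered here. The sketch only shows how a "last value" gauge keeps exporting a partition's old lag once that partition stops being updated.

```go
package main

import (
	"fmt"
	"sort"
)

// seriesKey identifies one gauge series by its attribute set.
type seriesKey struct {
	topic     string
	partition int32
}

// lagGauge is a hypothetical stand-in for a metrics registry: it keeps the
// last value recorded for every attribute set until that series is deleted.
type lagGauge struct {
	last map[seriesKey]int64
}

func newLagGauge() *lagGauge { return &lagGauge{last: map[seriesKey]int64{}} }

// Record stores the most recent lag observed for a partition.
func (g *lagGauge) Record(topic string, partition int32, lag int64) {
	g.last[seriesKey{topic, partition}] = lag
}

// Forget drops the series for a partition this consumer no longer owns.
func (g *lagGauge) Forget(topic string, partition int32) {
	delete(g.last, seriesKey{topic, partition})
}

// Export returns what a scrape would see, sorted by topic and partition.
func (g *lagGauge) Export() []string {
	keys := make([]seriesKey, 0, len(g.last))
	for k := range g.last {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].topic != keys[j].topic {
			return keys[i].topic < keys[j].topic
		}
		return keys[i].partition < keys[j].partition
	})
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		out = append(out, fmt.Sprintf("%s/%d=%d", k.topic, k.partition, g.last[k]))
	}
	return out
}

func main() {
	g := newLagGauge()
	g.Record("t", 4, 5000) // lag when partition 4 was last consumed by this instance
	g.Record("t", 7, 12)

	// Rebalance: partition 4 moves to another consumer, only partition 7 keeps updating.
	g.Record("t", 7, 3)
	fmt.Println(g.Export()) // [t/4=5000 t/7=3]  (partition 4 is stale)

	g.Forget("t", 4)
	fmt.Println(g.Export()) // [t/7=3]
}
```

In this model the stale value for partition 4 disappears only when something explicitly forgets it, which is the behavior the links below propose adding around the rebalance.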

Where this gauge is defined:

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/0d28558da65cbce23963906dbb3205fa2f383c0c/receiver/kafkareceiver/internal/metadata/generated_telemetry.go#L75-L79

One of the places where this gauge is updated:

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/0d28558da65cbce23963906dbb3205fa2f383c0c/receiver/kafkareceiver/kafka_receiver.go#L560

Where this could be addressed:

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/0d28558da65cbce23963906dbb3205fa2f383c0c/receiver/kafkareceiver/kafka_receiver.go#L529-L531

StephanSalas commented 3 weeks ago

Hi @dyanneo,

Based on my initial analysis:

Per the sarama documentation for Cleanup(ConsumerGroupSession) error: "Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time."

We need to be careful about using the Cleanup func to solve this issue. By the time Cleanup is called, the session's claims may no longer reflect the partitions that were assigned, or session.Claims() might be empty. We should verify that assumption as well.

That said, I agree with your approach: as long as the session is guaranteed to be accurately populated when Cleanup() is called, we can do something like this:

// Emit a partition-close count for every claim the session still holds.
for topic, partitions := range session.Claims() {
    for _, partition := range partitions {
        c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(
            attribute.String(attrInstanceName, c.id.Name()),
            attribute.String(attrTopic, topic),
            attribute.String(attrPartition, strconv.Itoa(int(partition))),
        ))
        // Add cleanup for the offset-lag metric here as well.
    }
}
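
If it turns out that session.Claims() cannot be trusted inside Cleanup, one possible workaround is to snapshot the claims in Setup and iterate over that snapshot later. The sketch below is standard-library only: claimsSource, fakeSession, and claimTracker are hypothetical names, and claimsSource mirrors just the Claims() map[string][]int32 method of sarama.ConsumerGroupSession so the example runs without a broker. The released slice marks where the offset-lag series would be cleared.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// claimsSource is a hypothetical stand-in for the single method of
// sarama.ConsumerGroupSession that this sketch relies on.
type claimsSource interface {
	Claims() map[string][]int32
}

// fakeSession lets the sketch run without a Kafka broker.
type fakeSession struct{ claims map[string][]int32 }

func (s fakeSession) Claims() map[string][]int32 { return s.claims }

// claimTracker snapshots the assignment in Setup so that Cleanup does not
// depend on what the session reports at the end of its life.
type claimTracker struct {
	mu       sync.Mutex
	owned    map[string][]int32
	released []string // "topic/partition" entries whose series should be dropped
}

// Setup copies the claims handed out at the start of the session.
func (t *claimTracker) Setup(s claimsSource) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.owned = make(map[string][]int32)
	for topic, partitions := range s.Claims() {
		t.owned[topic] = append([]int32(nil), partitions...)
	}
	return nil
}

// Cleanup walks the snapshot, not the session, and marks every partition as released.
func (t *claimTracker) Cleanup(_ claimsSource) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	for topic, partitions := range t.owned {
		for _, p := range partitions {
			// Hypothetical hook: this is where the offset-lag series would be cleared.
			t.released = append(t.released, fmt.Sprintf("%s/%d", topic, p))
		}
	}
	sort.Strings(t.released)
	t.owned = nil
	return nil
}

func main() {
	tracker := &claimTracker{}
	_ = tracker.Setup(fakeSession{claims: map[string][]int32{"t": {4, 7}}})
	// Simulate the concern above: the session reports no claims by Cleanup time.
	_ = tracker.Cleanup(fakeSession{})
	fmt.Println(tracker.released) // [t/4 t/7]
}
```

The trade-off is a small amount of extra state on the handler in exchange for not depending on the session's contents at teardown.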

dyanneo commented 3 weeks ago

@StephanSalas Thanks for your input on this issue. I appreciate your concerns, and the suggestion makes sense to me.

StephanSalas commented 3 weeks ago

According to the docs, this is the lifecycle of the sarama Kafka consumer framework:

// The life-cycle of a session is represented by the following steps:
//
// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
//    and is assigned their "fair share" of partitions, aka 'claims'.
// 2. Before processing starts, the handler's Setup() hook is called to notify the user
//    of the claims and allow any necessary preparation or alteration of state.
// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
//    from concurrent reads/writes.
// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
//    parent context is canceled or when a server-side rebalance cycle is initiated.
// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
//    to allow the user to perform any final tasks before a rebalance.
// 6. Finally, marked offsets are committed one last time before claims are released.

https://github.com/IBM/sarama/blob/main/consumer_group.go#L23C1-L36C87
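
As a rough illustration of that ordering, the standard-library sketch below models a single session: Setup, one goroutine per claim, then Cleanup only after every goroutine has exited, then the final commit. The names runSession and recorder are hypothetical, and this is only a model of the documented sequence, not sarama's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder collects lifecycle events from several goroutines safely.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) add(event string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, event)
}

// runSession mimics the documented hook order for one session:
// Setup, then one ConsumeClaim goroutine per partition, then Cleanup once
// all of them have returned, then the final offset commit.
func runSession(partitions []int32) []string {
	r := &recorder{}
	r.add("setup")

	var wg sync.WaitGroup
	for _, p := range partitions {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			// A real ConsumeClaim would loop over messages until a rebalance
			// or context cancellation; here it exits immediately.
			r.add(fmt.Sprintf("consume-claim-exit %d", p))
		}(p)
	}
	wg.Wait() // Cleanup must not start before every ConsumeClaim has exited.

	r.add("cleanup")
	r.add("final-commit")
	return r.events
}

func main() {
	for _, e := range runSession([]int32{4, 7, 9}) {
		fmt.Println(e)
	}
}
```

The order of the consume-claim-exit lines varies between runs, but setup is always first and cleanup always follows the last exit, which is why Cleanup looks like a natural place for per-partition metric teardown.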

Thus it looks likely that we can do it in Cleanup(), except for one notable edge case:

// Please note, that once a rebalance is triggered, sessions must be completed within
// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
// commit failures.
// This method should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims.

TODO: We may need to figure out how to handle this edge case; I will look into it. The initial idea is to use the Errors() channel listed here: https://github.com/IBM/sarama/blob/main/consumer_group.go#L87C2-L87C28
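
A minimal sketch of that idea, assuming the consumer group exposes Errors() <-chan error as sarama's ConsumerGroup does (sarama delivers errors on that channel only when Consumer.Return.Errors is set to true in the config). Everything else here is hypothetical and standard-library only: errorSource, fakeGroup, sessionError, and watchErrors are illustration names, and what the callback should do with the lag metric on a commit failure is still an open question.

```go
package main

import (
	"fmt"
	"sync"
)

// errorSource is a hypothetical stand-in for the Errors() method of a
// sarama.ConsumerGroup.
type errorSource interface {
	Errors() <-chan error
}

// fakeGroup lets the sketch run without a Kafka broker.
type fakeGroup struct{ ch chan error }

func (g fakeGroup) Errors() <-chan error { return g.ch }

// sessionError is a hypothetical error type used only to feed the fake group.
type sessionError string

func (e sessionError) Error() string { return string(e) }

// watchErrors drains src.Errors() in a goroutine until the channel is closed,
// calling onErr for each error. The returned function blocks until the
// draining goroutine has finished.
func watchErrors(src errorSource, onErr func(error)) (wait func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range src.Errors() {
			onErr(err)
		}
	}()
	return wg.Wait
}

func main() {
	group := fakeGroup{ch: make(chan error, 1)}
	var seen []string
	wait := watchErrors(group, func(err error) {
		// Hypothetical reaction: flag affected partitions for metric cleanup.
		seen = append(seen, err.Error())
	})

	group.ch <- sessionError("offset commit failed")
	close(group.ch)
	wait()
	fmt.Println(seen) // [offset commit failed]
}
```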