open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

[receiver/kafkareceiver] Stop pulling new events from the Event Hub queue when the OpenTelemetry Collector runs out of resources #35868

Open VijayPatil872 opened 3 days ago

VijayPatil872 commented 3 days ago

Component(s)

receiver/kafka

What happened?

Description

The OpenTelemetry Collector pods pull events from the Azure Event Hub queue and export them to the backends. If these backends are unavailable to accept data, the collector keeps pulling events from Azure Event Hub via the Kafka receiver and holds them in its in-memory queue, which gradually fills the pods' memory. The pods then run out of resources and start dropping events. For this reason, we can't make proper use of Kafka/Event Hubs as a data-loss-protection queue. Is there any component that stops pulling further events from the Azure Event Hub queue when the in-memory queue or collector resources become insufficient, or any other solution to this issue?

Steps to Reproduce

Expected Result

The OpenTelemetry Collector should stop pulling new events from the Event Hub queue when it runs out of resources.

Actual Result

The collector pods keep pulling events from the Azure Event Hub queue even though the collector has run out of resources.

Collector version

0.104

Environment information

Environment

OS: (e.g., "Ubuntu 20.04")
Compiler (if manually compiled): (e.g., "go 14.2")

OpenTelemetry Collector configuration

mode: "deployment"
namespaceOverride: 
config:         
  exporters:
    debug:
      verbosity: basic

    kafka/logs:
      brokers:
        - **************:9093
      topic:
      protocol_version: 3.7.0
      encoding: otlp_proto
      producer:
        max_message_bytes: 1000000
        compression: gzip
      sending_queue:
        enabled: true
        queue_size: 20000
      auth:
        tls:
          insecure: true
        sasl:
          username: "$$**********"
          password: ${******************}
          mechanism: PLAIN

    kafka/metrics:
      brokers:
        - *****************:9093
      topic: 
      protocol_version: 3.7.0
      encoding: otlp_proto
      sending_queue:
        enabled: true
        queue_size: 20000
      auth:
        tls:
          insecure: true
        sasl:
          username: "$$*****************"
          password: ${*****************************}
          mechanism: PLAIN
      producer:
        max_message_bytes: 1000000
        compression: gzip

    kafka/traces:
      brokers:
        - ********************************:9093
      topic: 
      protocol_version: 3.7.0
      encoding: otlp_proto
      sending_queue:
        enabled: true
        queue_size: 20000
      auth:
        tls:
          insecure: false
        sasl:
          username: "$$****************"
          password: ${***************************}
          mechanism: PLAIN
      producer:
        max_message_bytes: 1000000
        compression: gzip

  extensions:
    health_check:
      endpoint: ${env:MY_POD_IP}:**********

  processors:
    attributes/http-client-ip:
      actions:
        - action: upsert
          from_context: 
          key: 

    batch: {}

    batch/event-hub:
      send_batch_max_size: 110
      send_batch_size: 100

    batch/traces:
      send_batch_max_size: 1200
      send_batch_size: 1000
      timeout: 5s

    memory_limiter:
      check_interval: 5s
      limit_percentage: 80
      spike_limit_percentage: 25

  receivers:
    otlp:
      protocols:
        http:
          cors:
            allowed_origins:
            - http:****************
            - https:****************
          endpoint: ${env:MY_POD_IP}:******
          include_metadata: true
          max_request_body_size: 10485760

  service:
    extensions:
      - health_check
    pipelines:
      kafka/logs:
        exporters:
        - kafka/logs
        processors:
        #- memory_limiter
        - batch/event-hub
        receivers:
        - otlp

      kafka/metrics:
        exporters:
        - kafka/metrics
        processors:
        #- memory_limiter
        - batch/event-hub
        receivers:
        - otlp

      kafka/traces:
        exporters:
        - kafka/traces
        processors:
        #- memory_limiter
        - batch/event-hub
        receivers:
        - otlp

    telemetry:
      metrics:
        address: ${env:MY_POD_IP}:**********

Log output

No response

Additional context

No response

github-actions[bot] commented 3 days ago

Pinging code owners:

atoulme commented 1 day ago

You need a way to apply backpressure. The memory_limiter processor is in charge of checking memory usage and applying such backpressure. I see it is disabled in your pipeline.
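
For illustration, re-enabling the memory_limiter as the first processor in each pipeline would look roughly like the sketch below, reusing the limiter settings already present in the configuration above (values are illustrative, not tuned; only the kafka/logs pipeline is shown):

  processors:
    memory_limiter:
      check_interval: 5s          # how often memory usage is checked
      limit_percentage: 80        # hard limit as a percentage of available memory
      spike_limit_percentage: 25  # headroom subtracted from the hard limit to form the soft limit

  service:
    pipelines:
      kafka/logs:
        receivers:
          - otlp
        processors:
          - memory_limiter        # first in the chain, so data is refused before it is buffered
          - batch/event-hub
        exporters:
          - kafka/logs

With the limiter first, the receiver returns errors to its producers when memory is tight instead of buffering more data, which is the backpressure mechanism referred to above.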

VijayPatil872 commented 2 hours ago

@atoulme We tested this scenario with the memory_limiter processor enabled in the pipelines, as suggested. We observed that with the memory_limiter enabled, when the backend is down the exporters start dropping data. After some time the collector pod's memory is freed, and it then starts pulling events from the queue again. This does not meet our expectations for the memory limiter. Do you think another approach could be tried here?

[screenshot: metrics acceptance and pull rates over time]

As seen in the screenshot, the acceptance rate goes down for a while, but then the collector starts pulling metrics again. We also observed that the pull rate of metrics from Kafka is much higher than the ingestion rate.
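
One knob that may also be worth checking here, assuming the Kafka exporter exposes the standard exporterhelper retry settings, is retry_on_failure, so that a backend outage leads to retries from the bounded sending_queue rather than immediate drops. A minimal sketch with illustrative values (only the kafka/logs exporter is shown; the queue_size matches the configuration above):

  exporters:
    kafka/logs:
      sending_queue:
        enabled: true
        queue_size: 20000      # bounded in-memory queue, as in the configuration above
      retry_on_failure:
        enabled: true
        initial_interval: 5s   # back-off after the first failed send
        max_interval: 30s      # cap on the back-off between retries
        max_elapsed_time: 0    # 0 means keep retrying instead of giving up on an item

Once the sending queue is full, new items are rejected back to the pipeline instead of growing memory without bound, which, together with the memory_limiter, is how backpressure can propagate back toward the consumer.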