open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

Distributing Load behind ALB/NLBs #33453

Open diranged opened 2 months ago

diranged commented 2 months ago

Component(s)

exporter/loadbalancing, exporter/prometheusremotewrite

Describe the issue you're reporting

TLDR: How do people accept ordered-writes of OTLP DataPoints behind L4/L7 Load Balancers?

We're trying to collect our metric data in both a global and a local environment - the local environment can write directly to Prometheus, and thus leverages the prometheusremotewriteexporter with the loadbalancing/otlpexporter and a routing_key setting to send data in an ordered fashion.
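
For reference, a minimal sketch of that local, ordered path (the resolver and service name here are illustrative, not our literal config):

```yaml
# Agents shard datapoints by routing_key so that every datapoint for a
# given resource always lands on the same metrics-processor backend.
exporters:
  loadbalancing:
    routing_key: resource
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      k8s:
        service: otel-metrics-processor.monitoring   # headless Service (illustrative)
```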

However, how do we also collect data globally, when we have to run the otel-collector behind a third-party L4 or L7 load balancer and still maintain ordered writes?

Full Situation

Today we push ~200k+ DataPoint records/sec from our otel-collector-agent (DaemonSet) pods to otel-metrics-processor (StatefulSet) pods in each of our clusters. The metrics-processor pods perform filtering and then use the prometheusremotewriteexporter to write the data to an AWS Managed Prometheus (AMP) endpoint "in cluster" (tied to the cluster), as well as to a global secondary AMP endpoint that lives in another AWS account and region.

Our otel-collector-agents flow like this:

```mermaid
flowchart LR
    otel-metrics-processor{{"otel-collector-metrics-processor..."}}
    loki-logs{{"central-loki distributors..."}}

    agent-target-allocator-svc{{"otel-collector-agent-targetallocator:8080"}}
    subgraph agent-target-allocator["TargetAllocator Deployment"]
      n1["...targetallocator-xxx"]
      n2["...targetallocator-xyz"]
    end
    agent-target-allocator <--> agent-target-allocator-svc
    agent-target-allocator-svc -- "GET /scrape-configs" --> prometheus-receiver

    subgraph k8s-node["Kubernetes Node"]
        subgraph g1["node processes/services"]
          kubelet("kubelet:10250/metrics")
          hostmetrics("/proc/*")
          hostfs("/hostfs")
        end
        subgraph g2["misc app pods"]
          app-pod-a("app-pod-xxx:15020/metrics")
          app-pod-b("app-pod-yyy:15020/metrics")
          app-pod-c("misc-pod-abc:9090/metrrics")
        end

        subgraph otel-agent["otel-collector-agent-xxx"]
          subgraph receivers["receivers"]
            prometheus-receiver("prometheusreceiver")
            filelog-pods-receiver{{"filelogreceiver/pods\n/var/log/pods/*/*/*.log"}}
          end

          subgraph prom-processor["prometheus processors"]
            pp1["...."] -->
            pp5["batch"] -->
            pp6["routing/aggregation"]
          end

          subgraph logs-processor["logs processors"]
            lp1["..."] -->
            lp3["transform/..."] -->
            lp4["batch"]
          end

          subgraph exporters["exporters"]
            lb-logs["oltphttp/logs"]
            lb-metrics-processor["loadbalancing/metrics"]
          end

        end

        app-pod-a   <--> prometheus-receiver
        app-pod-b   <--> prometheus-receiver
        app-pod-c   <--> prometheus-receiver
        hostfs      <--> filelog-pods-receiver

        prometheus-receiver   --> prom-processor
        prom-processor        -- ".*"       --> lb-metrics-processor

        filelog-pods-receiver --> logs-processor               --> lb-logs

        lb-logs --> loki-logs
        lb-metrics-processor --> otel-metrics-processor
    end
```

Once the data hits our metrics-processors, the data is duplicated to the in-cluster and remote-cluster AMP endpoints:

```mermaid
flowchart LR
    prometheus-amp{{"in-cluster prometheus service"}}
    otlp-central{{"central otlp service"}}
    otel-agents{{"otel-agent-collectors-..."}}

    metrics-processor-svc{{"otel-metrics-processor:4317"}}
    subgraph metrics-processor["Metrics Collection Service"]
      subgraph otel-collector["otel-collector-metrics-xxx"]
        subgraph receivers["receivers"]
          r1{{"otlp-receiver\n0.0.0.0:4317"}}
        end

        subgraph prometheus-processors["prometheus processors"]
          prod1["memory_limiter"] -->
          prod2["..."] -->
          prod10["batch/prometheus"]
        end

        subgraph exporters["exporters"]
          prometheus-exporter-amp["prometheusremotewrite/amp (local)"]
          oltp-exporter-central["loadbalancing (otlp)/central (central prom)"]
        end

        receivers             --> prometheus-processors
        prometheus-processors --> prometheus-exporter-amp
        prometheus-processors --> otlp-exporter-central

      end
    end

    otel-agents                 --> metrics-processor-svc --> metrics-processor
    prometheus-exporter-amp     --> prometheus-amp
    otlp-exporter-central       --> otlp-central
```
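
Roughly, the exporter side of the metrics-processor above looks like the following sketch; the AMP endpoint, region, and the sigv4auth extension are illustrative stand-ins rather than our exact config:

```yaml
extensions:
  sigv4auth:
    region: us-west-2                        # illustrative

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 80
    spike_limit_percentage: 20
  batch/prometheus: {}

exporters:
  prometheusremotewrite/amp:
    endpoint: https://aps-workspaces.us-west-2.amazonaws.com/workspaces/ws-XXXX/api/v1/remote_write
    auth:
      authenticator: sigv4auth
  loadbalancing/central:
    routing_key: resource
    protocol:
      otlp:
        timeout: 10s       # tls/mTLS settings for the central endpoint would go here too
    resolver:
      dns:
        hostname: central-otlp.example.internal   # illustrative

service:
  extensions: [sigv4auth]
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch/prometheus]
      exporters: [prometheusremotewrite/amp, loadbalancing/central]
```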

This system works as-is today... see below, though, for the change we want to make and why it isn't working.

Working: otel-collector-agents -> Loki (native lokiexporter)

Even at high volume (~500k LogRecords/s), we can send the Loki data just fine because Loki accepts out-of-order writes, so going through an NLB or an ALB to reach the Loki service works fine. (We are currently using the lokiexporter due to https://github.com/grafana/loki/issues/13185, an unrelated issue with Loki.)
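
The exporter side of that log path is trivially simple since ordering doesn't matter; a sketch, with an illustrative endpoint:

```yaml
exporters:
  loki:
    # any pod behind the NLB/ALB can receive any batch, because Loki
    # tolerates out-of-order pushes
    endpoint: https://loki-distributor.example.internal:3100/loki/api/v1/push   # illustrative
```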

Working: otel-collector-agents -> loadbalancing/otlpexporter -> metrics-processor -> prometheusremotewrite

When we use the current setup - where the metrics-processor pods make all of the POST /api/v1/remote_write calls - things work OK, because the load balancing happens within each cluster and we can use a consistent routing key so that each metrics-processor handles a consistent set of resources.

The problem is that we want to move the processing to our central clusters... see below.

Not Working: otel-metrics-processors -- OTLP --> otlpexporter (grpc) --> prometheusremotewriteexporter -> AMP

We would like to standardize on the otel-collector across wide swaths of our infrastructure (not just Kubernetes) to collect metric and log data, so naturally our first thought was that we could stop direct-writing from the metrics-processor pods to the remote Prometheus endpoint and introduce a "central otel collection" endpoint instead. This would give us a clean place to do metric filtering, validation, and even throttling/batch-tuning.

```mermaid
flowchart TD
    prometheus-amp{{"in-cluster prometheus service"}}
    otlp-central{{"central otlp service"}}
    otel-agents{{"otel-agent-collectors-..."}}

    central-metrics-processor-nlb{{"central metrics-collection:4317 (L4 NLB)"}}
    subgraph central-metrics-processor["Central Metrics Collection (mTLS/gRPC)"]
      subgraph central-otel-collector["central-collector-metrics-xxx"]
        subgraph central-receivers["receivers"]
          central-r1{{"otlp-receiver\n0.0.0.0:4317"}}
        end

        subgraph central-prometheus-processors["prometheus processors"]
          cprod1["memory_limiter"] -->
          cprod2["..."] -->
          cprod10["batch/prometheus"]
        end

        subgraph central-exporters["exporters"]
          oltp-exporter-central["prometheusremotewrite/amp"]
        end

        central-receivers              --> central-prometheus-processors
        central-prometheus-processors  --> central-exporters
      end
    end

    metrics-processor-svc{{"otel-metrics-processor:4317"}}
    subgraph metrics-processor["Metrics Collection Service"]
      subgraph otel-collector["otel-collector-metrics-xxx"]
        subgraph receivers["receivers"]
          r1{{"otlp-receiver\n0.0.0.0:4317"}}
        end

        subgraph prometheus-processors["prometheus processors"]
          prod1["memory_limiter"] -->
          prod2["..."] -->
          prod10["batch/prometheus"]
        end

        subgraph exporters["exporters"]
          prometheus-exporter-amp["prometheusremotewrite/amp (local)"]
          otlp-exporter-central["otlpexporter (grpc)"]
        end

        receivers             --> prometheus-processors
        prometheus-processors --> prometheus-exporter-amp
        prometheus-processors --> otlp-exporter-central
      end
    end

    otel-agents  --> metrics-processor-svc --> metrics-processor
    prometheus-exporter-amp  --> prometheus-amp 
    otlp-exporter-central  -- GRPC over L4 w/ mTLS   --> central-metrics-processor-nlb --> central-metrics-processor
```
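
For completeness, the receiving side of the central tier is just an OTLP gRPC receiver terminating mTLS; the certificate paths below are illustrative:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
        tls:
          cert_file: /certs/server.crt       # illustrative paths
          key_file: /certs/server.key
          client_ca_file: /certs/ca.crt      # require client certs (mTLS)
```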

What happens when we do this, though, is that there is no good way to distribute the load across our central-metrics-processor pods. First we tried sticky sessions, but on an NLB that can only be done via the source IP, and with global NATing that led to extreme hot spots. We then tried without sticky sessions, and due to the requirement for ordered writes we saw a significant percentage of metrics dropped by AMP.

Is anyone doing this?

The question here is: is anyone doing this at scale, against an ordered-write endpoint like Prometheus? Do we have to go as far as running two layers of OTLP collectors in our central cluster - a stateless, load-balanced layer that receives the metrics and re-routes the datapoints with a specific routing_key to an internal StatefulSet that then writes to Prometheus? (See the sketch below.)
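
Something like the following is what I mean by the two-layer approach; all names, the k8s resolver, and the headless Service are illustrative assumptions:

```yaml
# Tier 1: stateless "distributor" collectors behind the NLB.
# It doesn't matter which pod the NLB picks, because this tier only
# re-shards datapoints by routing_key toward the stateful tier.
exporters:
  loadbalancing/internal:
    routing_key: resource
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      k8s:
        service: otel-metrics-writer-headless.observability   # StatefulSet headless Service (illustrative)
---
# Tier 2: StatefulSet "writer" collectors. Each pod owns a stable subset
# of streams, so its prometheusremotewrite flushes stay in timestamp order.
exporters:
  prometheusremotewrite/amp:
    endpoint: https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-XXXX/api/v1/remote_write
    auth:
      authenticator: sigv4auth   # assumes the sigv4auth extension is configured as in the earlier sketch
```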

Even with stickiness disabled we still had hot spots with gRPC. I couldn't find a good way to tell the otel-collector to create XX outbound gRPC connection pools; instead it seems to create one connection and just stream the data over it, so that still led to hot spots AND out-of-order writes.
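
For what it's worth, the only related knob we found on the gRPC client side is balancer_name, and as far as I can tell it only spreads requests when the client itself resolves multiple backend addresses (e.g. a headless Service via dns:///), not when everything hides behind a single NLB VIP; the endpoint below is illustrative:

```yaml
exporters:
  otlp/central:
    # dns:/// makes the gRPC client resolve every A record and round-robin
    # requests across them; it has no effect behind a single load-balancer VIP.
    endpoint: dns:///central-otlp-headless.observability:4317
    balancer_name: round_robin
```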

What are we missing?

github-actions[bot] commented 2 months ago

Pinging code owners:

jpkrohling commented 2 months ago

and due to the requirement of ordered-writes, we saw a significant percentage of metrics dropped by AMP

I'm afraid this is more about Prometheus than about load balancing itself, but why are you getting those out-of-order errors only when receiving data from multiple clusters? Are you seeing the same metric stream (metric name + attributes) coming from different clusters? Are you dropping the service.instance.id along the way?

diranged commented 2 months ago

I'm afraid this is more about Prometheus than load balancing itself, but why are you getting those out-of-orders only when receiving data from multiple clusters?

The data coming from each cluster has unique stream labels attached, so one cluster cannot really impact another cluster in terms of out-of-order samples. I believe the out-of-order issue has to do with timing. For a single given stream, imagine this scenario:

Given a single stream: my_metric{instance='1.1.1.1:9200', pod_uid='<unique uuid>'}

  1. Initial connection through the LB hits metrics-ingester-0:4317 and starts accepting datapoints.
  2. Datapoints sent: T1, T2, T3, T4
  3. Connection is interrupted because metrics-ingester-0 is going to be replaced in K8S.
  4. New connection through the LB hits metrics-ingester-1:4317 and starts accepting datapoints...
  5. Datapoints sent: T5, T6, T7, T8
  6. metrics-ingester-0 begins its shutdown... it has a bunch of data it needs to flush..
  7. metrics-ingester-1 performs a flush of the T5-T8 samples.
  8. metrics-ingester-0 tries to flush the T1-T4 samples.. but hits an out-of-order error and the samples are dropped.

This is a contrived example ... but the point I am trying to demonstrate is that the timing of when the samples are flushed out to Prometheus matters. If two different OTEL collector pods end up with samples for the same stream, but they flush out of order, then you create the out-of-order error situation.

We tried using IP-based session stickiness ... but that didn't really work at all, and is problematic for a lot of reasons. Session stickiness based on some cookie would be useful, if the OTEL client supported it.

Are you having the same metric stream (metric name + attributes) coming from different clusters?

No - definitely unique streams from different clusters

Are you dropping the service.instance.id along the way?

No we are not

jpkrohling commented 1 month ago

I got it. Thank you for the example; I understand the scenario better now.

I'm even more convinced that the load-balancing exporter is doing the right thing and that this should be handled on the Prometheus side of this story. In fact, this scenario should work on any Prometheus-compatible backend that supports out-of-order samples, like Mimir: such backends hold data in memory for a period of time and order it by timestamp before writing to disk.
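
To make that concrete: self-managed Prometheus (2.39+) and Mimir expose an out-of-order time window within which late samples are still accepted; I can't speak to whether AMP exposes anything equivalent. A Prometheus-side sketch:

```yaml
# prometheus.yml (self-managed Prometheus, not AMP)
storage:
  tsdb:
    out_of_order_time_window: 30m   # accept samples up to 30m older than the newest sample in a series
```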

I think it shouldn't be the responsibility of the load balancer to track the timestamps for each data point in case the backend can't ingest out-of-order samples.