open-telemetry / opentelemetry-operator

Kubernetes Operator for OpenTelemetry Collector
Apache License 2.0

An algorithm for new target allocation, based on the average job algorithm. #3128

Open zh-w opened 4 months ago

zh-w commented 4 months ago

Component(s)

target allocator

Is your feature request related to a problem? Please describe.

When I use the target allocator there are usually several different jobs, and the number of metric datapoints per target varies significantly from job to job. For example, there are five types of collection jobs: A, B, C, D, and E. Suppose each job has the same collection interval and each job has 10 targets. The number of datapoints pulled by each target of job A is 1000 (e.g., KSM), for job B it is 100, for jobs C and D it is 50, and for job E it is 10.

At the same time, assume I have 5 collector instances deployed in StatefulSets. When using consistent-hashing or least-weighted algorithms, the targets for each job are not evenly distributed across each collector instance. In the assumed collection scenario, it is possible that collector-0 is assigned 3 targets of job A, while collector-4 is assigned 0 targets of job A. This can result in a significant disparity in the number of datapoints collected by each collector, leading to an imbalance in load.
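
To put rough numbers on the imbalance, here is a small sketch using the per-target datapoint counts from the example above. The `heavy` and `light` assignments are hypothetical: both collectors hold 10 targets, but one holds 3 targets of job A and the other holds none.

```go
package main

import "fmt"

// datapointsPerTarget holds the per-target datapoint counts from the example above.
var datapointsPerTarget = map[string]int{"A": 1000, "B": 100, "C": 50, "D": 50, "E": 10}

// load returns the datapoints one collector scrapes per interval,
// given how many targets of each job it was assigned.
func load(assigned map[string]int) int {
	total := 0
	for job, n := range assigned {
		total += n * datapointsPerTarget[job]
	}
	return total
}

func main() {
	// Even split: 10 targets per job over 5 collectors is 2 per job each.
	even := map[string]int{"A": 2, "B": 2, "C": 2, "D": 2, "E": 2}
	// Hypothetical skew: the same total target count (10), but 3 vs. 0 targets of job A.
	heavy := map[string]int{"A": 3, "B": 2, "C": 2, "D": 2, "E": 1}
	light := map[string]int{"A": 0, "B": 2, "C": 3, "D": 3, "E": 2}

	fmt.Println(load(even), load(heavy), load(light)) // 2420 3410 520
}
```

Both skewed collectors would look identical if only the target count were compared, yet one handles more than six times the datapoints of the other.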

In my actual use case, this situation occurs quite frequently. Below is a diagram showing the load distribution of collectors in a large cluster I deployed (using the consistent-hashing algorithm), illustrating the extreme imbalance in resource utilization across each collector. image

Describe the solution you'd like

I have implemented a load-balancing algorithm based on jobs. The algorithm is designed as follows:

Describe alternatives you've considered

No response

Additional context

No response

zh-w commented 4 months ago

The algorithm is designed as follows:

if service discovery targets change {
    for each deleted target:
        remove the corresponding target from the original collector without triggering rebalancing
    for each new target:
        find the collector with the fewest assigned targets for that target's job; if several collectors tie, choose the one with the fewest total targets; then assign the target to it
} else if the list of collector instances changes {
    for each new collector:
        add the collector to the list and reassign all targets
    for each removed collector:
        remove the collector from the list and reassign all targets
}
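
A minimal Go sketch of the target add/remove part of the pseudocode above. The `allocator` type and its fields are hypothetical names used for illustration, not the operator's actual types, and the "reassign all targets" step on collector changes is left out.

```go
package main

import "fmt"

// allocator tracks, per collector, how many targets it holds in total and per job.
// All names here are hypothetical and are not the operator's actual types.
type allocator struct {
	collectors []string                  // fixed order, so tie-breaks are deterministic
	perJob     map[string]map[string]int // collector -> job -> target count
	total      map[string]int            // collector -> total target count
	owner      map[string]string         // target ID -> collector
	jobOf      map[string]string         // target ID -> job
}

// newAllocator assumes at least one collector is given.
func newAllocator(collectors []string) *allocator {
	a := &allocator{
		collectors: collectors,
		perJob:     map[string]map[string]int{},
		total:      map[string]int{},
		owner:      map[string]string{},
		jobOf:      map[string]string{},
	}
	for _, c := range collectors {
		a.perJob[c] = map[string]int{}
	}
	return a
}

// add assigns a new target to the collector with the fewest targets of the
// same job, breaking ties by the smallest total number of targets.
func (a *allocator) add(targetID, job string) string {
	best := a.collectors[0]
	for _, c := range a.collectors[1:] {
		cj, bj := a.perJob[c][job], a.perJob[best][job]
		if cj < bj || (cj == bj && a.total[c] < a.total[best]) {
			best = c
		}
	}
	a.perJob[best][job]++
	a.total[best]++
	a.owner[targetID] = best
	a.jobOf[targetID] = job
	return best
}

// remove drops a target from its collector without rebalancing the rest.
func (a *allocator) remove(targetID string) {
	c, ok := a.owner[targetID]
	if !ok {
		return
	}
	a.perJob[c][a.jobOf[targetID]]--
	a.total[c]--
	delete(a.owner, targetID)
	delete(a.jobOf, targetID)
}

func main() {
	a := newAllocator([]string{"collector-0", "collector-1"})
	for i := 0; i < 4; i++ {
		a.add(fmt.Sprintf("ksm-%d", i), "A")
	}
	a.add("dns-0", "B")
	// Job A ends up split evenly across the two collectors.
	fmt.Println(a.perJob["collector-0"]["A"], a.perJob["collector-1"]["A"]) // 2 2
}
```

Because the per-job count is compared first, the heavy targets of a single job are spread out before the total target count is considered.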

jaronoff97 commented 4 months ago

@zh-w what do the metrics from the target allocator look like for the targets per collector metric? It's possible that the distribution is even, but the memory usage is not in the scenario where some of your scrape targets emit more metrics than others.

swiatekm commented 4 months ago

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.
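
As a rough illustration of that selection rule (fewest total targets first, then fewest targets of the same job), here is a sketch in Go. The `collectorLoad` type and `pick` function are hypothetical and do not reflect the operator's current implementation.

```go
package main

import "fmt"

// collectorLoad is a hypothetical summary of one collector's assignments.
type collectorLoad struct {
	name   string
	total  int            // all targets assigned to this collector
	perJob map[string]int // targets assigned per job
}

// pick returns the collector with the fewest total targets; among equals,
// it prefers the one with the fewest targets of the given job. Remaining
// ties go to the earliest entry in the slice. It assumes a non-empty slice.
func pick(loads []collectorLoad, job string) string {
	best := loads[0]
	for _, c := range loads[1:] {
		if c.total < best.total ||
			(c.total == best.total && c.perJob[job] < best.perJob[job]) {
			best = c
		}
	}
	return best.name
}

func main() {
	loads := []collectorLoad{
		{name: "collector-0", total: 3, perJob: map[string]int{"A": 2, "B": 1}},
		{name: "collector-1", total: 3, perJob: map[string]int{"A": 0, "B": 3}},
		{name: "collector-2", total: 4, perJob: map[string]int{"A": 0, "B": 4}},
	}
	fmt.Println(pick(loads, "A")) // collector-1: tied on total, fewer job-A targets
	fmt.Println(pick(loads, "B")) // collector-0: tied on total, fewer job-B targets
}
```

Compared with the proposal in the issue, the comparison keys are simply swapped: total count is the primary key and the per-job count is the tie-breaker.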

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and per-node always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.
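
To make the order dependence concrete, here is a small sketch of a greedy "fewest targets wins" assignment. It is a simplified stand-in for this kind of strategy, not the operator's code, and the `assign` function is a hypothetical name.

```go
package main

import "fmt"

// assign places each target on the collector that currently has the fewest
// targets (ties go to the lowest index) and returns target -> collector index.
// This is a simplified stand-in for a greedy strategy, not the operator's code.
func assign(targets []string, collectors int) map[string]int {
	counts := make([]int, collectors)
	out := make(map[string]int, len(targets))
	for _, t := range targets {
		best := 0
		for c := 1; c < collectors; c++ {
			if counts[c] < counts[best] {
				best = c
			}
		}
		counts[best]++
		out[t] = best
	}
	return out
}

func main() {
	// The same three targets, discovered in a different order.
	first := assign([]string{"a", "b", "c"}, 2)
	second := assign([]string{"b", "a", "c"}, 2)
	fmt.Println(first["a"], second["a"]) // 0 1
}
```

Target "a" lands on a different collector depending only on discovery order, whereas a strategy keyed on the target's identity would place it on the same collector both times.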

philchia commented 4 months ago

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and least-weighted always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

least-weighted is not stable either; it also depends on the order of target additions and deletions.

swiatekm commented 3 months ago

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly. Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and least-weighted always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

least-weighted is not stable either; it also depends on the order of target additions and deletions.

My bad, I meant per-node there.

zh-w commented 3 months ago

@zh-w what do the metrics from the target allocator look like for the targets per collector metric? It's possible that the distribution is even, but the memory usage is not in the scenario where some of your scrape targets emit more metrics than others.

I retested in my k8s cluster, which consists of about 1000 nodes. The configuration of my collector CR is as follows (simplified):

    exporters:
      prometheusremotewrite:xxx
    extensions:
      health_check: {}
      pprof:
        endpoint: :1888
      promhttp: {}
      zpages:xxx
    processors:
      attributes/insertpod:xxx
      batch: {}
      stats: {}
      groupbyattrs/pod:xxx
      k8sattributes:xxx
      memory_limiter:xxx
      resource:
        attributes:
        - action: insert
          from_attribute: k8s.pod.name
          key: pod
        - action: insert
          from_attribute: k8s.namespace.name
          key: namespace
        - action: insert
          from_attribute: k8s_node_name
          key: k8s.node.name
      resourcedetection:
        detectors:
        - env
        override: false
        timeout: 10s
    receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: cadvisor xxx
          - job_name: coredns xxx
          - job_name: etcd-cluster xxx
          - job_name: etcd-events xxx
          - job_name: kube-apiserver xxx
          - job_name: kube-scheduler xxx
          - job_name: kubelet xxx
          - job_name: kubelet-resource xxx
          - job_name: node-exporter xxx
          - job_name: ingress-nginx xxx
            scrape_interval: 30s
        target_allocator:
          collector_id: ${POD_NAME}
          endpoint: http://otel-targetallocator
          interval: 30s
      prometheus/internal:
        config:
          scrape_configs:
          - job_name: opentelemetry-collector
            scrape_interval: 10s
            static_configs:
            - targets:
              - ${K8S_POD_IP}:8888
              labels:
                collector_name: '${POD_NAME}'
          - job_name: opentelemetry-target-allocator
            scrape_interval: 30s
            static_configs:
            - targets:
              - otel-targetallocator:80
      zipkin:
        endpoint: 0.0.0.0:9411
    service:
      extensions:
      - health_check
      - pprof
      - zpages
      - promhttp
      - agileauth/vmp
      pipelines:
        metrics/resource:
          exporters:
          - prometheusremotewrite
          processors:
          - memory_limiter
          - batch
          - stats
          - resourcedetection
          - attributes/insertpod
          - groupbyattrs/pod
          - resource
          receivers:
          - prometheus
          - prometheus/internal

When I use the consistent-hashing algorithm, the number of targets per collector is as follows: image

The datapoints received by each collector are distributed as follows (the statistics were collected by a 'stats' processor I implemented): image

When I use the new job-average algorithm, the number of targets per collector is as follows: image

The datapoints received by each collector are distributed as follows: image

zh-w commented 3 months ago

I use the metric "opentelemetry_allocator_targets_per_collector" from the target allocator to count the number of targets per collector, and I added a "job_name" label to this metric to tally the targets of each job more accurately.

zh-w commented 3 months ago

The 'stats' processor in the collector is implemented as follows:

// processMetrics counts the metrics and datapoints in each ResourceMetrics
// entry and records both counts, labeled by job name.
func (p *processorImp) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
    rms := md.ResourceMetrics()
    for i := 0; i < rms.Len(); i++ {
        metricCount := 0
        metricDpCount := 0
        rm := rms.At(i)
        // The Prometheus receiver sets service.name to the scrape job name.
        jobName := "unknown-job"
        if serviceName, ok := rm.Resource().Attributes().Get(conventions.AttributeServiceName); ok {
            jobName = serviceName.AsString()
        }
        ilms := rm.ScopeMetrics()
        for j := 0; j < ilms.Len(); j++ {
            ms := ilms.At(j).Metrics()
            metricCount += ms.Len()
            for k := 0; k < ms.Len(); k++ {
                m := ms.At(k)
                // Each metric type keeps its datapoints in a type-specific slice.
                switch m.Type() {
                case pmetric.MetricTypeGauge:
                    metricDpCount += m.Gauge().DataPoints().Len()
                case pmetric.MetricTypeSum:
                    metricDpCount += m.Sum().DataPoints().Len()
                case pmetric.MetricTypeHistogram:
                    metricDpCount += m.Histogram().DataPoints().Len()
                case pmetric.MetricTypeExponentialHistogram:
                    metricDpCount += m.ExponentialHistogram().DataPoints().Len()
                case pmetric.MetricTypeSummary:
                    metricDpCount += m.Summary().DataPoints().Len()
                }
            }
        }
        p.telemetry.recordMetricReceived(ctx, metricCount, jobName)
        p.telemetry.recordMetricDpReceived(ctx, metricDpCount, jobName)
    }
    // The metrics pass through unchanged; this processor only observes them.
    return md, nil
}

The metric exposed by the 'stats' processor is as follows:

// labels
processorTagKey      = tag.MustNewKey("processor")
jobTagKey            = tag.MustNewKey("job_name")
// metric
statMetricDpReceived = stats.Int64("metric_datapoints_received", "Counter of metric datapoints received", stats.UnitDimensionless)

zh-w commented 3 months ago

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and per-node always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

I think there are different usage scenarios. One scenario is that targets are relatively stable: for example, the collector is deployed to scrape stable jobs such as cadvisor, kube-state-metrics, and node-exporter, whose target counts usually track the number of nodes. In this scenario, where targets are not frequently added or deleted, the new algorithm works well. As for algorithm stability, I think it is hard to balance load-balancing effectiveness against stability. Load-balancing effectiveness is probably the more important of the two, and it is the one we need to guarantee.

zh-w commented 3 months ago

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and per-node always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

@swiatekm I tested the least-weighted algorithm. The number of targets per collector is as follows:

image

The datapoints received by each collector are distributed as follows:

image

It seems that a small number of collectors got just one target.