open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

Ingestion of metrics or logs very slow (and high CPU usage on collector) #33427

Open rpasche opened 1 month ago

rpasche commented 1 month ago

Component(s)

cmd/telemetrygen

Describe the issue you're reporting

Hi,

I am running the otel collector-contrib version 0.100.0 with this configuration:

exporters:
  otlp/apm:
    endpoint: server:1234
    headers:
      Authorization: Bearer ${env:SECRET_TOKEN}
    tls:
      ca_file: /certs/ca.pem
      cert_file: /certs/tls.crt
      insecure_skip_verify: true
      key_file: /certs/tls.key
extensions:
  basicauth/server:
    htpasswd:
      file: /basicauth/.htpasswd
  health_check: {}
  zpages: {}
processors:
  attributes/tenantid:
    actions:
    - action: upsert
      key: tenant.id
      value: ${env:TENANT_ID}
  batch: {}
  memory_limiter:
    check_interval: 5s
    limit_percentage: 80
    spike_limit_percentage: 19
receivers:
  otlp:
    protocols:
      grpc:
        auth:
          authenticator: basicauth/server
        endpoint: ${MY_POD_IP}:4317
        tls:
          ca_file: /certs/ca.pem
          cert_file: /certs/tls.crt
          key_file: /certs/tls.key
      http:
        auth:
          authenticator: basicauth/server
        endpoint: ${MY_POD_IP}:4318
        tls:
          ca_file: /certs/ca.pem
          cert_file: /certs/tls.crt
          key_file: /certs/tls.key
service:
  extensions:
  - health_check
  - zpages
  - basicauth/server
  pipelines:
    logs:
      exporters:
      - otlp/apm
      processors:
      - memory_limiter
      - attributes/tenantid
      - batch
      receivers:
      - otlp
      - syslog
      - tcplog
    metrics:
      exporters:
      - otlp/apm
      processors:
      - memory_limiter
      - attributes/tenantid
      - batch
      receivers:
      - otlp
    traces:
      exporters:
      - otlp/apm
      processors:
      - memory_limiter
      - attributes/tenantid
      - batch
      receivers:
      - otlp
  telemetry:
    logs:
      encoding: json
      level: debug
    metrics:
      address: 0.0.0.0:8888
      level: detailed

I'm running this collector in a Kubernetes pod (one container only), and the collector has 2 CPUs and 4 GB of memory attached.

When running the telemetrygen command to send 1000 traces, this finishes within 1 second.

If I run basically the same command to send either 1000 logs or 1000 metrics, it takes ~2-3 minutes to complete. Looking at Grafana and the collector dashboard, only ~20 metrics/s or ~8 logs/s are received on the collector.

Additionally, the CPU load on the collector goes very high when sending metrics or logs, to around 60%.

By comparison, when running a trace send for 5m with 1 worker, the CPU usage on the collector also goes to ~60%, but it is processing ~4k spans/s.

At no time is any queuing shown within the collector.

Example commands I ran:

~/go/bin/telemetrygen traces --otlp-endpoint otlp-endpoint:4317 --otlp-header Authorization=\"Basic\ base64...\" --service=my-custom-service --duration=5m --rate=0

Similar commands were used to send 1000 metrics or logs (--metrics 1000 or --logs 1000, without setting --duration).
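
For illustration, those runs would have looked roughly like this (same endpoint and header placeholders as the trace command above):

~/go/bin/telemetrygen logs --otlp-endpoint otlp-endpoint:4317 --otlp-header Authorization=\"Basic\ base64...\" --logs 1000 --rate=0

~/go/bin/telemetrygen metrics --otlp-endpoint otlp-endpoint:4317 --otlp-header Authorization=\"Basic\ base64...\" --metrics 1000 --rate=0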

Here are some screenshots taken while trying to send 1000 logs to the collector:

Receiver: [screenshot]

Processor: [screenshot]

Exporters: [screenshot]

Collector stats: [screenshot]

Any hint as to what is going on here or what I am doing wrong?

Thanks

github-actions[bot] commented 1 month ago

Pinging code owners:

cforce commented 1 month ago

@rpasche I am not sure the "cmd/telemetrygen" component and its code owners can help here. If I understand correctly, you are reporting high CPU on the collector side for logs and metrics, not at the load generator "cmd/telemetrygen". From the graphs it looks like the log load is what drives the CPU, so you could switch on collector profiling and check in which Go modules, packages, and methods most of the CPU time is spent. I don't see very specific OTel modules, so your expectation might be that the OTel core should have a smaller CPU footprint.
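
For reference, profiling can be switched on with the collector's pprof extension. A minimal sketch of the extra configuration, reusing the ${MY_POD_IP} variable from the config above (the extension defaults to listening on localhost:1777):

extensions:
  pprof:
    endpoint: ${MY_POD_IP}:1777
service:
  extensions:
  - pprof
  # plus the existing health_check, zpages and basicauth/server entries

The CPU profile can then be collected with the standard Go pprof tooling from http://<pod-ip>:1777/debug/pprof/profile.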

rpasche commented 1 month ago

Hi @cforce, I activated the pprof extension and noticed that the collector spent most of its time in the bcrypt password check used by basic auth. So I disabled authentication while testing, but still: sending traces worked as before, while sending logs or metrics is still pretty slow.

I think I now see what is causing this. For metrics and logs, every single metric or log is sent in its own export request; there is no batching of metrics or logs, even if --rate=0 is set. So most likely latency is the issue, and with one worker this currently results in ~20 metrics or logs per second.
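
As a rough illustration (the latency figure is an assumption, not a measurement from this setup): if every data point is exported in its own synchronous request and one round trip to the collector takes on the order of 50 ms, a single worker can complete at most about 1 / 0.05 s = 20 requests per second, which is in the same ballpark as the observed ~20 metrics/s and ~8 logs/s.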

rpasche commented 1 month ago

This is a crude adjustment of the code, but it basically works and now also sends logs and metrics in batches of 1000 signals (the diff below is for the metrics worker).

--- a/cmd/telemetrygen/internal/metrics/worker.go
+++ b/cmd/telemetrygen/internal/metrics/worker.go
@@ -46,44 +46,62 @@ func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdk
        }
    }()

-   var i int64
+   var i, batch int64
+
+   const BATCHSIZE = 1000
+
    for w.running.Load() {
        var metrics []metricdata.Metrics
-
-       switch w.metricType {
-       case metricTypeGauge:
-           metrics = append(metrics, metricdata.Metrics{
-               Name: w.metricName,
-               Data: metricdata.Gauge[int64]{
-                   DataPoints: []metricdata.DataPoint[int64]{
-                       {
-                           Time:       time.Now(),
-                           Value:      i,
-                           Attributes: attribute.NewSet(signalAttrs...),
-                           Exemplars:  w.exemplars,
+       batch = 0
+       for {
+           switch w.metricType {
+           case metricTypeGauge:
+               metrics = append(metrics, metricdata.Metrics{
+                   Name: w.metricName,
+                   Data: metricdata.Gauge[int64]{
+                       DataPoints: []metricdata.DataPoint[int64]{
+                           {
+                               Time:       time.Now(),
+                               Value:      i,
+                               Attributes: attribute.NewSet(signalAttrs...),
+                               Exemplars:  w.exemplars,
+                           },
                        },
                    },
-               },
-           })
-       case metricTypeSum:
-           metrics = append(metrics, metricdata.Metrics{
-               Name: w.metricName,
-               Data: metricdata.Sum[int64]{
-                   IsMonotonic: true,
-                   Temporality: metricdata.CumulativeTemporality,
-                   DataPoints: []metricdata.DataPoint[int64]{
-                       {
-                           StartTime:  time.Now().Add(-1 * time.Second),
-                           Time:       time.Now(),
-                           Value:      i,
-                           Attributes: attribute.NewSet(signalAttrs...),
-                           Exemplars:  w.exemplars,
+               })
+           case metricTypeSum:
+               metrics = append(metrics, metricdata.Metrics{
+                   Name: w.metricName,
+                   Data: metricdata.Sum[int64]{
+                       IsMonotonic: true,
+                       Temporality: metricdata.CumulativeTemporality,
+                       DataPoints: []metricdata.DataPoint[int64]{
+                           {
+                               StartTime:  time.Now().Add(-1 * time.Second),
+                               Time:       time.Now(),
+                               Value:      i,
+                               Attributes: attribute.NewSet(signalAttrs...),
+                               Exemplars:  w.exemplars,
+                           },
                        },
                    },
-               },
-           })
-       default:
-           w.logger.Fatal("unknown metric type")
+               })
+           default:
+               w.logger.Fatal("unknown metric type")
+           }
+
+           i++
+           batch++
+           if err := limiter.Wait(context.Background()); err != nil {
+               w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
+           }
+           if w.numMetrics != 0 && i >= int64(w.numMetrics) {
+               break
+           }
+           // break if BATCHSIZE is reached
+           if batch >= BATCHSIZE {
+               break
+           }
        }

        rm := metricdata.ResourceMetrics{
@@ -94,11 +112,6 @@ func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdk
        if err := exporter.Export(context.Background(), &rm); err != nil {
            w.logger.Fatal("exporter failed", zap.Error(err))
        }
-       if err := limiter.Wait(context.Background()); err != nil {
-           w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
-       }
-
-       i++
        if w.numMetrics != 0 && i >= int64(w.numMetrics) {
            break
        }
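
For readers who do not want to parse the diff: the idea is simply to accumulate up to 1000 data points and call the exporter once per batch, instead of once per data point. A minimal, self-contained Go sketch of that pattern (not telemetrygen code; the fixed delay stands in for one network round trip):

package main

import (
	"fmt"
	"time"
)

const batchSize = 1000

// export stands in for one OTLP export call; the sleep simulates a single
// network round trip to the collector.
func export(items []int) {
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("exported %d items in one request\n", len(items))
}

func main() {
	const total = 5000

	// Accumulate items and export once per full batch: total/batchSize
	// round trips instead of one round trip per item.
	var batch []int
	for i := 0; i < total; i++ {
		batch = append(batch, i)
		if len(batch) >= batchSize {
			export(batch)
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		export(batch) // flush the remainder
	}
}

With batching, the per-request latency is amortized over up to 1000 signals per export, which is why the patched run below reaches millions of logs in 5 minutes with a single worker.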

After building the patched telemetrygen, running

./telemetrygen logs --otlp-endpoint otlp-grpc-endpoint:4317 --otlp-header Authorization=\"Basic\ XXXXX\" --duration=5m --rate=0

looks like this

Receiver + Processor: [screenshot]

Exporter: [screenshot]

In those 5 minutes, that one worker sent 5572000 logs:

...
2024-06-12T09:02:07.276+0200    INFO    logs/worker.go:107  logs generated  {"worker": 0, "logs": 5572000}

[screenshot]