grafana / alloy

OpenTelemetry Collector distribution with programmable pipelines
https://grafana.com/oss/alloy
Apache License 2.0

Various "error":"proto: wrong wireType = 2 for field TimeUnixNano" messages when using otel.receiver.kafka #644

Closed · elburnetto-intapp closed this issue 6 months ago

elburnetto-intapp commented 6 months ago

What's wrong?

Our architecture is currently:

Applications -> OTel Collector (otel/opentelemetry-collector-contrib:0.98.0) -> Kafka -> Grafana Alloy (1.0.0) -> Grafana Tempo (2.4.0).

The OTel Collector receives the traces from our apps and exports them to a Kafka topic using the otlp_proto encoding. We've then set up the otelcol.receiver.kafka component in Alloy to consume from this Kafka topic and send the data on to Tempo via gRPC (as we're looking to enable Tempo multi-tenancy, which isn't supported via Kafka).

When Alloy starts consuming these messages, the Alloy logs throw various errors about wrong wireTypes, illegal wireTypes, etc., which make no sense to us (previously, when Tempo consumed from Kafka directly, we saw none of these errors).

Steps to reproduce

  1. Run a simple pipeline in OTel Collector on 0.98.0, exporting to a Kafka topic using the otlp_proto encoding format.
  2. Run a simple Alloy pipeline to consume from Kafka, batch-process, then export via gRPC to Tempo.
  3. Watch the errors come in.

System information

No response

Software version

No response

Configuration

ALLOY:

otelcol.receiver.kafka "otelkafka" {
  brokers          = ["kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093"]
  protocol_version = "2.8.0"
  topic = "otlp-tracing"
  group_id = "alloy-otelkafka"
  client_id = "alloy-otelkafka"
  encoding = "otlp_proto"

  authentication {
    tls {
        ca_file = "/tmp/ca.crt"
        insecure = true
    }
  }

  message_marking {
    after_execution = true
    include_unsuccessful = true
  }

  output {
    metrics = [otelcol.processor.batch.otelkafka.input]
    logs    = [otelcol.processor.batch.otelkafka.input]
    traces  = [otelcol.processor.batch.otelkafka.input]
  }
}

otelcol.processor.batch "otelkafka" {
    send_batch_size = 10
    timeout = "100ms"
    output {
        metrics = [otelcol.exporter.otlp.otelkafka.input]
        logs    = [otelcol.exporter.otlp.otelkafka.input]
        traces  = [otelcol.exporter.otlp.otelkafka.input]
    }
}

otelcol.exporter.otlp "otelkafka" {
  client {
    endpoint = "grafana-tempo-distributed-distributor-discovery.monitoring.svc.cluster.local:4317"
    tls {
        insecure = true
        insecure_skip_verify = true
    }
  }
}

Logs

{"ts":"2024-04-23T09:17:23.578960444Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.otelkafka","error":"proto: illegal wireType 7"}
{"ts":"2024-04-23T09:17:23.582466437Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.otelkafka","error":"proto: wrong wireType = 2 for field TimeUnixNano"}
{"ts":"2024-04-23T09:17:32.771035129Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.otelkafka","error":"proto: Gauge: wiretype end group for non-group"}
{"ts":"2024-04-23T09:17:32.775138857Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.otelkafka","error":"proto: wrong wireType = 2 for field TimeUnixNano"}
wildum commented 6 months ago

Hello, this looks like a mismatch between the encoding and decoding versions used by the Otel collector and Alloy. Alloy v1.0.0 is based on version 0.96.0 of the Otel collector. Version 0.96.0 uses pdata@1.3, whereas version 0.98.0 uses pdata@1.5 (see https://github.com/open-telemetry/opentelemetry-collector/blob/main/pdata/README.md). Can you try with the Otel collector v0.96.0 and see if the same errors occur? FYI, since 1.0.0 we have committed to updating our OpenTelemetry dependency before every release (we release a new minor version of Alloy every 6 weeks). The next release is planned for the 7th of May and should contain a new Otel version (likely v0.99).

elburnetto-intapp commented 6 months ago

Hey @wildum, this is still happening on 0.96.0 of the Otel Collector too (if anything, the number of errors in the Alloy logs has increased):

{"ts":"2024-04-25T10:07:32.150095522Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:32.258341425Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:32.297896393Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:32.406112362Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:32.421765033Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:32.530223899Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:32.544921931Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:32.653745757Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:32.668501721Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:32.777282702Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:32.792960196Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:32.901749908Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:32.92041645Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:33.028939054Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:33.045607833Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:33.153926792Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:33.170913078Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
{"ts":"2024-04-25T10:07:33.279599572Z","level":"error","msg":"failed to unmarshal message","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","error":"proto: illegal wireType 7"}
{"ts":"2024-04-25T10:07:33.303345037Z","level":"info","msg":"Starting consumer group","component_path":"/","component_id":"otelcol.receiver.kafka.intapp_otel","partition":2}
wildum commented 6 months ago

Could you also share your Otel collector config please?

elburnetto-intapp commented 6 months ago

Sure thing.

    connectors:
      count/spans:
        spans:
          spans.count:
            attributes:
            - key: service.name
            description: Number of spans per service.
      spanmetrics:
        aggregation_temporality: AGGREGATION_TEMPORALITY_CUMULATIVE
        dimensions_cache_size: 1000
        exclude_dimensions:
        - status.code
        exemplars:
          enabled: true
        histogram:
          explicit:
            buckets:
            - 1ms
            - 10ms
            - 100ms
            - 250ms
            - 1s
            - 5s
          unit: ms
        metrics_flush_interval: 15s
    exporters:
      kafka/traces:
        auth:
          <Redacted>
        brokers: kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093
        encoding: otlp_proto
        protocol_version: 2.8.0
        retry_on_failure:
          enabled: true
          max_elapsed_time: 600s
          max_interval: 60s
        topic: otlp-tracing
      otlp/force-nginx-pipeline:
        endpoint: localhost:24318
        tls:
          insecure: true
      otlp/force-probability-pipeline:
        endpoint: localhost:24317
        tls:
          insecure: true
      otlp/standard:
        endpoint: localhost:14317
        tls:
          insecure: true
      prometheus:
        add_metric_suffixes: false
        enable_open_metrics: true
        endpoint: 0.0.0.0:8889
        metric_expiration: 180m
        resource_to_telemetry_conversion:
          enabled: true
        send_timestamps: true
    extensions:
      health_check: {}
      memory_ballast: {}
    processors:
      attributes/add-nginx-tag:
        actions:
        - action: upsert
          key: pipeline
          value: force-nginx-passed
      attributes/add-passed-tag:
        actions:
        - action: upsert
          key: pipeline
          value: force-probability-passed
      attributes/traces-probalistic-by-servce:
        actions:
        - action: upsert
          key: pipeline
          value: force-probability
        exclude:
          match_type: strict
          services:
          - thanos-frontend
          - thanos-query
      attributes/traces-probalistic-nginx:
        actions:
        - action: upsert
          key: pipeline
          value: force-nginx
        include:
          match_type: strict
          services:
          - nginx
      attributes/traces-standard:
        actions:
        - action: insert
          key: pipeline
          value: standard
      batch:
        send_batch_size: 100000
        timeout: 5s
      filter/metrics:
        metrics:
          include:
            expressions:
            - Label("service.name") == "thanos-frontend"
            - Label("service.name") == "thanos-query"
            match_type: expr
      filter/spans:
        spans:
          exclude:
            match_type: regexp
            span_names:
            - /prometheus
            - /metrics
            - .*/healthz
      groupbyattrs/traces:
        keys:
        - pipeline
      k8sattributes:
        auth_type: kubeConfig
        extract:
          metadata:
          - k8s.pod.name
          - k8s.deployment.name
          - k8s.namespace.name
          - k8s.node.name
        passthrough: false
      memory_limiter:
        check_interval: 5s
        limit_mib: 1638
        spike_limit_mib: 512
      probabilistic_sampler:
        hash_seed: 223
        sampling_percentage: 10
      resource:
        attributes:
        - action: delete
          key: cloud.availability_zone
        - action: delete
          key: cloud.platform
        - action: delete
          key: azure.vm.scaleset.name
        - action: delete
          key: host.arch
        - action: delete
          key: container
        - action: delete
          key: container.id
        - action: delete
          key: host.id
        - action: delete
          key: host.image.id
        - action: delete
          key: host.type
        - action: delete
          key: os.description
        - action: delete
          key: os.type
        - action: delete
          key: process_command_args
      resourcedetection:
        detectors:
        - ec2
        - azure
        override: false
      routing/traces:
        attribute_source: resource
        default_exporters:
        - otlp/standard
        from_attribute: pipeline
        table:
        - exporters:
          - otlp/standard
          value: standard
        - exporters:
          - otlp/force-probability-pipeline
          value: force-probability
        - exporters:
          - otlp/force-nginx-pipeline
          value: force-nginx
      tail_sampling:
        policies:
        - and:
            and_sub_policy:
            - name: services-using-tail_sampling-policy
              string_attribute:
                invert_match: true
                key: service.name
                values:
                - nginx-internal
              type: string_attribute
            - name: sample-all-policy
              type: always_sample
          name: backwards-compatibility-policy
          type: and
        - and:
            and_sub_policy:
            - name: service-name-policy
              string_attribute:
                key: service.name
                values:
                - nginx-internal
              type: string_attribute
            - name: rate-limiting-policy
              rate_limiting:
                spans_per_second: 2
              type: rate_limiting
            - name: probabilistic-policy
              probabilistic:
                sampling_percentage: 7
              type: probabilistic
          name: nginx-rules
          type: and
    receivers:
      jaeger:
        protocols:
          grpc:
            endpoint: 0.0.0.0:14250
          thrift_http:
            endpoint: 0.0.0.0:14268
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4317
          http:
            endpoint: 0.0.0.0:4318
      otlp/force-nginx-pipeline:
        protocols:
          grpc:
            endpoint: localhost:24318
      otlp/force-probability-pipeline:
        protocols:
          grpc:
            endpoint: localhost:24317
      otlp/standard:
        protocols:
          grpc:
            endpoint: localhost:14317
      zipkin:
        endpoint: 0.0.0.0:9411
    service:
      extensions:
      - health_check
      pipelines:
        metrics:
          exporters:
          - prometheus
          processors:
          - filter/metrics
          receivers:
          - spanmetrics
        metrics/count:
          exporters:
          - prometheus
          receivers:
          - count/spans
        traces/common:
          exporters:
          - otlp/standard
          - otlp/force-probability-pipeline
          - otlp/force-nginx-pipeline
          processors:
          - memory_limiter
          - tail_sampling
          - filter/spans
          - resourcedetection
          - k8sattributes
          - resource
          - attributes/traces-standard
          - attributes/traces-probalistic-by-servce
          - attributes/traces-probalistic-nginx
          - groupbyattrs/traces
          - routing/traces
          receivers:
          - otlp
          - jaeger
          - zipkin
        traces/force-nginx:
          exporters:
          - spanmetrics
          - count/spans
          - kafka/traces
          processors:
          - attributes/add-nginx-tag
          receivers:
          - otlp/force-nginx-pipeline
        traces/force-probability:
          exporters:
          - spanmetrics
          - count/spans
          - kafka/traces
          processors:
          - attributes/add-passed-tag
          - probabilistic_sampler
          receivers:
          - otlp/force-probability-pipeline
        traces/standard:
          exporters:
          - spanmetrics
          - count/spans
          - kafka/traces
          receivers:
          - otlp/standard
      telemetry:
        logs:
          level: warn
wildum commented 6 months ago

I reproduced the error locally with a setup similar to yours: a sample app generating traces -> otel collector v0.96.0 -> kafka -> alloy -> grafana cloud. This is what I see in Alloy:

level=debug msg="Kafka message claimed" component_path=/ component_id=otelcol.receiver.kafka.otelkafka value="\n\x9b\r\n$\n\"\n\fservice.name\x12\x12\n\x10otel-example-app\x12\xf2\f\n\x10\n\x0eexample-tracer\x12B\n\x10Bm\x11\xa5\xb9(l\xa8\x8e9\x10|\x05\xab\xca\xd4\x12\b\xc0\xe5\x7fb\xe9h\x1a\xa3\"\x00*\fsample-trace0\x019z+:\x12υ\xc9\x17A\x1e\xda:\x12υ\xc9\x17z\x00\x12B\n\x10\xae\xfe\x1b+\x12}\x00PE\xb5\xe5)\x96\xb8\xf7\x18\x12\b\x9fO#\x98T\xe9\x80 \"\x00*\fsample-trace0\x019s\xd3-\x1eυ\xc9\x17A\xa2\xc6/\x1eυ\xc9\x17z\x00\x12B\n\x10\x89ja\xb4\xd2\xd0\xf7\xfc-e\x87\"\xa1i\x7f\x1e\x12\b>\x83\xae>\xfdI\x86Y\"\x00*\fsample-trace0\x019\x1fh$*υ\xc9\x17A\xe3\xef%*υ\xc9\x17z\x00\x12B\n\x10g\xec\a\xc8g\x00u\xfe(\xe1ZY\xbcr\xd0\xdd\x12\b#\x90\x93\u0590\x88,\xf1\"\x00*\fsample-trace0\x019\xb8\x19\x196υ\xc9\x17Af\xde\x1c6υ\xc9\x17z\x00\x12B\n\x10E$K\xac\x95|ܡ&\v\xf2\x7f\xb0=a\x03\x12\b\x1c\x04\"\xaer\x06w\xeb\"\x00*\fsample-trace0\x019\xba\\'Bυ\xc9\x17A\xd3\xc0'Bυ\xc9\x17z\x00\x12B\n\x10\xbf7\x1al\x11\xae\x02\x94\xed\x94,\xd0w\x14\xef\x88\x12\b\x00Ԅ\x01\xe3\xe6\xe6#\"\x00*\fsample-trace0\x019@\b!Nυ\xc9\x17A`P\"Nυ\xc9\x17z\x00\x12B\n\x10\x06\xd7cԬ\xd7\xe4\xcdy\xbd\xc4J\x10\xa6\xfd,\x12\b\x1aH\xb5\xb8\n\bT\xd0\"\x00*\fsample-trace0\x019\xc5\xd9,Zυ\xc9\x17AV\xae-Zυ\xc9\x17z\x00\x12B\n\x10\xb6\x9c\xd2\xfd\\X\x19\x03^\xbe\x80c\xc3?v\x05\x12\b\f=\x80ޕPy\x98\"\x00*\fsample-trace0\x019\xe3\x01\x1cfυ\xc9\x17A\xd2\xdb\x1cfυ\xc9\x17z\x00\x12B\n\x10\x15\xeerI\x15\xef\x0f\xba\xcc\x16Iy\xf62\x00\xaf\x12\b\xe2V\xa8x\xd0R̔\"\x00*\fsample-trace0\x019\x035\x17rυ\xc9\x17A\xb1\x04\x18rυ\xc9\x17z\x00\x12B\n\x10]2q\f\xb6\xa7\xc6v\x99L\x1d\x00\xac\xd9t\xcd\x12\b\xd2KD<\xe34:\x1b\"\x00*\fsample-trace0\x019\nB\n~υ\xc9\x17A<\n\v~υ\xc9\x17z\x00\x12B\n\x10\xbf\x87\xe9M\xef1\xea\xf0wE\xf3v\x90\x92q\xb7\x12\bQLC:i;K\xe7\"\x00*\fsample-trace0\x019\xf9%\x01\x8aυ\xc9\x17A)\x18\x02\x8aυ\xc9\x17z\x00\x12B\n\x10|\xd7I2\x81Xޫ\xd6]@\xcfcAh\xaf\x12\b\xad\xce.xG\xa4_Z\"\x00*\fsample-trace0\x019z\xdb\xf5\x95υ\xc9\x17A-W\xf6\x95υ\xc9\x17z\x00\x12B\n\x10\x11\xecs^\x15\xbc\xb5\xfe\xaf\xde'\x11O\a\xe7\x01\x12\b\x92\xaf\x1d\xea\xd2[\x9c\xd9\"\x00*\fsample-trace0\x019\xd5~\xf5\xa1υ\xc9\x17A\x9fn\xf8\xa1υ\xc9\x17z\x00\x12B\n\x10\x129\x87\x1e\xe2\x1d\xc9C}\x01\xc0e\x18\x1e\x98\x93\x12\b\x8a\xe8\x1c\x1d\rz\x83-\"\x00*\fsample-trace0\x019{\xfb\x04\xaeυ\xc9\x17A\xa2-\x06\xaeυ\xc9\x17z\x00\x12B\n\x10n\x83\xcc_/rI\x02jT:\x93\xea\x87\x1f\xfc\x12\b*Jj\xe89(\x0f\x82\"\x00*\fsample-trace0\x019\n\x9f\xf7\xb9υ\xc9\x17A\x1ee\xf8\xb9υ\xc9\x17z\x00\x12B\n\x100\xae\x01P\xf0\x11\f\x9e\u05fc\xe5\xff\x17UV\f\x12\b\xc8\xcd֢\xdf}\xf1h\"\x00*\fsample-trace0\x019\xed\x84\xee\xc5υ\xc9\x17AC\xd1\xee\xc5υ\xc9\x17z\x00\x12B\n\x10\xe2\x90W\xd1\t}Cz \xbciʁ\x1a\xf5\xab\x12\b-@\xa8\xe1\xc3\x04\r\xb8\"\x00*\fsample-trace0\x019\x04\n\xe7\xd1υ\xc9\x17Ai\xfa\xe7\xd1υ\xc9\x17z\x00\x12B\n\x10\xc1{\xfbe\xd7%\xa2\xc0 
ey\x95`\xee]\xcf\x12\b\xec\x81l\xd0\xf7\xec\xd7\xe9\"\x00*\fsample-trace0\x019\xba;\xdd\xddυ\xc9\x17A8\x1a\xde\xddυ\xc9\x17z\x00\x12B\n\x10\x15\x12\xb4\a4O\x83:;˂\xb1\a\x85\xdcc\x12\b\xc5\xd6\xf8\xc6\xc5F\x03`\"\x00*\fsample-trace0\x019T\xa9\xf0\xe9υ\xc9\x17Al\"\xf1\xe9υ\xc9\x17z\x00\x12B\n\x10\xfe\xcd\xc4\xf3\xda\x01m\x92\xa1\xa01.\xaf\x0e\xcd\x04\x12\b\x0e'\xfdnM)\x81\x8f\"\x00*\fsample-trace0\x019\xb2\x9c\xed\xf5υ\xc9\x17A\xbbB\xef\xf5υ\xc9\x17z\x00\x12B\n\x10a\x9bJL\xb4skg*\xd5\xdd\xcd\x19\xfdf\x8d\x12\b\x89R\xca\x01@\x01\xdfl\"\x00*\fsample-trace0\x019G\xaf\xe1\x01Ѕ\xc9\x17A\xae|\xe2\x01Ѕ\xc9\x17z\x00\x12B\n\x10\xd8BW\xacI߄\x1b\xf0\xc1\xc9\x14\x16\x1c<\x94\x12\b\xca\aU\xb3\xb2\xc0\xa4q\"\x00*\fsample-trace0\x019\xf3%\xdd\rЅ\xc9\x17A\xa7\x8c\xdd\rЅ\xc9\x17z\x00\x12B\n\x10\x0e\nZ^\xd5%\xdc0\x9ex\xf6\x8aΖ?x\x12\bS\xf8i\x8d\xfdKU\xf7\"\x00*\fsample-trace0\x019'h\xce\x19Ѕ\xc9\x17A\xcf\xc2\xce\x19Ѕ\xc9\x17z\x00\x12B\n\x10\x0e\xcc\x113\xb9S\xda\xe2/S\xf9\x10*\xe0R\x81\x12\b\xac%\xf1\xaf۴\xc7\xce\"\x00*\fsample-trace0\x019\b\xd2\xcc%Ѕ\xc9\x17A33\xcd%Ѕ\xc9\x17z\x00" timestamp=2024-04-25T14:31:02.409+02:00 topic=otlp-tracing
ts=2024-04-25T12:31:02.415735Z level=error msg="failed to unmarshal message" component_path=/ component_id=otelcol.receiver.kafka.otelkafka error="proto: wrong wireType = 2 for field TimeUnixNano"
wildum commented 6 months ago

I will try to replace Alloy with another Otel collector to see whether Alloy is the problem or if the same error pops up with the Otel collector.

wildum commented 6 months ago

It works fine with: sample app -> Otel Collector 1 -> kafka -> Otel Collector 2 -> grafana cloud. I don't see any errors and the traces are correctly stored in Tempo. It seems like the error is in Alloy, and I don't think that updating to the new version will solve it, because I'm using the same Otel collector version as we currently use in Alloy.

wildum commented 6 months ago

Trying with Alloy again, I noticed that the first traces are sent correctly (no errors, and they appear in Tempo). The error only pops up after a few seconds.

wildum commented 6 months ago

I found out that at first the traces are consumed correctly by the tracesConsumerGroupHandler, but after a few seconds they are consumed by the metricsConsumerGroupHandler or the logsConsumerGroupHandler, and that's when the errors appear (because the schemas are different).

wildum commented 6 months ago

The tracesConsumerGroup starts and correctly consumes the traces, but then it gets cancelled. If we are lucky, it restarts and cancels several times before stabilizing:

ts=2024-04-25T14:54:10.497616Z level=info msg=CANCEL component_path=/ component_id=otelcol.receiver.kafka.otelkafka
ts=2024-04-25T14:54:11.039634Z level=info msg="Starting trace consumer group" component_path=/ component_id=otelcol.receiver.kafka.otelkafka partition=0
ts=2024-04-25T14:54:14.181763Z level=info msg=CANCEL component_path=/ component_id=otelcol.receiver.kafka.otelkafka
ts=2024-04-25T14:54:14.505917Z level=info msg="Starting trace consumer group" component_path=/ component_id=otelcol.receiver.kafka.otelkafka partition=0

In that case it seems to keep working indefinitely. But if we are unlucky, another group takes over:

ts=2024-04-25T14:58:43.973905Z level=info msg=CANCEL component_path=/ component_id=otelcol.receiver.kafka.otelkafka
ts=2024-04-25T14:58:44.501512Z level=info msg="Starting metrics consumer group" component_path=/ component_id=otelcol.receiver.kafka.otelkafka partition=0

When this happens, it does not recover: the metrics or logs group will keep trying to unmarshal the traces.

wildum commented 6 months ago

@elburnetto-intapp I found a workaround if renaming your topic is acceptable:

The consumer groups each claim a default topic: otlp_spans for traces, otlp_metrics for metrics, and otlp_logs for logs. When you set topic to a value, it applies to all three groups, so in your case all three groups are interested in otlp-tracing. Somehow the trace consumer claims it first, but after a few cancels another group claims it and you get the error. If you don't explicitly set topic = "" in the Alloy config, Alloy defaults the topic to otlp_spans for all three groups. By setting it to "", every group gets its own default value, which means that the traceConsumerGroup will be the only one claiming the otlp_spans topic.
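
For example, a minimal sketch of the workaround (it keeps the rest of the receiver block from the configuration above, and assumes the collector's kafka exporter is switched to publish traces to the otlp_spans topic instead of otlp-tracing):

otelcol.receiver.kafka "otelkafka" {
  // brokers, authentication, message_marking and output unchanged from the config above.
  // With topic = "", each consumer group falls back to its own default topic
  // (otlp_spans / otlp_metrics / otlp_logs), so only the traces consumer group
  // claims otlp_spans.
  topic = ""
}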

I will continue investigating to understand why this was designed this way in Otel and to find a proper solution.

wildum commented 6 months ago

It works with the Otel collector because you specify the type of telemetry when you define the pipeline:

service:
  pipelines:
    traces:
      receivers:
        - kafka
      exporters:
        - otlp

In this case the kafka receiver will only start the traces consumer group
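
For comparison, Alloy has no per-signal pipeline type: at this point, even a receiver that only wires traces downstream (a sketch reusing the component names and topic from the configuration above) still starts all three consumer groups, which is what the fix discussed below changes:

otelcol.receiver.kafka "otelkafka" {
  // rest of your config
  topic = "otlp-tracing"
  output {
    // Only traces are wired downstream, but before the fix the metrics and logs
    // consumer groups are started anyway and also try to claim the topic.
    traces = [otelcol.processor.batch.otelkafka.input]
  }
}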

elburnetto-intapp commented 6 months ago

Hey @wildum,

Really appreciate you looking into this and giving some detailed analysis.

We've created the new Kafka topic 'otlp_spans' and got this running in one of our Dev environments, and can confirm this is working with no errors from Alloy at all (and all traces are being received by Tempo).

ptodev commented 6 months ago

This reminds me of #251. I think we need to make the otelcol.receiver components smart enough to only start the metrics/traces/logs receivers that are actually configured for the given component. E.g. if the otelcol.receiver component isn't configured to output metrics downstream, it shouldn't start a metrics receiver.

wildum commented 6 months ago

Sounds good, I can draft something for the receivers

elburnetto-intapp commented 6 months ago

Hey @wildum,

Hope you had a great weekend?

We've found that when we don't set the Kafka topic in Alloy, it auto-creates the topics on our brokers.

E.g. we created otlp_spans ourselves (we have a Kafka Operator running and define all clusters/topics as code), but otlp_metrics and otlp_logs have been auto-created, which means we can't manage them via config.

Not sure if this is expected behaviour or not.

Cheers, Alex

wildum commented 6 months ago

Hey, thanks for letting me know, and sorry about that. This is a bug: the receiver component should not create topics that are not needed. I'm currently working on a fix for the next release (currently planned for the 7th of May). I will make sure this behavior is fixed and will update this ticket once the fix is confirmed to be in the next release.

wildum commented 6 months ago

Hello @elburnetto-intapp, the fix has been merged to main and will be part of the next release (Alloy v1.1.0, on the 7th of May). With this fix, the default value of topic is "", so you won't have to explicitly set it to "" in your config when you update Alloy.

You will still need to update your config: in the output block, set only the signals that you need.

If you only have one topic "otlp_spans" for traces then you can just do:

otelcol.receiver.kafka "otelkafka" {
  // rest of your config
  output {
    traces  = [otelcol.processor.batch.otelkafka.input]
  }
}

or if your topic name is different from "otlp_spans":

otelcol.receiver.kafka "otelkafka" {
  // rest of your config
  topic = "traces-topic"
  output {
    traces  = [otelcol.processor.batch.otelkafka.input]
  }
}

If you have several topics for different telemetry signals then you must proceed as follows:

Either you don't specify the topic, in which case you can use the receiver for all three telemetry signals, but the topics have to be "otlp_spans", "otlp_metrics", and "otlp_logs":

otelcol.receiver.kafka "otelkafka" {
  // rest of your config
  output {
    metrics = [otelcol.processor.batch.otelkafka.input]
    logs    = [otelcol.processor.batch.otelkafka.input]
    traces  = [otelcol.processor.batch.otelkafka.input]
  }
}

Or you specify a topic, but then you will need to create separate receivers (if you need all three signals):

otelcol.receiver.kafka "otelkafka_traces" {
  // rest of your config
  topic = "traces-topic"
  output {
    traces  = [otelcol.processor.batch.otelkafka.input]
  }
}

otelcol.receiver.kafka "otelkafka_metrics" {
  // rest of your config
  topic = "metrics-topic"
  output {
    metrics  = [otelcol.processor.batch.otelkafka.input]
  }
}

otelcol.receiver.kafka "otelkafka_logs" {
  // rest of your config
  topic = "logs-topic"
  output {
    logs  = [otelcol.processor.batch.otelkafka.input]
  }
}

LMK if it's not clear or if you encounter problems with the new release when it's out. I don't like how this config works, but we must stay consistent with the Otel collector. I opened a ticket to change it: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32735

elburnetto-intapp commented 6 months ago

Hey @wildum,

Amazing, thanks for that, makes complete sense!

Cheers, Alex

wildum commented 6 months ago

@elburnetto-intapp quick update: the release of Alloy 1.1 is slightly delayed. It was previously planned for today, but we will need a few more days. Sorry!