open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

Kafka exporter producer.max_message_bytes not honoured #22033

Closed idsvandermolen closed 2 weeks ago

idsvandermolen commented 1 year ago

Component(s)

exporter/kafka

What happened?

Description

We had an initial setup with an OTLP receiver receiving spans and writing them to a Kafka topic via the Kafka exporter, which returned many errors like:

2023-05-10T14:12:25.069Z    error   exporterhelper/queued_retry.go:368  Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors  {"kind": "exporter", "data_type": "traces", "name": "kafka/spans", "error": "Failed to deliver 1 messages due to kafka server: Message was too large, server rejected it to avoid allocation error"}

We have a simple configuration with many default settings (see below). Then we reduced the batch size with:

  batch:
    send_batch_size: 1500
    send_batch_max_size: 1500

which greatly reduced the number of errors we saw, because the internal batch size got smaller. However, we still see them occur now and then. This leads us to believe that the internal batches are not split/processed to accommodate the producer.max_message_bytes setting.

Steps to Reproduce

Use the default settings for the batch processor and Kafka exporter, and the default 1 MB max message size in the Kafka cluster. Then generate enough spans to make sure the internal collector batches exceed 1 MB.

Expected Result

We expect the internal batch to be split if it exceeds producer.max_message_bytes, to prevent errors and spans eventually being dropped.

Actual Result

producer.max_message_bytes does not seem to have any effect, and the Kafka exporter still tries to send larger messages to Kafka.

Collector version

0.77.0

Environment information

Environment

OS: GKE v1.23.16-gke.1400

OpenTelemetry Collector configuration

exporters:
  kafka/spans:
    protocol_version: 3.0.0
    brokers:
      - otel-kafka-bootstrap:9093
    topic: otlp_spans
    auth:
      tls:
        ca_file: /etc/otel-kafka-secrets/ca.crt
        cert_file: /etc/otel-agent-kafka-user/user.crt
        key_file: /etc/otel-agent-kafka-user/user.key
    sending_queue:
      queue_size: 1000
    producer:
      max_message_bytes: 1000000
extensions:
  health_check: {}
  memory_ballast:
    size_in_percentage: 40
processors:
  batch:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 80
    spike_limit_percentage: 20
receivers:
  otlp/mtls:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
        tls:
          cert_file: /etc/otel-agent-secrets/tls.crt
          key_file: /etc/otel-agent-secrets/tls.key
          ca_file: /etc/otel-agent-secrets/ca.crt
service:
  extensions:
  - health_check
  - memory_ballast
  pipelines:
    traces:
      exporters:
      - kafka/spans
      processors:
      - memory_limiter
      - batch
      receivers:
      - otlp/mtls

Log output

2023-05-10T14:08:02.060Z    error   exporterhelper/queued_retry.go:368  Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors  {"kind": "exporter", "data_type": "traces", "name": "kafka/spans", "error": "Failed to deliver 1 messages due to kafka server: Message was too large, server rejected it to avoid allocation error"}
go.opentelemetry.io/collector/exporter/exporterhelper.(*retrySender).send
    go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/queued_retry.go:368
go.opentelemetry.io/collector/exporter/exporterhelper.(*tracesExporterWithObservability).send
    go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/traces.go:137
go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).start.func1
    go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/queued_retry.go:206
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*boundedMemoryQueue).StartConsumers.func1
    go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/internal/bounded_memory_queue.go:58

Additional context

No response

github-actions[bot] commented 1 year ago

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

idsvandermolen commented 1 year ago

Also we seem to observe high memory usage in the OTEL collector when these exceptions occur (we had many exceptions at ~08:30):

(screenshot: collector memory usage graph)

MovieStoreGuy commented 1 year ago

Hmm, that memory spike is interesting. It would be awesome if you're able to grab some pprof samples to help paint a bigger picture of what happened. My guess is that a network blip may have caused internal back pressure.

It is also interesting that the exporter isn't respecting the byte size limit; let me see if I can quickly chase that one up for you.

MovieStoreGuy commented 1 year ago

From what I can tell, the issue is upstream, specifically this line.

The issue is that if you have a message size of 1,000,000 bytes (1 MB) and your topic limit is also 1 MB, the local check passes, since 1MB > 1MB returns false; the broker then checks the size, performs 1MB >= 1MB, and denies the message.

Let me see if I can raise an issue on the project and link it back here.
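
To illustrate the boundary mismatch being described, here is a minimal Go sketch. The producerAccepts and brokerAccepts functions are hypothetical stand-ins for the client-side and broker-side checks, not the actual Sarama or Kafka code:

package main

import "fmt"

// Hypothetical illustration of the off-by-one boundary described above.
func producerAccepts(msgSize, maxMessageBytes int) bool {
	// Client-side check: reject only when the size is strictly greater than the limit.
	return !(msgSize > maxMessageBytes)
}

func brokerAccepts(msgSize, maxMessageBytes int) bool {
	// Broker-side check (as hypothesised above): reject once the size reaches the limit.
	return !(msgSize >= maxMessageBytes)
}

func main() {
	size, limit := 1_000_000, 1_000_000
	fmt.Println(producerAccepts(size, limit)) // true: 1MB > 1MB is false, so the message passes locally
	fmt.Println(brokerAccepts(size, limit))   // false: 1MB >= 1MB is true, so the broker rejects it
}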

MovieStoreGuy commented 1 year ago

The Kafka docs aren't really forthcoming about what the comparison should be; see https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes.

If you don't mind doing an experiment on my behalf, could you set:

    producer:
      max_message_bytes: 999999

If the errors disappear, this confirms the comparison and I can report it upstream to the library.

idsvandermolen commented 1 year ago

Note that we have set producer.max_message_bytes: 1000000 in the Kafka exporter, while the Kafka broker uses the default of 1048588 bytes (about 1 MiB; see https://kafka.apache.org/30/documentation.html#brokerconfigs_message.max.bytes), but I'll ask if we can test with 999999.

idsvandermolen commented 1 year ago

Hi, we've been running it over the weekend with the max message size set to 999999 and we're still seeing the exact same issue.

otel-agent-746f58cfc4-t8tmg otel-agent 2023-05-22T09:34:45.141Z info    exporterhelper/queued_retry.go:434  Exporting failed. Will retry the request after interval.    {"kind": "exporter", "data_type": "traces", "name": "kafka/spans", "error": "Failed to deliver 1 messages due to kafka server: Message was too large, server rejected it to avoid allocation error", "interval": "15.387055906s"}
otel-agent-746f58cfc4-t8tmg otel-agent 2023-05-22T09:35:00.530Z error   exporterhelper/queued_retry.go:176  Exporting failed. No more retries left. Dropping data.  {"kind": "exporter", "data_type": "traces", "name": "kafka/spans", "error": "max elapsed time expired Failed to deliver 1 messages due to kafka server: Message was too large, server rejected it to avoid allocation error", "dropped_items": 1208}
otel-agent-746f58cfc4-t8tmg otel-agent go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).onTemporaryFailure
otel-agent-746f58cfc4-t8tmg otel-agent  go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/queued_retry.go:176
otel-agent-746f58cfc4-t8tmg otel-agent go.opentelemetry.io/collector/exporter/exporterhelper.(*retrySender).send
otel-agent-746f58cfc4-t8tmg otel-agent  go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/queued_retry.go:418
otel-agent-746f58cfc4-t8tmg otel-agent go.opentelemetry.io/collector/exporter/exporterhelper.(*tracesExporterWithObservability).send
otel-agent-746f58cfc4-t8tmg otel-agent  go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/traces.go:137
otel-agent-746f58cfc4-t8tmg otel-agent go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).start.func1
otel-agent-746f58cfc4-t8tmg otel-agent  go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/queued_retry.go:206
otel-agent-746f58cfc4-t8tmg otel-agent go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*boundedMemoryQueue).StartConsumers.func1
otel-agent-746f58cfc4-t8tmg otel-agent  go.opentelemetry.io/collector/exporter@v0.77.0/exporterhelper/internal/bounded_memory_queue.go:58

idsvandermolen commented 1 year ago

Hi, could someone have another look at this issue? Since we noticed that reducing the internal batch size reduces the frequency of this error (but does not eliminate it), we were wondering whether the internal batch is split in the Kafka exporter to make sure the messages sent to Kafka stay below the max message size.

github-actions[bot] commented 1 year ago

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

yuuhhe commented 1 year ago

Do not use the batch processor; I resolved it by connecting the receiver to the Kafka exporter directly.

idsvandermolen commented 1 year ago

Do not use the batch processor; I resolved it by connecting the receiver to the Kafka exporter directly.

We will test that. But it should work fine with the batch processor and not drop batches because they are too large (a batch could be split into smaller batches).

yuuhhe commented 1 year ago

We will test that. But it should work fine with the batch processor and not drop batches because they are too large (a batch could be split into smaller batches).

The kafka exporter's sending_queue settings can also be configured.

hamedbrd commented 1 year ago

Unfortunately, removing the batch processor did not help. Moreover, without a batch processor the memory usage increases a lot, while on the other hand the batch processor is recommended precisely to control memory usage.

When many of these big messages come into the pipeline, the queue on that pod fills up and then we get these errors:

2023-08-11T08:31:44.512Z    warn    batchprocessor@v0.77.0/batch_processor.go:190    Sender failed    {"kind": "processor", "name": "batch", "pipeline": "traces", "error": "sending_queue is full"}
2023-08-11T08:31:44.548Z    warn    batchprocessor@v0.77.0/batch_processor.go:190    Sender failed    {"kind": "processor", "name": "batch", "pipeline": "traces", "error": "sending_queue is full"}

Increasing sending_queue did not help much either. When the rate of these big messages is too high, it is just a matter of time before the queue on the pod fills up.

We decided to temporarily disable retry_on_failure until this issue gets resolved. Disabling retry_on_failure at least keeps the pipeline up and running, and the queue does not fill up because the big messages are dropped immediately.
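
For reference, that workaround looks roughly like this on the exporter side (a sketch assuming the standard exporterhelper retry_on_failure setting; adapt it to your own exporter name and config):

exporters:
  kafka/spans:
    # ... existing brokers/topic/producer settings ...
    retry_on_failure:
      enabled: false  # oversized messages now fail fast and are dropped instead of being retried until max_elapsed_time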

I see this PR, which looks promising. Looking forward to it being released.

idsvandermolen commented 1 year ago

@MovieStoreGuy sorry for the ping, but would you mind having another look at the issue and PR #25144? We're actually a bit surprised that more people are not running into this issue. The Kafka exporter module should break up (split / cut) the batch it receives so that it fits into max_message_bytes. If you have peak traffic with higher volumes, it is very likely that the batch the OTel collector hands to the Kafka exporter module will exceed max_message_bytes. The same issue will happen at some point whether you have the batch processor enabled or not; the likelihood mainly depends on traffic volume (spikes), configured resources, and batching/queueing settings.

Disabling retry_on_failure is only a temporary workaround, and it also disables retries for transient issues (like network connection drops).
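
To make the requested splitting concrete, here is a rough Go sketch using the collector's pdata API. This is not the kafka exporter's actual implementation or the code from PR #25144; the function name is illustrative, and it only splits at ResourceSpans granularity, so a single oversized ResourceSpans would still exceed the limit and need further handling:

package kafkasplit

import (
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// splitTraces cuts a trace batch into protobuf payloads that each stay under
// maxMessageBytes, so every Kafka message fits the producer limit.
func splitTraces(td ptrace.Traces, maxMessageBytes int) ([][]byte, error) {
	marshaler := &ptrace.ProtoMarshaler{}
	var payloads [][]byte

	chunk := ptrace.NewTraces()
	flush := func() error {
		if chunk.ResourceSpans().Len() == 0 {
			return nil
		}
		data, err := marshaler.MarshalTraces(chunk)
		if err != nil {
			return err
		}
		payloads = append(payloads, data)
		chunk = ptrace.NewTraces()
		return nil
	}

	for i := 0; i < td.ResourceSpans().Len(); i++ {
		rs := td.ResourceSpans().At(i)

		// Measure what the chunk would marshal to with this ResourceSpans added.
		candidate := ptrace.NewTraces()
		chunk.CopyTo(candidate)
		rs.CopyTo(candidate.ResourceSpans().AppendEmpty())
		data, err := marshaler.MarshalTraces(candidate)
		if err != nil {
			return nil, err
		}

		// If adding it would exceed the limit, emit the current chunk first.
		// Given the >= comparison discussed earlier, a stricter threshold may be safer.
		if len(data) > maxMessageBytes {
			if err := flush(); err != nil {
				return nil, err
			}
		}
		rs.CopyTo(chunk.ResourceSpans().AppendEmpty())
	}
	if err := flush(); err != nil {
		return nil, err
	}
	return payloads, nil
}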

idsvandermolen commented 1 year ago

@pavolloffay Sorry for the ping, but would you mind having a look?

basch255 commented 1 year ago

I have the same issue:

2023-09-11T12:13:44.130Z info exporterhelper/queued_retry.go:423 Exporting failed. Will retry the request after interval. {"kind": "exporter", "data_type": "traces", "name": "kafka", "error": "Failed to deliver 1 messages due to kafka server: Message was too large, server rejected it to avoid allocation error", "interval": "17.602136766s"}

opentelemetry-collector: v0.79.0. Config:

exporters:
  kafka:
    brokers:
    - kafka.default:9092
    encoding: jaeger_proto
    producer:
      max_message_bytes: 900000
    protocol_version: 2.0.0
    sending_queue:
      queue_size: 1000000
      storage: file_storage
    topic: live

I reduced max_message_bytes from the default to 900000 to confirm the theory.

Kafka version: 3.3.2

max.request.size=1048576
message.max.bytes=1000012

basch255 commented 1 year ago

after some time this error occurs too:

2023-09-11T12:51:33.586Z        error   exporterhelper/queued_retry.go:174      Exporting failed. Putting back to the end of the queue. {"kind": "exporter", "data_type": "traces", "name": "kafka", "error": "max elapsed time expired Failed to deliver 1 messages due to kafka server: Message was too large, server rejected it to avoid allocation error"}
go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).onTemporaryFailure
        go.opentelemetry.io/collector/exporter@v0.79.0/exporterhelper/queued_retry.go:174
go.opentelemetry.io/collector/exporter/exporterhelper.(*retrySender).send
        go.opentelemetry.io/collector/exporter@v0.79.0/exporterhelper/queued_retry.go:407
go.opentelemetry.io/collector/exporter/exporterhelper.(*tracesExporterWithObservability).send
        go.opentelemetry.io/collector/exporter@v0.79.0/exporterhelper/traces.go:126
go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).start.func1
        go.opentelemetry.io/collector/exporter@v0.79.0/exporterhelper/queued_retry.go:195
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*persistentQueue).StartConsumers.func1
        go.opentelemetry.io/collector/exporter@v0.79.0/exporterhelper/internal/persistent_queue.go:55

I think my issue occurs because of a poison message that is retried forever... That's why I removed and recreated the underlying PV and pod to check the result. The pod then logged no more exporting failures, while other replicas still did.

It seems that in my case it normally works. Nevertheless, at some point messages too big for Kafka occurred, and the collector can't handle them.

basch255 commented 1 year ago

Any updates on this topic? The PR is closed but not merged. Problem still persists.

ricardo-mestre commented 11 months ago

I'm also experiencing this issue. Increasing the number of threads and instances helped reduce its occurrence, but it keeps happening.

rtekdogan commented 11 months ago

We also have the same issue

ricardo-mestre commented 10 months ago

I am assuming this hasn't been merged due to the construction of the exporter helpers, but could you clarify what the status of this is, @MovieStoreGuy?

ziqingyuan commented 9 months ago

Hi @MovieStoreGuy, sorry for the ping. Just wondering if there is any update on this issue? We also have the same problem.

ShahroZafar commented 9 months ago

Hi, we are also facing the same issue

github-actions[bot] commented 7 months ago

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

go-follow commented 6 months ago

We also have the same issue

aarvee11 commented 6 months ago

Enabling compression on the producer side has helped the situation for us.

Config sample below:

  exporters:
    kafka:
      timeout: "5s"
      protocol_version: 2.0.0 
      topic: otlp_spans
      encoding: otlp_proto
      brokers: 
        - kafka.aarvee.svc:9092
      client_id: "controller_broker_client"
      auth:
        sasl:
          username: "kafkatest"
          password: "supersecret"
          mechanism: "PLAIN"
      metadata:
        full: true
        retry:
          max: 1
          backoff: "250ms"
      retry_on_failure:
        enabled: true
        initial_interval: "1s"
        max_interval: "3s"
        max_elapsed_time: "30s"
      sending_queue:
        enabled: true
        num_consumers: 20
        queue_size: 5000000
      producer:
        max_message_bytes: 109657600
        #flush_max_messages: 5
        compression: gzip
        required_acks: 1

zstd is a lot more efficient, but it is compute heavy and requires Kafka protocol version 2.1+.

dominicqi commented 4 months ago

We use snappy compression and also have the same issue.

acar-ctpe commented 4 months ago

Same issue with snappy.

github-actions[bot] commented 2 months ago

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

github-actions[bot] commented 2 weeks ago

This issue has been closed as inactive because it has been stale for 120 days with no activity.