open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io

file_storage doesn't compact after some PVs are full #26256

Closed. Tiremisu closed this issue 1 month ago.

Tiremisu commented 1 year ago

Component(s)

exporter/sumologic, extension/storage/filestorage

What happened?

Description

More and more PVs used by the otel collector (exporter/sumologic with file_storage) are getting full (10GB) and are not being compacted. Over time, fewer and fewer pods can provide service because their sending_queue is full. Even recreating the pod does not trigger compaction.

[chart: file_storage used bytes per pod]

Steps to Reproduce

My guess: a large volume of logs/entries is flushed to the same collectors (whose exporter uses file_storage), and this lasts for about 2 hours.

Architecture: a DaemonSet of otel collectors responsible for collecting Docker logs -> a StatefulSet of otel collectors that buffers data before sending -> storage backend.

Expected Result

  1. The file_storage should compact on_rebound.
  2. Could you give more detail about how compaction works and what triggers file_storage to compact? I don't understand it very well. In my understanding: a. the receiver keeps receiving data -> file_storage -> mmap()? -> sending_queue (memory); b. do both rebound_needed_threshold_mib and rebound_trigger_threshold_mib track the mmap()?; c. is it possible that file_storage gets stuck behind the sending_queue?
  3. I also wonder why the load the exporters send to the other receivers (the StatefulSet of otel collectors) is not balanced. The DaemonSet collectors export logs to the StatefulSet collectors via the (k8s) Service.

Workflow: DaemonSet pod exporter -> (k8s) Service -> receiver on StatefulSet pod

Actual Result

Only some of the pods auto-compact the file_storage, as the chart shows. Those with a full PV cannot compact and appear to stop working, even after the pod is recreated.

Collector version

0.74.0

Environment information

Environment

Kubernetes 1.23.9

OpenTelemetry Collector configuration

DaemonSet otel configuration:

exporters:
  otlphttp:
    endpoint: http://${LOGS_METADATA_SVC}.${NAMESPACE}.svc.cluster.local.:4318 # sumologic-sumologic-metadata-logs
    sending_queue:
      queue_size: 10
extensions:
  file_storage:
    compaction:
      directory: /var/lib/storage/otc
      on_rebound: true
    directory: /var/lib/storage/otc
    timeout: 10s
  health_check: {}
  pprof: {}
processors:
  batch:
    send_batch_max_size: 2000  ## The upper limit of the batch size
    send_batch_size: 1000      ## trigger
    timeout: 1s                ## trigger
receivers:
  filelog/containers:
    fingerprint_size: 17408
    include:
    - /var/log/pods/*/*/*.log
    include_file_name: false
    include_file_path: true
    operators:
    - id: get-format
      routes:
      - expr: body matches "^\\{"
        output: parser-docker
      - expr: body matches "^[^ Z]+ "
        output: parser-crio
      - expr: body matches "^[^ Z]+Z"
        output: parser-containerd
      type: router
    - id: parser-crio
      output: merge-cri-lines
      parse_to: body
      regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*)( |)(?P<log>.*)$
      timestamp:
        layout: "2006-01-02T15:04:05.000000000-07:00"
        layout_type: gotime
        parse_from: body.time
      type: regex_parser
    - id: parser-containerd
      output: merge-cri-lines
      parse_to: body
      regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*)( |)(?P<log>.*)$
      timestamp:
        layout: '%Y-%m-%dT%H:%M:%S.%LZ'
        parse_from: body.time
      type: regex_parser
    - id: parser-docker
      output: merge-docker-lines
      parse_to: body
      timestamp:
        layout: '%Y-%m-%dT%H:%M:%S.%LZ'
        parse_from: body.time
      type: json_parser
    - combine_field: body.log
      combine_with: ""
      id: merge-docker-lines
      is_last_entry: body.log matches "\n$"
      output: strip-trailing-newline
      source_identifier: attributes["log.file.path"]
      type: recombine
    - combine_field: body.log
      combine_with: ""
      id: merge-cri-lines
      is_last_entry: body.logtag == "F"
      output: merge-multiline-logs
      overwrite_with: newest
      source_identifier: attributes["log.file.path"]
      type: recombine
    - id: strip-trailing-newline     ## strip trailing newline
      output: merge-multiline-logs
      parse_from: body.log
      parse_to: body
      regex: |-
        ^(?P<log>.*)
        $
      type: regex_parser
    - combine_field: body.log
      combine_with: |2+

      id: merge-multiline-logs 
      is_first_entry: body.log matches "^\\[?\\d{4}-\\d{1,2}-\\d{1,2}.\\d{2}:\\d{2}:\\d{2}"
      output: extract-metadata-from-filepath
      source_identifier: attributes["log.file.path"]
      type: recombine
    - id: extract-metadata-from-filepath
      parse_from: attributes["log.file.path"]
      regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$
      type: regex_parser
    - from: body.stream
      id: move-attributes
      to: attributes["stream"]
      type: move
    - from: attributes.container_name
      to: attributes["k8s.container.name"]
      type: move
    - from: attributes.namespace
      to: attributes["k8s.namespace.name"]
      type: move
    - from: attributes.pod_name
      to: attributes["k8s.pod.name"]
      type: move
    - field: attributes.run_id
      type: remove
    - field: attributes.uid
      type: remove
    - field: attributes["log.file.path"]
      type: remove
    - from: body.log
      to: body
      type: move
    storage: file_storage
service:
  extensions:
  - health_check
  - file_storage
  - pprof
  pipelines:
    logs/containers:
      exporters:
      - otlphttp
      processors:
      - batch
      receivers:
      - filelog/containers
  telemetry:
    logs:
      level: info

StatefulSet otel configuration:

exporters:
  sumologic/containers:
    endpoint: ${SUMO_ENDPOINT_DEFAULT_LOGS_SOURCE} # backend storage
    json_logs:
      add_timestamp: false
    log_format: json
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 10000  # The maximum number of batches stored to disk; Maximum number of batches kept in memory before dropping
      storage: file_storage # alpha; 
    timeout: 30s # Time to wait per individual attempt to send data to a backend
extensions:
  file_storage:
    compaction:
      directory: /tmp
      on_rebound: true
    directory: /var/lib/storage/otc
    timeout: 10s
  health_check: {}
  pprof: {}
processors:
  attributes/fluent_containers:
    actions:
    - action: extract
      key: fluent.tag
      pattern: ^containers\.var\.log\.containers\.(?P<k8s_pod_name>[^_]+)_(?P<k8s_namespace>[^_]+)_(?P<k8s_container_name>.+)-(?P<container_id>[a-f0-9]{64})\.log$
    - action: insert
      from_attribute: container_id
      key: k8s.container.id
    - action: delete
      key: container_id
    - action: insert
      from_attribute: k8s_pod_name
      key: k8s.pod.name
    - action: delete
      key: k8s_pod_name
    - action: insert
      from_attribute: k8s_namespace
      key: k8s.namespace.name
    - action: delete
      key: k8s_namespace
    - action: insert
      from_attribute: k8s_container_name
      key: k8s.container.name
    - action: delete
      key: k8s_container_name
  attributes/remove_fluent_tag:
    actions:
    - action: delete
      key: fluent.tag
  batch:
    send_batch_max_size: 2048  # The upper limit of the batch size
    send_batch_size: 1024      # acts as a trigger and does not affect the size of the batch
    timeout: 1s                # Time duration after which a batch will be sent regardless of size
  filter/exclude-nginx-logs-based-on-resource-attribute-regex:
    logs:
      exclude:
        match_type: regexp
        resource_attributes:
        - key: deployment
          value: sumologic-sumologic-remote-write-proxy
  filter/include_containers:
    logs:
      include:
        match_type: regexp
        record_attributes:
        - key: k8s.container.name
          value: .+
  filter/include_fluent_tag_containers:
    logs:
      include:
        match_type: regexp
        record_attributes:
        - key: fluent.tag
          value: containers\..+
  groupbyattrs/containers:
    keys:
    - k8s.container.id
    - k8s.container.name
    - k8s.namespace.name
    - k8s.pod.name
    - _collector
  k8s_tagger:
    extract:
      annotations:
      - key: '*'
        tag_name: pod_annotations_%s
      delimiter: _
      labels:
      - key: '*'
        tag_name: pod_labels_%s
      metadata:
      - daemonSetName
      - deploymentName
      - hostName
      - namespace
      - nodeName
      - podName
      - serviceName
      - statefulSetName
      namespace_labels:
      - key: '*'
        tag_name: namespace_labels_%s
    owner_lookup_enabled: true
    passthrough: false
    pod_association:
    - from: build_hostname
  logstransform/containers_parse_json:
    operators:
    - if: body matches "^{[\\s\\S]+"
      parse_from: body
      parse_to: body
      type: json_parser
  memory_limiter:
    check_interval: 5s # recommended is 1s
    limit_percentage: 75 # hard limit, GC. Maximum amount of total memory targeted to be allocated by the process heap
    spike_limit_percentage: 20  # ballastextension?
  resource/add_cluster:
    attributes:
    - action: upsert
      key: cluster
      value: prod-applications
  resource/containers_copy_node_to_host:
    attributes:
    - action: upsert
      from_attribute: k8s.node.name
      key: k8s.pod.hostname
  resource/drop_annotations:
    attributes:
    - action: delete
      pattern: ^pod_annotations_.*
  resource/remove_pod_name:
    attributes:
    - action: delete
      key: pod_name
  resource/set_empty_source_metadata:
    attributes:
    - action: insert
      key: _sourceCategory
      value: ""
    - action: insert
      key: _sourceHost
      value: ""
    - action: insert
      key: _sourceName
      value: ""
  source/containers:
    annotation_prefix: pod_annotations_
    collector: prod-applications
    container_annotations:
      enabled: false
      prefixes: []
    exclude:
      container: ""
      namespace: kube-public
      node: sumologic-sumologic-otelagent.*
      pod: istiod-.*
    pod_key: pod
    pod_name_key: pod_name
    pod_template_hash_key: pod_labels_pod-template-hash
    source_category: '%{namespace}/%{pod_name}'
    source_category_prefix: phx/prod/applications/
    source_category_replace_dash: '-'
    source_host: ""
    source_name: '%{namespace}.%{pod}.%{container}'
  sumologic_schema:
    add_cloud_namespace: false
  transform/add_timestamp:
    log_statements:
    - context: log
      statements:
      - set(attributes["timestamp"], Int(time_unix_nano / 1000000))
  transform/flatten:
    log_statements:
    - context: log
      statements:
      - merge_maps(attributes, body, "insert")
      - set(body, "") where IsMatch(body, "^{") == true
  transform/remove_attributes:
    log_statements:
    - context: log
      statements:
      - limit(attributes, 0, [])
receivers:
  fluentforward:
    endpoint: 0.0.0.0:24321
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
service:
  extensions:
  - health_check
  - file_storage
  - pprof
  pipelines:
    logs/fluent/containers:
      exporters:
      - sumologic/containers
      processors:
      - memory_limiter
      - filter/include_fluent_tag_containers
      - attributes/fluent_containers
      - groupbyattrs/containers
      - k8s_tagger
      - resource/add_cluster
      - resource/set_empty_source_metadata
      - resource/containers_copy_node_to_host
      - sumologic_schema
      - source/containers
      - filter/exclude-nginx-logs-based-on-resource-attribute-regex
      - resource/remove_pod_name
      - resource/drop_annotations
      - attributes/remove_fluent_tag
      - transform/add_timestamp
      - batch
      receivers:
      - fluentforward
    logs/otlp/containers:
      exporters:
      - sumologic/containers
      processors:
      - memory_limiter
      - filter/include_containers
      - groupbyattrs/containers
      - k8s_tagger
      - resource/add_cluster
      - resource/set_empty_source_metadata
      - resource/containers_copy_node_to_host
      - sumologic_schema
      - source/containers
      - logstransform/containers_parse_json
      - filter/exclude-nginx-logs-based-on-resource-attribute-regex
      - resource/remove_pod_name
      - resource/drop_annotations
      - transform/add_timestamp
      - batch
      receivers:
      - otlp
  telemetry:
    logs:
      level: info

Log output

```shell
2023-08-28T20:52:17.501Z   error   exporterhelper/queued_retry.go:317  Dropping data because sending_queue is full. Try increasing queue_size. {"kind": "exporter", "data_type": "logs", "name": "sumologic/containers", "dropped_items": 1117}

2023-08-28T20:52:17.501Z   warn    batchprocessor@v0.74.0/batch_processor.go:177   Sender failed   {"kind": "processor", "name": "batch", "pipeline": "logs/otlp/containers", "error": "sending_queue is full"}
```

Additional context

No response

github-actions[bot] commented 1 year ago

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

Tiremisu commented 1 year ago

Can we add more compaction conditions, like a timeout? Does compaction delete the whole file or only compact part of it?

djaglowski commented 1 year ago

Could you give more detail about how compaction works and what triggers file_storage to compact?

I'm not sure I can explain it more accurately than the documentation but here's the simplified mental model that I use to reason about this.

We basically need to look at the file as a reservoir for data. If we mostly fill up the reservoir, then it will automatically expand. However, if we drain the reservoir, it will not automatically shrink. We can use compaction to shrink the reservoir, but it must remain at least as large as its contents. So typically we need to wait until contents of the reservoir have been substantially drained. Then we can actually shrink it by a meaningful amount.

This is why we must meet two separate conditions before we run compaction.

  1. The reservoir must expand to an undesirably large size. (Defined by rebound_needed_threshold_mib)
  2. Then, the contents must be drained enough so that we can meaningfully shrink the reservoir. (Defined by rebound_trigger_threshold_mib)

Your graph showing used bytes may only be showing the size of the reservoir. Typically when compaction does not occur, it is because the contents are not draining as quickly as you expect.
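
For reference, here is a minimal sketch of where those two conditions map onto the extension's configuration. The option names are the ones you already mentioned; the sizes below are only illustrative values, not recommendations or defaults:

```yaml
extensions:
  file_storage:
    directory: /var/lib/storage/otc
    compaction:
      directory: /var/lib/storage/otc
      on_rebound: true
      # Condition 1: the file (the reservoir) has grown past this size,
      # so compaction is marked as "needed".
      rebound_needed_threshold_mib: 100   # illustrative
      # Condition 2: the data actually held in the file has drained below
      # this size, so compaction can meaningfully shrink it and is triggered.
      rebound_trigger_threshold_mib: 10   # illustrative
```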

Can we add more compaction conditions, like a timeout?

The problem is that a timeout does not take into account whether or not the contents of the reservoir have been drained. There may be some value here because you could in theory reclaim some space, but it may often be a lot less than you'd expect, and you'll have spent a non-trivial amount of compute for it.

Does compaction delete the whole file or only compact part of it?

Compaction will create a complete copy of the file, edit the copy, and then finally overwrite the old file with the new.

Tiremisu commented 1 year ago

@djaglowski Thanks for your explanation. On the question of whether the reservoir has drained: the logs give no indication of whether the compaction conditions were reached, which is confusing, and we can't find a pattern. I have learned to infer whether the data has drained by calculating the metric difference (otelcol_receiver_accepted_log_records - otelcol_exporter_sent_log_records); a sketch of that query follows the list below. With that, we found that even when all the data has drained, the file does not compact. In our setup, file_storage is used as the exporter's persistent queue, so my concerns if file_storage is full are:

1. Does it impact how the sending_queue uses the persistent queue? As the observed behavior shows, some of these pods can still accept/send data, but:

2. Even if a pod can still accept/send data, what about its performance? It can't use the file (10GB) as a buffer anymore, right? The data can then only be buffered in memory (max 2GB * 75%).

3. Could we add more compaction conditions, e.g. sending_queue size? The existing ones are too low-level for us and not clear. The sending_queue dropping to 0 means the data has already been sent, as does the metric difference (accepted - sent) I mentioned.
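
A minimal sketch of how that backlog estimate could be computed as a Prometheus recording rule. The metric names are the collector's own telemetry metrics mentioned above; the group name, rule name, and the `pod` label are assumptions about our monitoring setup, not something prescribed by the collector:

```yaml
# Hypothetical recording rule: approximate per-pod backlog as accepted minus sent.
# Both metrics are cumulative counters, so this is only a rough estimate of data
# that has entered the pipeline but not yet left the exporter since process start.
groups:
  - name: otelcol-backlog            # assumed group name
    rules:
      - record: otelcol:logs_backlog:records
        expr: >
          sum by (pod) (otelcol_receiver_accepted_log_records)
          - sum by (pod) (otelcol_exporter_sent_log_records)
```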

github-actions[bot] commented 11 months ago

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.


djaglowski commented 5 months ago

I have not had time to look into this further. If anyone is able to provide a unit test that demonstrates the failure, this would make solving it much easier.


github-actions[bot] commented 1 month ago

This issue has been closed as inactive because it has been stale for 120 days with no activity.