vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

aws_cloudwatch_logs sink is leaking #17275

Open AndrewChubatiuk opened 1 year ago

AndrewChubatiuk commented 1 year ago

Problem

I'm running Vector from the Helm chart (app version 0.29.0) on AWS EKS. On the per-component memory allocation graph, the cloudwatch logs component is the only one whose allocated memory grows constantly.

[Screenshot 2023-05-02 at 10.10.58: component memory allocation graph]

Configuration

data_dir: /vector-data-dir
expire_metrics_secs: 300
api:
  enabled: false
  address: 0.0.0.0:8686
  playground: true
log_schema:
  host_key: host
  message_key: message
  source_type_key: source_type
  timestamp_key: timestamp
sources:
  kubernetes_logs:
    type: kubernetes_logs
    glob_minimum_cooldown_ms: 5000
  internal_metrics:
    type: internal_metrics
    scrape_interval_secs: 30
transforms:
  kubernetes_logs_remap_json:
    inputs:
      - kubernetes_logs
    type: remap
    source: |-
      # Prepare context object
      context = .

      if exists(.kubernetes.pod_labels."project.log-group") {
        context.log_group = .kubernetes.pod_labels."project.log-group"
      } else if exists(.kubernetes.pod_labels."eventsource-name") {
        context.log_group, _ = "eventsource-" + .kubernetes.pod_labels."eventsource-name"
      } else if exists(.kubernetes.pod_labels."eventbus-name") {
        context.log_group, _ = "eventbus-" + .kubernetes.pod_labels."eventbus-name"
      } else if exists(.kubernetes.pod_labels."sensor-name") {
        context.log_group, _ = "sensor-" + .kubernetes.pod_labels."sensor-name"
      } else {
        context.log_group = .kubernetes.container_name
      }

      del(context.message)

      # Parse JSON if valid
      structured, err = parse_json(.message)
      log(.message, level: "debug")
      if err != null {
        log(err, level: "debug")
        log(.message, level: "debug")
        structured.message = .message
      }
      structured.context = context

      # Add mandatory fields
      if !exists(structured.service) {
        log("Empty service, fallback to pod name", level: "debug")
        structured.service = .kubernetes.pod_name
      }
      if !exists(structured.buildVersion) {
        log("Empty buildVersion, fallback to container image", level: "debug")
        structured.buildVersion = .kubernetes.container_image
      }
      if !exists(structured.host) {
        log("Empty host, fallback to pod node name", level: "debug")
        structured.host = .kubernetes.pod_node_name
      }
      if !exists(structured.level) {
        log("Empty level, set to INFO", level: "debug")
        structured.level = "INFO"
      }
      if !exists(structured.timestamp) {
        log("Empty timestamp, set to now", level: "debug")
        structured.timestamp = now()
      }

      . = structured
  log_group_metric:
    inputs:
      - kubernetes_logs_remap_json
    type: log_to_metric
    metrics:
      - type: set
        field: context.log_group
        name: vector_log_group_info
        tags:
          log_group: 'project/{{ $.Values.global.cluster }}/{{ `{{ context.kubernetes.pod_namespace }}/{{ context.log_group }}` }}'
          pod_name: '{{ printf "{{ context.kubernetes.pod_name }}" }}'
          pod_namespace: '{{ printf "{{ context.kubernetes.pod_namespace }}" }}'
sinks:
  cloudwatch:
    buffer:
      type: disk
      max_size: 1000000000
    type: aws_cloudwatch_logs
    inputs:
      - kubernetes_logs_remap_json
    create_missing_group: true
    create_missing_stream: true
    group_name: 'project/{{ $.Values.global.cluster }}/{{ `{{ context.kubernetes.pod_namespace }}/{{ context.log_group }}` }}'
    compression: none
    region: '{{ $.Values.global.region }}'
    stream_name: '{{ printf "{{ context.kubernetes.pod_name }}" }}'
    encoding:
      codec: json
      timestamp_format: rfc3339
    healthcheck:
      enabled: false
  prometheus_sink:
    address: 0.0.0.0:9090
    inputs:
      - internal_metrics
      - log_group_metric
    type: prometheus_exporter

Version

0.29.0

adolsalamanca commented 1 year ago

We're also seeing memory consumption increase steadily in production workloads that use the aws_cloudwatch_logs sink.

Any news on this one, folks? Thanks.

Cc @jszwedko

jszwedko commented 1 year ago

Unfortunately not yet. If someone else was interested in pushing this forward, I think the next step would be to run Vector under a memory profiler like valgrind to see if a leak can be identified.

adolsalamanca commented 1 year ago

OK, in our case it seems that adding the expire_metrics_secs global option has helped mitigate the growth in memory consumption. Thanks for the update though!

jszwedko commented 11 months ago

@AndrewChubatiuk a couple of questions:

AndrewChubatiuk commented 11 months ago

  • increase reported by metric
  • yes

jszwedko commented 11 months ago

Can you verify that you are actually seeing RSS usage increase? That metric is experimental so I wouldn't be surprised if it was inaccurate.
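
One way to check RSS independently of Vector's own metric is to sample the process's resident set size straight from the kernel. Below is a minimal sketch, assuming a Linux node where /proc/<pid>/status is readable. The function names and the sampling interval are made up for illustration and are not part of Vector.

```python
import re
import time


def parse_vm_rss_kib(status_text):
    """Extract the VmRSS value (in KiB) from /proc/<pid>/status text, or None."""
    match = re.search(r"^VmRSS:\s+(\d+)\s+kB$", status_text, re.MULTILINE)
    return int(match.group(1)) if match else None


def read_rss_kib(pid):
    """Read the current resident set size of a process (Linux only)."""
    with open(f"/proc/{pid}/status") as f:
        return parse_vm_rss_kib(f.read())


def sample_rss(pid, count, interval_secs):
    """Collect `count` RSS samples, one every `interval_secs` seconds."""
    samples = []
    for _ in range(count):
        samples.append(read_rss_kib(pid))
        time.sleep(interval_secs)
    return samples


def rss_growth_kib(samples):
    """Difference between the last and first sample, in KiB."""
    return samples[-1] - samples[0] if samples else 0
```

For example, `rss_growth_kib(sample_rss(vector_pid, 60, 60))` would give the change over roughly an hour, where `vector_pid` is the PID of the Vector process. If that number stays flat while the component allocation metric keeps climbing, the metric is the more likely culprit.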

dsmith3197 commented 11 months ago

One suspicion is that this memory growth in the sink can be attributed to the fact that the sink creates one client per (group, stream) pair (code ref) and that client remains in memory for the remaining lifetime of the sink.

In this case, the stream_name is the Kubernetes pod name. Pods are replaced over time, so the set of pod names is unbounded and the number of clients keeps increasing for as long as the sink runs.
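
To make the suspicion concrete, here is a small Python simulation of a cache keyed by (group, stream). It is not Vector's actual Rust code. `ClientCache`, `simulate`, the group name, and the idle timeout are all hypothetical, and the eviction branch is only there for contrast, not as a description of an existing Vector option.

```python
class ClientCache:
    """Hypothetical per-(group, stream) client cache, for illustration only."""

    def __init__(self, idle_timeout_secs=None):
        # None means entries are never evicted, as suspected for the sink.
        self.idle_timeout_secs = idle_timeout_secs
        self._last_used = {}  # (group, stream) -> time of last use

    def get(self, group, stream, now):
        """Create or reuse the entry for this pair and mark it as used."""
        key = (group, stream)
        self._last_used[key] = now
        return key

    def evict_idle(self, now):
        """Drop entries unused for longer than the timeout; return how many."""
        if self.idle_timeout_secs is None:
            return 0
        stale = [key for key, last in self._last_used.items()
                 if now - last > self.idle_timeout_secs]
        for key in stale:
            del self._last_used[key]
        return len(stale)

    def __len__(self):
        return len(self._last_used)


def simulate(cache, generations, pods_per_generation, secs_per_generation):
    """Replace every pod once per generation and return the final cache size."""
    now = 0
    for gen in range(generations):
        for i in range(pods_per_generation):
            # Each new pod gets a new name, hence a new stream_name.
            cache.get("project/cluster/ns/app", f"app-{gen}-{i}", now)
        now += secs_per_generation
        cache.evict_idle(now)
    return len(cache)
```

With 100 pod generations of 5 pods each, the non-evicting cache ends up holding one entry per pod that ever existed, while the variant with an idle timeout stays bounded by the number of recently active pods.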

dsmith3197 commented 11 months ago

https://github.com/vectordotdev/vector/issues/19345 may also be relevant here.