fluent-plugins-nursery / fluent-plugin-cloudwatch-logs

CloudWatch Logs Plugin for Fluentd
MIT License

Duplicate logs in cloudwatch #186

Closed vasu-git closed 4 years ago

vasu-git commented 4 years ago

Problem

I am sending logs from a Kubernetes cluster to CloudWatch. When I check the CloudWatch logs, I see a lot of duplicates, and the number of duplicates differs from one log line to another. Fluentd is deployed as a DaemonSet. I also see many errors and warnings like the following in the Fluentd logs: `[error]: Exception emitting record: queue size exceeds limit`

2020-05-05 06:52:25 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2020-05-05 06:52:23 +0000 error_class="Aws::CloudWatchLogs::Errors::ThrottlingException" error="Rate exceeded" plugin_id="object:3fccfe4342b0"

2020-05-05 06:52:27 +0000 [warn]: retry succeeded. plugin_id="object:3fccfe4342b0"

It looks like AWS is throttling some of these requests and Fluentd retries them, but AWS appears to have already saved the log events, so the retries produce duplicates.
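A toy model of this suspected failure mode (an assumption about what is happening, not code from the plugin): if the service stores a batch but the client still sees an error, every client-side retry re-stores the whole batch.

```ruby
# Toy model: the service stores the events, but the client is told the call
# failed, so a naive retry loop writes the same batch multiple times.
server = []

def put_log_events(server, batch, fail_after_store:)
  server.concat(batch)                        # service stores the events...
  raise "Rate exceeded" if fail_after_store   # ...but the client sees an error
end

batch = ["line1", "line2"]
attempts = 0
begin
  attempts += 1
  put_log_events(server, batch, fail_after_store: attempts < 3)
rescue
  retry
end

server.length  # 6: the two-line batch was stored three times
```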

I tried setting

      retry_wait 2
      retry_max_times 1

in the match config, but to no avail. I still see multiple duplicates (>2) for many log lines.

Please correct me if I'm wrong here, but if I want no retries at all, the following code (line 403) allows at least one retry: `if !@put_log_events_disable_retry_limit && @put_log_events_retry_limit < retry_count`. With the limit set to 0 and retry_count starting at 0, the check 0 < 0 is false, so one retry still happens. There should probably be an extra case to handle a limit of 0.
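The off-by-one in the quoted condition can be sketched as follows (illustrative, not the plugin's actual code; every call is assumed to fail so only the stop condition matters):

```ruby
# Sketch of the retry condition quoted above. The loop gives up only when
# retry_limit < retry_count, so a limit of 0 still permits one retry.
def attempts_made(retry_limit)
  retry_count = 0
  attempts = 0
  loop do
    attempts += 1                       # one PutLogEvents call (always fails here)
    break if retry_limit < retry_count  # give up only once count exceeds limit
    retry_count += 1                    # otherwise schedule a retry
  end
  attempts
end

attempts_made(0)  # 2: the initial call plus one retry, even with a limit of 0
```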

Steps to replicate

Config:

    <match fluent.**>
      type null
    </match>

    <source>
      type systemd
      path /var/log/journal
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      pos_file /fluentd-pos/kubelet.pos
      tag kubelet
      read_from_head false
    </source>

    <source>
      type tail
      path /var/log/cloud-init.log
      pos_file /fluentd-pos/fluentd-cloud-init.log.pos
      tag cloud-init
      format syslog
      read_from_head true
    </source>

    <source>
      type tail
      path /var/log/containers/*.log
      pos_file /fluentd-pos/fluentd-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
      read_from_head true
    </source>

    <source>
      type tail
      format json
      multiline_flush_interval 5s
      path /var/log/kube-apiserver-audit.log
      pos_file /fluentd-pos/fluentd-kube-apiserver-audit.log.pos
      tag kube-apiserver-audit
    </source>

    <source>
      type tail
      format syslog
      path /var/log/startupscript.log
      pos_file /fluentd-pos/fluentd-startupscript.log.pos
      tag startupscript
    </source>

    <source>
      type tail
      format /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
      path /var/log/docker.log
      pos_file /fluentd-pos/fluentd-docker.log.pos
      tag docker
    </source>

    <source>
      type tail
      format none
      path /var/log/etcd.log
      pos_file /fluentd-pos/fluentd-etcd.log.pos
      tag etcd
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/kube-proxy.log
      pos_file /fluentd-pos/fluentd-kube-proxy.log.pos
      tag kube-proxy
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/kube-audit.log
      pos_file /fluentd-pos/fluentd-kube-audit.log.pos
      tag kube-audit
    </source>

    <source>
      type tail
      format none
      multiline_flush_interval 5s
      path /var/log/kern.log
      pos_file /fluentd-pos/fluentd-kern.log.pos
      tag kernel
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/kube-controller-manager.log
      pos_file /fluentd-pos/fluentd-kube-controller-manager.log.pos
      tag kube-controller-manager
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/kube-scheduler.log
      pos_file /fluentd-pos/fluentd-kube-scheduler.log.pos
      tag kube-scheduler
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/rescheduler.log
      pos_file /fluentd-pos/fluentd-rescheduler.log.pos
      tag rescheduler
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/glbc.log
      pos_file /fluentd-pos/fluentd-glbc.log.pos
      tag glbc
    </source>

    <source>
      type tail
      format kubernetes
      multiline_flush_interval 5s
      path /var/log/cluster-autoscaler.log
      pos_file /fluentd-pos/fluentd-cluster-autoscaler.log.pos
      tag cluster-autoscaler
    </source>

    # input plugin that exports metrics
    <source>
      @type prometheus
    </source>

    <source>
      @type monitor_agent
    </source>

    <source>
      @type forward
    </source>

    # input plugin that collects metrics from MonitorAgent
    <source>
      @type prometheus_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>

    # input plugin that collects metrics for output plugin
    <source>
      @type prometheus_output_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>

    # input plugin that collects metrics for in_tail plugin
    <source>
      @type prometheus_tail_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>

    <filter kubernetes.**>
      type kubernetes_metadata
    </filter>

    <filter **>
      type record_transformer
      enable_ruby true
      <record>
        log_group_name ${((defined? record['kubernetes']) && !record['kubernetes'].nil?) ? "{{ .Values.logGroupNamePrefix }}/#{record['kubernetes']['namespace_name']}" : '{{ .Values.logGroupNamePrefix }}/.kubernetes'}
      </record>
    </filter>

    <filter **>
      type parser
      format multiline
      key_name log
      reserve_data true
      suppress_parse_error_log true
      emit_invalid_record_to_error false
      format_firstline /^(?<timestamp>[^\t]\d+.*Z)\t+(?<level>[^\t]*)\t+(?<thread>[^\t ]+)( (?<traceToken>[^\t]+))?\t+(?<classpath>[^\t]*)\t+(?<log>.*)/
      format1 /^(?<timestamp>[^\t]\d+.*Z)\t+(?<level>[^\t]*)\t+(?<thread>[^\t ]+)( (?<traceToken>[^\t]+))?\t+(?<classpath>[^\t]*)\t+(?<log>.*)/
      time_format %d/%b/%Y:%H:%M:%S %z
    </filter>

    <match kubernetes.**>
      @type detect_exceptions
      remove_tag_prefix kubernetes
      message log
      multiline_flush_interval 2
      max_lines 20
    </match>

    <match **>
      buffer_type file
      buffer_path /fluentd-pos/fluentd-cloudwatch-buffer
      type cloudwatch_logs
      auto_create_stream true
      log_group_name_key "log_group_name"
      remove_log_group_name_key true
      use_tag_as_stream true
      retention_in_days {{ default 5 .Values.retentionInDays }}
      {{- if .Values.awsTags }}
      log_group_aws_tags {{ .Values.awsTags }}
      {{- end }}
    </match>

Expected Behavior or What you need to ask

No duplicates in CloudWatch logs.

Using Fluentd and CloudWatchLogs plugin versions

Kubernetes 1.16 in AWS

activesupport (5.2.3) addressable (2.6.0) aws-eventstream (1.0.3) aws-partitions (1.196.0) aws-sdk-cloudwatchlogs (1.25.0) aws-sdk-core (3.62.0, 2.10.50) aws-sigv4 (1.1.0) bigdecimal (1.2.8) concurrent-ruby (1.1.5) cool.io (1.4.6) did_you_mean (1.0.0) domain_name (0.5.20190701) ffi (1.11.1) fluent-mixin-config-placeholders (0.4.0) fluent-plugin-cloudwatch-logs (0.4.5) fluent-plugin-detect-exceptions (0.0.9) fluent-plugin-kubernetes_metadata_filter (1.0.1) fluent-plugin-prometheus (0.4.0) fluent-plugin-record-reformer (0.9.1) fluent-plugin-secure-forward (0.4.5) fluent-plugin-systemd (0.0.8) fluentd (0.12.33) http (0.9.8) http-cookie (1.0.3) http-form_data (1.0.3) http_parser.rb (0.6.0) i18n (1.6.0) io-console (0.4.5) jmespath (1.4.0) json (2.0.3, 1.8.3) kubeclient (1.1.4) lru_redux (1.1.0) mime-types (3.2.2) mime-types-data (3.2019.0331) minitest (5.9.0) msgpack (1.1.0) net-telnet (0.1.1) netrc (0.11.0) oj (2.18.5) power_assert (0.2.7) prometheus-client (0.9.0) proxifier (1.0.3) psych (2.1.0) public_suffix (3.1.1) quantile (0.2.1) rake (10.5.0) rdoc (4.2.1) recursive-open-struct (1.0.0) resolve-hostname (0.1.0) rest-client (2.0.2) sigdump (0.2.4) string-scrub (0.0.5) systemd-journal (1.4.1) test-unit (3.1.7) thread_safe (0.3.6) tzinfo (1.2.2) tzinfo-data (1.2017.2) unf (0.1.4) unf_ext (0.0.7.6) uuidtools (2.1.5) yajl-ruby (1.3.0)

cosmo0920 commented 4 years ago

Does setting `put_log_events_retry_limit 0` and using fluent-plugin-cloudwatch-logs v0.10.1 solve this issue?
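For anyone landing here later: the suggested option belongs in the `cloudwatch_logs` match block. A minimal sketch of the placement (illustrative; check the plugin README for your version, and note that the reporter is on fluentd 0.12.33 while newer plugin releases may require a newer Fluentd):

```
<match **>
  @type cloudwatch_logs
  put_log_events_retry_limit 0
  # ... rest of the cloudwatch_logs configuration ...
</match>
```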

cosmo0920 commented 4 years ago

https://github.com/fluent-plugins-nursery/fluent-plugin-cloudwatch-logs/pull/199 should fix this issue. Closing.