fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0
Objects from Fluentd in S3 contain corruption #3944

Open domokap opened 1 year ago

domokap commented 1 year ago

Describe the bug

We receive roughly 2 billion logs through our Fluentd every 24h, which are routed to Elasticsearch and also to S3 buckets.

Within the S3 plugin we have a timekey of 30s, which results in roughly 250-300k JSON objects created every 24h in our S3 buckets. Between 100 and 200 of these contain some sort of corruption within the object in S3: usually non-UTF-8 characters where they shouldn't be, missing characters, or jumbled strings.
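To put those figures in proportion, here is a small arithmetic sketch. It uses only the numbers quoted above (30s timekey, 250-300k objects and 100-200 corrupted objects per 24h); the variable names are illustrative and not part of any Fluentd API.

```python
# Figures quoted in the report above.
SECONDS_PER_DAY = 24 * 60 * 60
TIMEKEY_SECONDS = 30

# Number of 30-second time windows in one day.
windows_per_day = SECONDS_PER_DAY // TIMEKEY_SECONDS

objects_per_day = (250_000, 300_000)   # low and high estimate
corrupted_per_day = (100, 200)         # low and high estimate

# Lowest and highest corrupted fraction consistent with the quoted ranges.
lowest_rate = corrupted_per_day[0] / objects_per_day[1]
highest_rate = corrupted_per_day[1] / objects_per_day[0]

# Average number of objects written per time window, across all tenants and nodes.
objects_per_window = tuple(n / windows_per_day for n in objects_per_day)

print(f"time windows per day: {windows_per_day}")
print(f"objects per window: {objects_per_window[0]:.0f} to {objects_per_window[1]:.0f}")
print(f"corrupted objects: {lowest_rate:.3%} to {highest_rate:.3%}")
```

So the corruption affects well under 0.1% of objects, which would fit with it being hard to reproduce on demand.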

To Reproduce

We have been unable to reproduce this on demand. Using fluent-cat to re-ingest a log that had ended up corrupted in S3 did not reproduce the corruption.

We are happy to provide more examples if required.

Expected behavior

The logs within the JSON objects in S3 contain no corruption or non-UTF-8 characters.
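One way to check an object against this expectation is sketched below. It assumes the object body has already been downloaded as bytes and holds one JSON record per line; the function name and the sample data are hypothetical. It only catches lines that fail UTF-8 decoding or JSON parsing, so jumbled strings that still form valid JSON would not be flagged.

```python
import json


def find_suspect_lines(data: bytes):
    """Return (line_number, reason) for each line that is not valid UTF-8 JSON."""
    suspects = []
    for number, raw in enumerate(data.split(b"\n"), start=1):
        if not raw.strip():
            continue  # ignore empty lines, including the one after the final newline
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            suspects.append((number, "not valid UTF-8"))
            continue
        try:
            json.loads(text)
        except json.JSONDecodeError:
            suspects.append((number, "not valid JSON"))
    return suspects


# Hypothetical sample: one clean record, one with raw binary bytes, one truncated.
sample = (
    b'{"@timestamp":"2022-10-15T00:16:01.094+00:00","@version":1,"message":"Response"}\n'
    b'{"message":"\\n\xaa@timestamp\xbd2022"}\n'
    b'{"message":"truncated\n'
)

print(find_suspect_lines(sample))
```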

Your Environment

- Fluentd version: 1.15.2
- Operating system: Ubuntu 20.04.2 LTS
- Kernel version: 5.8.0-1038-aws

Your Configuration

<match {logdocument.{{ team }}.dedicated.fluentd,{{ team }}.dedicated.filebeat,{{ team }}.dedicated.functionbeat,logdocument.{{ team }}.dedicated.fluentd_gcp}>
  @type copy
  @log_level info
  <store>
    @type elasticsearch
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    max_retry_putting_template 1
    request_timeout 60s
    fail_on_putting_template_retry_exceed false
    slow_flush_log_threshold {{ fluentd_slow_flush_log_threshold }}
    @id        out_es_logs-{{ team }}
    @log_level info
    log_es_400_reason true

    id_key      _hash
    remove_keys _hash

    hosts {{ elasticsearch_url }}{{ '' }}
    user "{{ es_username }}"
    password "{{ es_password[buildenv] }}"
    ca_file "{{ fluentd_conf_dir }}/{{ca_crt}}"
    ssl_version TLSv1_2
    ssl_verify false

    index_name               logs-${sky.top_tenant}-fluentd
    time_key                 time
    include_timestamp        true
    include_tag_key          true
    flatten_hashes           false
    flatten_hashes_separator _

    # Rollover index config
    rollover_index     true
    application_name   default
    index_date_pattern "now/d"
    deflector_alias    logs-${sky.top_tenant}-fluentd

    # Index template
    template_name      logs-${sky.top_tenant}-fluentd
    template_file      {{ fluentd_conf_dir }}/logs-template.json
    customize_template {"<<TAG>>":"${sky.top_tenant}"}
    template_overwrite true

    <buffer tag,sky.top_tenant>
      retry_wait {{fluentd_retry_wait | default('60')}}
      retry_exponential_backoff_base {{fluentd_retry_exponential_backoff_base | default('2')}}
      retry_type {{fluentd_retry_type | default('exponential_backoff')}}
      retry_max_interval {{fluentd_retry_max_interval | default('900')}}
      disable_chunk_backup {{fluentd_disable_chunk_backup | lower | default('true')}}
      @type file
      path {{ fluentd_data_dir }}/es-out-logs-{{ team }}

      flush_thread_count {{ fluentd_flush_thread_count }}
      flush_interval     {{fluentd_flush_interval | default('60s')}}
      flush_at_shutdown  {{fluentd_flush_at_shutdown  | lower  | default('true')}}
      overflow_action block
      chunk_limit_size {{ fluentd_chunk_limit_size }}
      # total_limit_size is set to 70% of the data disk so that a single output can't use more than this
      total_limit_size   {{ (ansible_mounts | json_query('[?mount == `/fluentd`].size_total | [0]') * 0.7 / 1024|pow(3))  | round | int | abs }}G
      retry_forever      false
    </buffer>
  </store>
  <store ignore_error>
    @type s3
    @log_level info
    s3_bucket "{{ logstash_s3_output[buildenv].bucket }}"
    s3_region "{{ logstash_s3_output[buildenv].bucket_region }}"
    acl bucket-owner-full-control
    store_as "json"
    <format>
      @type json
    </format>
    use_server_side_encryption "{{ logstash_s3_output[buildenv].bucket_server_encryption }}"
    path ${sky.top_tenant}/${service.name}/%Y/%m/%d/
    # if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
    # need to specify tag for ${tag} and time for %Y/%m/%d in <buffer> argument.
    <buffer service.name,sky.top_tenant,time>
      retry_wait {{fluentd_retry_wait | default('60')}}
      retry_exponential_backoff_base {{fluentd_retry_exponential_backoff_base | default('2')}}
      retry_type {{fluentd_retry_type | default('exponential_backoff')}}
      retry_max_interval {{fluentd_retry_max_interval | default('900')}}
      disable_chunk_backup {{fluentd_disable_chunk_backup | lower  | default('true')}}
      @type file
      path {{ fluentd_data_dir }}/s3-out-{{ team }}
      timekey 30 # 30 seconds partition
      timekey_wait 1m
      timekey_use_utc true # use utc
      flush_thread_count {{ fluentd_flush_thread_count }}
      chunk_limit_size {{ fluentd_s3_chunk_limit_size }}
      # total_limit_size is set to 70% of the data disk so that a single output can't use more than this
      total_limit_size   {{ (ansible_mounts | json_query('[?mount == `/fluentd`].size_total | [0]') * 0.7 / 1024|pow(3))  | round | int | abs }}G
    </buffer>
  </store>
</match>

Your Error Log

In S3:
\n�@timestamp�2022-10-15T00:16:01.074+00:00�@version\u0001�message�XMaking

Within that exact same S3 object there are plenty of examples with no corruption:
\n","@timestamp":"2022-10-15T00:16:01.094+00:00","@version":1,"message":"Response

In Elasticsearch:
"@timestamp\":\"2022-10-14T23:16:01.074+00:00\",\"@version\":1,\"message\":\"Making

Additional context

Here is the hex code for the corrupted version:

5C 6E AA 40 74 69 6D 65 73 74 61 6D 70 BD 32 30 32 32 2D 31 35 54 30 30 3A 31 36 3A 30 31 2E 30 37 34 2B 30 30 3A 30 30 A8 40 76 65 72 73 69 6F 6E 5C 75 30 30 30 31 A7 6D 65 73 61 67 65 D9 58 4D 61 6B 69 6E 67

fujimotos commented 1 year ago

Here is the hex code for the corrupted version:

Your corrupted message looks like MessagePack binary to me. In particular, here is what it reads like:

5C 6E AA 40 74 69 6D 65 73 74 61 6D 70 BD 32 30 32 32 2D 31
 \  n ==  @  t  i  m  e  s  t  a  m  p ==  2  0  2  2  -  1
      fixstr (10 char)                 fixstr (29 char)

This is really strange. I cannot think of any plausible code path that can leak raw msgpack binary in this manner. Can you post your entire configuration file here?
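The reading above can be checked with a few lines of Python. This is only a sketch: it knows the MessagePack fixstr format alone (a header byte in the range 0xA0-0xBF whose low five bits give the string length) and treats every other byte as plain data. The helper names are hypothetical, and the input is the 20-byte excerpt annotated above.

```python
def describe_fixstr(byte: int):
    """Return the declared length if byte is a MessagePack fixstr header, else None."""
    # fixstr headers have the bit pattern 101xxxxx; xxxxx is the length (0-31).
    if 0xA0 <= byte <= 0xBF:
        return byte & 0x1F
    return None


def annotate(data: bytes):
    """Split data into plain bytes and fixstr payloads (payloads may be cut short)."""
    out = []
    i = 0
    while i < len(data):
        length = describe_fixstr(data[i])
        if length is None:
            out.append(("byte", data[i:i + 1]))
            i += 1
        else:
            # Slicing stops at the end of the data if the payload is truncated.
            out.append((f"fixstr({length})", data[i + 1:i + 1 + length]))
            i += 1 + length
    return out


excerpt = bytes.fromhex("5C 6E AA 40 74 69 6D 65 73 74 61 6D 70 BD 32 30 32 32 2D 31")

for kind, payload in annotate(excerpt):
    print(kind, payload)
```

The AA byte declares a 10-character string, which matches "@timestamp" exactly, and BD declares 29 characters for the timestamp value (cut short in this excerpt). By the same rule, the A8 byte before "@version" in the full dump declares 8 characters.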

domokap commented 1 year ago

<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>

<system>
  workers 8
  log_level info
</system>

# Beats protocol input (beats agents)
<source>
  @type beats
  metadata_as_tag
  port 5044
  bind 0.0.0.0
</source>

# Fluent protocol input
<source>
  @type forward
  tag fluentd
  port 5170
  bind 0.0.0.0
</source>

<source>
  @type forward
  port 5270
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
  <transport tls>
    version TLSv1_2
    cert_path /etc/fluentd/telemetry.nbcuott.com.pem
    private_key_path /etc/fluentd/prod_telemetry.nbcuott.com.key
    private_key_passphrase ""
  </transport>
  tag fluentd_gcp
</source>

<filter **>
  @type record_transformer
  enable_ruby true
  <record>
    cloud.region ${record.dig("cloud", "region") || record['cloud.region'] || 'us-west-2'}
    cloud.provider ${record.dig("cloud", "provider") || record['cloud.provider'] || 'aws'}
    service.name ${record.dig("service", "name") || record['service.name'] || record.dig("kubernetes", "labels", "app") || record.dig('kubernetes.labels.app') || '_unnamed_service'}
    sky.top_tenant ${record.dig("sky", "top_tenant") || record['sky.top_tenant'] || 'softfail'}
    agent.type ${record.dig("agent", "type") || record['agent.type'] || 'unknown'}
  </record>
</filter>

# Output FluentD debug messages/logs to stdout (fluentd.log)
<label @FLUENT_LOG>
   <match fluent.*>
     @type stdout
     @id fluentd_logs_stdout
   </match>
</label>

<match {fluentd,filebeat,functionbeat,metricbeat,fluentd_gcp,heartbeat}>
  @type filter_list
  key_to_filter sky.top_tenant
  pattern_file_paths /etc/fluentd/tenants_list.txt
  filter_empty true
  action whitelist
  <retag>
    add_prefix tenant_in_mono
  </retag>
  <retag_filtered>
    add_prefix dedicated
  </retag_filtered>
</match>

<match {dedicated.fluentd,dedicated.fluentd_gcp,dedicated.filebeat,dedicated.functionbeat,dedicated.metricbeat,dedicated.heartbeat}>
  @type rewrite_tag_filter
  @log_level info
  <rule>
    key sky.top_tenant
    pattern sas-apm
    tag sas-apm.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern darp
    tag darp.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern stratos
    tag stratos.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern njp
    tag njp.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern blackbird
    tag blackbird.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern lakitu
    tag lakitu.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern loki
    tag loki.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern kratos
    tag kratos.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern olisipo
    tag olisipo.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern mytv
    tag mytv.${tag}
  </rule>
  <rule>
    key sky.top_tenant
    pattern /.+/
    tag softfail_cluster.${tag}
  </rule>
</match>

<match {*.dedicated.fluentd,*.dedicated.fluentd_gcp,tenant_in_mono.fluentd,tenant_in_mono.fluentd_gcp}>
  @type rewrite_tag_filter
  @log_level info
  <rule>
    key agent.type
    pattern metricbeat
    tag metricdocument.${tag}
  </rule>
  <rule>
    key agent.type
    pattern /.+/
    tag logdocument.${tag}
  </rule>
</match>

<match {logdocument.tenant_in_mono.fluentd,tenant_in_mono.filebeat,tenant_in_mono.functionbeat,logdocument.tenant_in_mono.fluentd_gcp}>
  @type copy
  @log_level info
  <store>
    @type elasticsearch
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    max_retry_putting_template 1
    request_timeout 60s
    fail_on_putting_template_retry_exceed false
    slow_flush_log_threshold 100.0
    @id        out_es_logs-tenant_in_mono
    @log_level info
    log_es_400_reason true

    id_key      _hash
    remove_keys _hash

    hosts ********
    user "elastic"
    password "********"
    ca_file "/etc/fluentd/ca-peacock.crt"
    ssl_version TLSv1_2
    ssl_verify false

    index_name               logs-${sky.top_tenant}-fluentd
    time_key                 time
    include_timestamp        true
    include_tag_key          true
    flatten_hashes           false
    flatten_hashes_separator _

    # Rollover index config
    rollover_index     true
    application_name   default
    index_date_pattern "now/d"
    deflector_alias    logs-${sky.top_tenant}-fluentd

    # Index template
    template_name      logs-${sky.top_tenant}-fluentd
    template_file      /etc/fluentd/logs-template.json
    customize_template {"<<TAG>>":"${sky.top_tenant}"}
    template_overwrite true

    <buffer tag,sky.top_tenant>
      retry_wait 20s
      retry_exponential_backoff_base 2
      retry_type exponential_backoff
      retry_max_interval 300s
      disable_chunk_backup true
      @type file
      path /fluentd/es-out-logs-tenant_in_mono

      flush_thread_count 8
      flush_interval     5s
      flush_at_shutdown  true
      overflow_action block
      chunk_limit_size 16M
      # total_limit_size is set to 70% of the data disk so that a single output can't use more than this
      total_limit_size   69G
      retry_forever      false
    </buffer>
  </store>
  <store ignore_error>
    @type s3
    @log_level info
    s3_bucket "nbcu-topslogs-prod-us-west-2"
    s3_region "us-west-2"
    acl bucket-owner-full-control
    store_as "json"
    <format>
      @type json
    </format>
    use_server_side_encryption "AES256"
    path ${sky.top_tenant}/${service.name}/%Y/%m/%d/
    # if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
    # need to specify tag for ${tag} and time for %Y/%m/%d in <buffer> argument.
    <buffer service.name,sky.top_tenant,time>
      retry_wait 20s
      retry_exponential_backoff_base 2
      retry_type exponential_backoff
      retry_max_interval 300s
      disable_chunk_backup true
      @type file
      path /fluentd/s3-out-tenant_in_mono
      timekey 30 # 30 seconds partition
      timekey_wait 1m
      timekey_use_utc true # use utc
      flush_thread_count 8
      chunk_limit_size 256M
      # total_limit_size is set to 70% of the data disk so that a single output can't use more than this
      total_limit_size   69G
    </buffer>
  </store>
</match>

The above block for tenant_in_mono is repeated nearly identically for every sky.top_tenant pattern, so I've cut those out. I've also masked hosts and passwords for obvious reasons.

fujimotos commented 1 year ago

@domokap I checked your configuration, and I think it is basically correct.

Considering that Elasticsearch receives the same entry fine (despite the corruption in S3), I suspect there is some race-condition bug on the S3 side.

In S3:
\n�@timestamp�2022-10-15T00:16:01.074+00:00�@version\u0001�message�XMaking

In Elasticsearch:
"@timestamp\":\"2022-10-14T23:16:01.074+00:00\",\"@version\":1,\"message\":\"Making

I'll find some time to try to reproduce this issue. WFM.

domokap commented 1 year ago

@fujimotos thank you. I had ruled out that it is coming from upstream but I will endeavour to double check again with our logging teams.

Thanks again.

domokap commented 1 year ago

Hi @fujimotos, I found some more odd, possibly related behaviour with the S3 plugin when it creates the bucket paths. In the config we have path ${sky.top_tenant}/${service.name}/%Y/%m/%d/. Here is a summary of the service.name folders under one sky.top_tenant folder:

Note, however, that the only two valid service.name values, and therefore folder names, should be mytv-service and mytv-service-recs, so the other ones must be a result of corruption(?).
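A check like this can be automated by comparing the observed folder names against the known service names. The sketch below assumes the folder names have already been listed from the bucket. The two valid names come from the comment above; the other names in the sample are invented purely for illustration and are not the ones actually seen.

```python
# The only service.name values expected for this tenant, per the comment above.
VALID_SERVICE_NAMES = {"mytv-service", "mytv-service-recs"}


def unexpected_prefixes(prefixes, valid=VALID_SERVICE_NAMES):
    """Return the folder names that are not in the set of known service names."""
    return sorted({p.strip("/") for p in prefixes} - set(valid))


# Hypothetical listing: the last two names are made up to show the idea.
observed = [
    "mytv-service/",
    "mytv-service-recs/",
    "mytv-servi\ufffde/",
    "mytv-service-rec/",
]

print(unexpected_prefixes(observed))
```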

Several of us suspect that it may be due to a race condition(?).

Some extra context would be that in this proposition and environment we have 42 EC2 nodes running Fluentd.

Hope this extra information helps.

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open 30 days with no activity. Remove stale label or comment or this issue will be closed in 7 days