fluent-plugins-nursery / fluent-plugin-concat

Fluentd Filter plugin to concatenate multiline log separated in multiple events.
MIT License
108 stars 33 forks source link

timeout flush and loss log #94

Open majun9129 opened 4 years ago

majun9129 commented 4 years ago

Problem

I'm using fluentd collect the kubernetes log. Refresh timeouts can occur when the collection of some applications generates fewer logs with larger intervals. When a refresh timeout occurred, I found that the timed out log did not collect ElasticSearch. Because running program logs print differently, there is no way to configure multi-line matching end expressions. I configured the timeout labels. However, I found that only the warning was no longer prompted, but the logs were still not collected. How to deal with the problem?

Steps to replicate

fluentd.conf

<source>
  @id in_tail_container_logs
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-kafka-containers.log.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>
<filter kubernetes.**>
  @id filter_concat
  @type concat
  key log
  multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2}[T\s]\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}/
  separator ""
  timeout_label @NORMAL
</filter>
<filter kubernetes.**>
  @type parser
  @id filter_parser
  key_name log
  reserve_data true
  <parse>
    @type regexp
    expression /^(?<times>\d{4}-\d{1,2}-\d{1,2}[T\s]\d{1,2}:\d{1,2}:\d{1,2}\.\d{3})\s+(?<level>[^\s]+)\s+(?<host>[^\s]+)\s+(?<logger>[^\s]+)\s+(?<thread>[^\s]+)\s+(?<message>.*)$/m
  </parse>
</filter>
# Enriches records with Kubernetes metadata
<filter kubernetes.**>
  @id filter_kubernetes_metadata
  @type kubernetes_metadata
  verify_ssl false
</filter>
<match kubernetes.**>
  @type relabel
  @label @NORMAL
</match>
<label @NORMAL>
  <match kubernetes.**>
    @type kafka_buffered
    @id out_kafka
    brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
    flush_interval 5s
    default_topic "#{ENV['FLUENT_KAFKA_DEFAULT_TOPIC'] || nil}"
    default_partition_key "#{ENV['FLUENT_KAFKA_DEFAULT_PARTITION_KEY'] || nil}"
    default_message_key "#{ENV['FLUENT_KAFKA_DEFAULT_MESSAGE_KEY'] || nil}"
    output_data_type "#{ENV['FLUENT_KAFKA_OUTPUT_DATA_TYPE'] || 'json'}"
    output_include_tag "#{ENV['FLUENT_KAFKA_OUTPUT_INCLUDE_TAG'] || false}"
    output_include_time "#{ENV['FLUENT_KAFKA_OUTPUT_INCLUDE_TIME'] || false}"
    exclude_topic_key "#{ENV['FLUENT_KAFKA_EXCLUDE_TOPIC_KEY'] || false}"
    exclude_partition_key "#{ENV['FLUENT_KAFKA_EXCLUDE_PARTITION_KEY'] || false}"
    get_kafka_client_log "#{ENV['FLUENT_KAFKA_GET_KAFKA_CLIENT_LOG'] || false}"
    # ruby-kafka producer options
    max_send_retries "#{ENV['FLUENT_KAFKA_MAX_SEND_RETRIES'] || 3}"
    required_acks "#{ENV['FLUENT_KAFKA_REQUIRED_ACKS'] || -1}"
    ack_timeout "#{ENV['FLUENT_KAFKA_ACK_TIMEOUT'] || nil}"
    compression_codec "#{ENV['FLUENT_KAFKA_COMPRESSION_CODEC'] || nil}"
    max_send_limit_bytes "#{ENV['FLUENT_KAFKA_MAX_SEND_LIMIT_BYTES'] || nil}"
    discard_kafka_delivery_failed "# 
    {ENV['FLUENT_KAFKA_DISCARD_KAFKA_DELIVERY_FAILED'] || false}"
    slow_flush_log_threshold "180"
    buffer_chunk_limit 32m
    buffer_queue_limit 64
    flush_interval 10s
    flush_thread_count 4
  </match>
</label>

Expected Behavior

Log sample: 【1】{"log":"2020-09-28 07:29:03.795 INFO history-6764859469-wmdpc RabbitMQ.Consumer [11] RabbitMQ consumer in waiting message 00:26:00.1899811. \n","stream":"stdout","time":"2020-09-28T07:29:03.795661554Z"} 【2】{"log":"2020-09-28 07:29:03.806 INFO history-6764859469-wmdpc RabbitMQ.Declarer [8] RabbitMQ connection is open. \n","stream":"stdout","time":"2020-09-28T07:29:03.806376234Z"} 【3】{"log":"2020-09-28 07:31:03.796 INFO history-6764859469-wmdpc RabbitMQ.Consumer [11] RabbitMQ consumer in waiting message 00:28:00.1911350. \n","stream":"stdout","time":"2020-09-28T07:31:03.796778325Z"} 【4】{"log":"2020-09-28 07:31:03.807 INFO history-6764859469-wmdpc RabbitMQ.Declarer [8] RabbitMQ connection is open. \n","stream":"stdout","time":"2020-09-28T07:31:03.808164692Z"}

The above four logs, according to my configuration files, the first normally collected, the second with the third between the interval of about 1 minutes, so will produce a timeout problem, the second log cannot be collected, similarly, article 3 of the collected can be normal, but if also timeout of article 4, article 4 log also cannot be collected.

Your environment

fluentd images: fluent/fluentd-kubernetes-daemonset:v1.7.4-debian-kafka-2.2 fluentd plugin:

LOCAL GEMS

addressable (2.7.0) bigdecimal (default: 1.4.1) bundler (default: 1.17.2, 1.16.2) cmath (default: 1.0.0) concurrent-ruby (1.1.5) cool.io (1.5.4) csv (default: 3.0.9) date (default: 2.0.0) dbm (default: 1.0.0) dig_rb (1.0.1) domain_name (0.5.20190701) e2mmap (default: 0.1.0) etc (default: 1.0.1) fcntl (default: 1.0.0) ffi (1.11.3) ffi-compiler (1.0.1) fiddle (default: 1.0.0) fileutils (default: 1.1.0) fluent-config-regexp-type (1.0.0) fluent-plugin-concat (2.4.0) fluent-plugin-detect-exceptions (0.0.13) fluent-plugin-grok-parser (2.6.1) fluent-plugin-json-in-json-2 (1.0.2) fluent-plugin-kafka (0.7.9) fluent-plugin-kubernetes_metadata_filter (2.3.0) fluent-plugin-multi-format-parser (1.0.0) fluent-plugin-prometheus (1.6.1) fluent-plugin-record-modifier (2.0.1) fluent-plugin-rewrite-tag-filter (2.2.0) fluent-plugin-systemd (1.0.2) fluentd (1.7.4) forwardable (default: 1.2.0) gdbm (default: 2.0.0) http (4.2.0) http-accept (1.7.0) http-cookie (1.0.3) http-form_data (2.1.1) http-parser (1.2.1) http_parser.rb (0.6.0) io-console (default: 0.4.7) ipaddr (default: 1.2.2) irb (default: 1.0.0) json (default: 2.1.0) kubeclient (4.5.0) logger (default: 1.3.0) lru_redux (1.1.0) ltsv (0.1.2) matrix (default: 0.1.0) mime-types (3.3) mime-types-data (3.2019.1009) msgpack (1.3.1) mutex_m (default: 0.1.0) netrc (0.11.0) oj (3.8.1) openssl (default: 2.1.2) ostruct (default: 0.1.0) prime (default: 0.1.0) prometheus-client (0.9.0) psych (default: 3.1.0) public_suffix (4.0.1) quantile (0.2.1) rake (13.0.1) rdoc (default: 6.1.2) recursive-open-struct (1.1.0) rest-client (2.1.0) rexml (default: 3.1.9) rss (default: 0.2.7) ruby-kafka (0.6.8) scanf (default: 1.0.0) sdbm (default: 1.0.0) serverengine (2.2.0) shell (default: 0.7) sigdump (0.2.4) snappy (0.0.17) stringio (default: 0.0.2) strptime (0.2.3) strscan (default: 1.0.0) sync (default: 0.5.0) systemd-journal (1.3.3) thwait (default: 0.1.0) tracer (default: 0.1.0) tzinfo (2.0.0) tzinfo-data (1.2019.3) unf (0.1.4) unf_ext (0.0.7.6) webrick (default: 1.4.2) yajl-ruby (1.4.1) zlib (default: 1.0.0)

Please help. Thanks.

ctretyak commented 3 years ago

I have the same problem. Example from README doesn't work.

<filter **>
  @type concat
  key message
  multiline_start_regexp /^Start/
  flush_interval 5
  timeout_label @NORMAL
</filter>

<match **>
  @type relabel
  @label @NORMAL
</match>

<label @NORMAL>
  <match **>
    @type stdout
  </match>
</label>

It's suppress warn about timeout, but flushed logs don't come to Elastic

niltonvasques commented 3 years ago

Same problem here... It happens after some days, and then suddenly the fluentd is not sending logs anymore to my elastic. Here is my settings:

My logs stopps to be processed and the only error log that I found was this timeout issue, happening immediatelly after the last processed log.

2021-06-15 20:30:08 +0000 [error]: #0 failed to flush timeout buffer error_class=ArgumentError error="@MULTILINE_LOGS label not found"

My settings:

# ...

<filter multiline.**>
   @type concat
   key message
   multiline_start_regexp /^(E|F), \[[^ ]+ \#\d+\]( )+[^ ]+ -- :/
   continuous_line_regexp /^(?!(^., \[[^ ]+ \#\d+\]( )+[^ ]+ -- :)).+/
   flush_interval 60
   timeout_label @MULTILINE_LOGS
</filter>

<filter multiline.**>
  @type concat
  key message
  stream_identity_key request_id
  multiline_start_regexp /^I, \[.+\]  INFO -- : \[.+\] Started.+for.+at.+/
  multiline_end_regexp /^I, \[.+\]  INFO -- : \[.+\] Completed.+/
  flush_interval 60
  timeout_label @MULTILINE_LOGS
</filter>

# ...

<label @SYSTEM>
  <match *.**>
    @type copy

    <store>
      @type elasticsearch
      host ...
      port ...
      logstash_format true
      logstash_prefix fluentd
      logstash_dateformat %Y%m%d
      include_tag_key true
      type_name access_log
      tag_key @log_name
      flush_interval 1s
      reload_connections false
      reconnect_on_error true
      reload_on_failure true
      @log_level debug
      <buffer>
         @type memory
         flush_mode interval
         retry_type exponential_backoff
         flush_thread_count 2
         flush_interval 5s
         retry_forever true
         retry_max_interval 15
         chunk_limit_size 2M
         queue_limit_length 32
         overflow_action block
      </buffer>
    </store>

    <store>
      @type stdout
    </store>
  </match>
</label>
niltonvasques commented 3 years ago

I performed some changes in order to capture the timeout log, but after 5 days the same issue happened again. These was the changes:

last log
2021-06-22 18:09:36 +0000 [info]: #0 Timeout flush: multiline.:d41c1185-cbba-40c7-9d77-12449587539a
Changes
# ...

<filter multiline.**>
   @type concat
   # ....
   timeout_label @SYSTEM
</filter>

<filter multiline.**>
  @type concat
  # ....
  timeout_label @SYSTEM
</filter>

# ...

<label @SYSTEM>
  <match *.**>
    @type copy

    <store>
        # elastic ...
    </store>

    <store>
      @type stdout
    </store>
  </match>
</label>
satrushn commented 3 years ago

I also tried to use labels and I've got errors in fluentd log like:

[error]: #0 failed to flush timeout buffer error_class=ArgumentError error="@NORMAL label not found"

And there is no last log line from the multiline batch of one event! What to do?