fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0

the loading to each worker are slightly different for the multi process worker feature #3346

Closed chikinchoi closed 3 years ago

chikinchoi commented 3 years ago

Describe the bug

The "multi process workers" feature is not working. I have defined 2 workers in the system directive of the Fluentd config. However, when I check Fluentd's performance in Grafana, the fluentd_output_status_buffer_available_space_ratio metric is different for each worker. For example, worker0 is 98% and worker1 is 0%.
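The per-worker gap described above can also be read straight from the Prometheus text output, without Grafana. The following is a minimal sketch, not part of Fluentd: `metric_spread` is a hypothetical helper name, and it assumes each sample line ends with the value (no trailing timestamp).

```shell
#!/bin/sh
# Read Prometheus text-format metrics on stdin and print the smallest and
# largest sample of one metric, plus the gap between them.
# metric_spread is a hypothetical helper, not part of Fluentd.
# Assumption: each sample line ends with its value (no trailing timestamp).
metric_spread() {
    awk -v m="$1" '
        # Keep only sample lines for exactly this metric name
        # (the name is followed by "{" for labels or by a space).
        substr($0, 1, length(m)) == m && substr($0, length(m) + 1, 1) ~ /[{ ]/ {
            v = $NF + 0
            if (n == 0 || v < min) min = v
            if (n == 0 || v > max) max = v
            n++
        }
        END {
            # Exit non-zero when the metric does not appear at all.
            if (n == 0) exit 1
            print "samples=" n " min=" min " max=" max " spread=" (max - min)
        }
    '
}

# Usage sketch: the URL assumes the prometheus source in the config below
# (port 24231, path /metrics). With several workers, each worker may expose
# its own endpoint, so the port may need adjusting.
#   curl -s http://localhost:24231/metrics | \
#       metric_spread fluentd_output_status_buffer_available_space_ratio
```

A large spread means at least one output buffer is much fuller than another, which is the symptom reported here.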

To Reproduce

Please use the Fluentd config below:

<system>
  @log_level debug
  workers 2
  root_dir /var/log/fluent/
</system>

<source>
  @type  forward
  @id    input1
  @log_level debug
  port  24224
  bind 0.0.0.0
</source>

# Used for docker health check
<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health**>
  @type null
</match>

<filter **>
  @type string_scrub
  replace_char ?
</filter>

<filter **firelens**>
  @type concat
  key log
  multiline_start_regexp '^\{\\"@timestamp'
  multiline_end_regexp '/\}/'
  separator ""
  flush_interval 1
  timeout_label @NORMAL
</filter>

<filter **firelens**>
  @type parser
  key_name log
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter **firelens**>
  @type record_modifier
  <record>
    taskDef ${record["ecs_task_definition"].gsub(/:.*/, '')}
  </record>
</filter>

<filter kube**>
  @type record_modifier
  <record>
    taskDef ${record["ecs_task_definition"].gsub(/:.*/, '')}
  </record>
</filter>

<filter lambdaNode**>
  @type record_modifier
  <record>
    functionName ${record["context"]["functionName"]}
  </record>
</filter>

<filter lambdaPython**>
  @type record_modifier
  <record>
    functionName ${record["function_name"]}
  </record>
</filter>

<filter lambdaNode**>
  @type grep
  <exclude>
    key functionName
    pattern /^(?:null|)$/
  </exclude>
</filter>

<filter lambdaPython**>
  @type grep
  <exclude>
    key functionName
    pattern  /^(?:null|)$/
  </exclude>
</filter>

# Prometheus Configuration
# count number of incoming records per tag

<filter **firelens**>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total_firelens
    type counter
    desc The total number of incoming records for firelens
    <labels>
      taskDef ${taskDef}
    </labels>
  </metric>
</filter>

<filter kube**>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total_kube
    type counter
    desc The total number of incoming records for kubernetes
    <labels>
      taskDef ${taskDef}
    </labels>
  </metric>
</filter>

<filter lambda**>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total_lambda
    type counter
    desc The total number of incoming records for lambda
    <labels>
      functionName ${functionName}
    </labels>
  </metric>
</filter>

<filter lambdaNode**>
  @type parser
  key_name data
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter lambdaPython**>
  @type parser
  key_name message
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

# count number of outgoing records per tag
<match **firelens**>
  @type copy
  @id firelens
  <store>
    @type elasticsearch
    @id firelens_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${taskDef}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer taskDef>
      @type file
      flush_mode interval
      flush_interval 5s
      flush_thread_count 16
      total_limit_size 8GB
      chunk_limit_size 80MB
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
      chunk_limit_records 1000
    </buffer>
    <metadata>
     include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id firelens_pro
    <metric>
      name fluentd_output_status_num_records_total_firelens
      type counter
      desc The total number of outgoing records firelens
      <labels>
        taskDef ${taskDef}
      </labels>
    </metric>
  </store>
</match>

<match kube**>
  @type copy
  @id kube
  <store>
    @type elasticsearch
    @id kube_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${taskDef}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer taskDef>
      @type file
      flush_mode interval
      flush_interval 5s
      flush_thread_count 16
      total_limit_size 512MB
      chunk_limit_size 80MB
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
      chunk_limit_records 1000
    </buffer>
    <metadata>
     include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id kube_pro
    <metric>
      name fluentd_output_status_num_records_total_kube
      type counter
      desc The total number of outgoing records kubernetes
      <labels>
        taskDef ${taskDef}
      </labels>
    </metric>
  </store>
</match>

<match lambdaNode**>
  @type copy
  @id lambdaNode
  <store>
    @type elasticsearch
    @id lambdaNode_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${$.context.functionName}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer $.context.functionName>
      flush_mode interval
      flush_interval 5s
      chunk_limit_size 5MB
      flush_thread_count 16
      total_limit_size 256MB
      retry_max_interval 16s
      disable_chunk_backup true
      chunk_limit_records 1000
    </buffer>
    <metadata>
      include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id lambdaNode_pro
    <metric>
      name fluentd_output_status_num_records_total_lambda
      type counter
      desc The total number of outgoing records lambda
      <labels>
        functionName ${functionName}
      </labels>
    </metric>
  </store>
</match>

<match lambdaPython**>
  @type copy
  @id lambdaPython
  <store>
    @type elasticsearch
    @id lambdaPython_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${function_name}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer function_name>
      flush_mode interval
      flush_interval 5s
      chunk_limit_size 5MB
      flush_thread_count 16
      total_limit_size 256MB
      retry_max_interval 16s
      disable_chunk_backup true
      chunk_limit_records 1000
    </buffer>
    <metadata>
      include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id lambdaPython_pro
    <metric>
      name fluentd_output_status_num_records_total_lambda
      type counter
      desc The total number of outgoing records lambda
      <labels>
        functionName ${functionName}
      </labels>
    </metric>
  </store>
</match>

<label @FLUENT_LOG>
  <match fluent.*>
    @type null
  </match>
</label>

<label @NORMAL>
  <match **>
    @type null
  </match>
</label>

# expose metrics in prometheus format
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>

Expected behavior

I expect fluentd_output_status_buffer_available_space_ratio to be roughly equal across workers, because the load should be distributed evenly to each worker.

Your Environment

fujimotos commented 3 years ago

@chikinchoi Can you set the following environmental variable and see if the load balancing status improves?

$ export SERVERENGINE_USE_SOCKET_REUSEPORT=1
$ fluentd -c your-config.conf

Here is some background note:

The "multi process workers" feature is not working. ... For example, worker0 is 98% and worker1 is 0%.

This is actually a common issue among server products on Linux. Nginx has the exact same issue:

https://blog.cloudflare.com/the-sad-state-of-linux-socket-balancing/

The core problem is that Fluentd itself has no load-balancing mechanism. It just prepares a bunch of worker processes, each listening on a shared socket. When a request arrives, every worker wakes up, rushes to accept(), and whoever gets there first "wins" (and gets the task as a treat).

This model works poorly on Linux, because Linux often wakes the busiest process first. So there is no load balancing. A single worker just wins the game again and again, leaving the other workers slacking off.

The SERVERENGINE_USE_SOCKET_REUSEPORT mentioned above was introduced in https://github.com/treasure-data/serverengine/pull/103 to specifically resolve this issue.

This is experimental and not well documented, but it's worth a try if the above issue is bugging you.

chikinchoi commented 3 years ago

Hi @fujimotos ,

Thank you for your quick reply! Does this mean the uneven behavior is expected for the multi worker feature? For the SERVERENGINE_USE_SOCKET_REUSEPORT parameter, is it ok to add it into dockerfile? Below is my Dockerfile:

FROM fluent/fluentd:v1.11.1-1.0
# Use root account to use apk
USER root
# below RUN includes plugin as examples elasticsearch is not required
# you may customize including plugins as you wish
RUN apk add --no-cache --update --virtual .build-deps \
        sudo build-base ruby-dev \
&& sudo gem install fluent-plugin-elasticsearch -v 4.2.2 \
&& sudo gem install fluent-plugin-prometheus \
&& sudo gem sources --clear-all \
&& sudo gem install elasticsearch-xpack \
&& sudo gem install fluent-plugin-record-modifier \
&& sudo gem install fluent-plugin-concat \
&& sudo gem install typhoeus \
&& sudo gem install fluent-plugin-string-scrub \
&& apk add curl \
&& apk del .build-deps \
&& rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem
COPY fluent.conf /fluentd/etc/
RUN mkdir /var/log/fluent
RUN chmod -R 777 /var/log/fluent
RUN chown -R fluent /var/log/fluent
RUN sniffer=$(gem contents fluent-plugin-elasticsearch|grep elasticsearch_simple_sniffer.rb ); \
echo $sniffer
# fluentd -c /fluentd/etc/fluent.conf -r $sniffer;
COPY entrypoint.sh /bin/
RUN chmod +x /bin/entrypoint.sh

# USER fluent

fujimotos commented 3 years ago

Does this mean the uneven behavior is expected for the multi worker feature?

@chikinchoi Right. The uneven worker load is an open issue on Linux.

One proposed solution is SERVERENGINE_USE_SOCKET_REUSEPORT. It's promising, but still in the experimental stage, so we haven't enabled the feature by default yet.

For the SERVERENGINE_USE_SOCKET_REUSEPORT parameter, is it ok to add it into dockerfile?

In your use case, I think the best place to set the environment variable is /bin/entrypoint.sh. Add the export line just before the main program invocation.

Here is an example:

#!/bin/bash
export SERVERENGINE_USE_SOCKET_REUSEPORT=1
fluentd -c /fluentd/etc/fluent.conf

chikinchoi commented 3 years ago

Hi @fujimotos ,

Thank you for your reply. I am testing with "SERVERENGINE_USE_SOCKET_REUSEPORT" added to entrypoint.sh and will let you know the result once done.

Right. The uneven worker load is an open issue on Linux.

Regarding the uneven worker load, I read the Fluentd documentation and saw that there is a "worker N-M" directive. May I know what the purpose of "worker N-M" is if the uneven worker load is expected behavior? Thank you very much!

chikinchoi commented 3 years ago

@fujimotos, I have added "SERVERENGINE_USE_SOCKET_REUSEPORT" to entrypoint.sh as in the script below, but found that the load on each worker still differs. The fluentd_output_status_buffer_available_space_ratio of worker0 is 92.7% and worker1 is 99.2%. Is this difference expected? Also, how can I verify that the "SERVERENGINE_USE_SOCKET_REUSEPORT" variable is working?

#!/bin/sh

#source vars if file exists
DEFAULT=/etc/default/fluentd

export SERVERENGINE_USE_SOCKET_REUSEPORT=1

if [ -r $DEFAULT ]; then
    set -o allexport
    . $DEFAULT
    set +o allexport
fi

# If the user has supplied only arguments append them to `fluentd` command
if [ "${1#-}" != "$1" ]; then
    set -- fluentd "$@"
fi

# If user does not supply config file or plugins, use the default
if [ "$1" = "fluentd" ]; then
    if ! echo $@ | grep ' \-c' ; then
       set -- "$@" -c /fluentd/etc/${FLUENTD_CONF}
    fi

    if ! echo $@ | grep ' \-p' ; then
       set -- "$@" -p /fluentd/plugins
    fi

    set -- "$@" -r /usr/lib/ruby/gems/2.5.0/gems/fluent-plugin-elasticsearch-4.2.2/lib/fluent/plugin/elasticsearch_simple_sniffer.rb
fi

df -h
echo $@
echo $SERVERENGINE_USE_SOCKET_REUSEPORT
exec "$@"

fujimotos commented 3 years ago

The fluentd_output_status_buffer_available_space_ratio of worker0 is 92.7% and worker1 is 99.2%. Is this difference expected?

@chikinchoi I think a small difference is expected.

You originally reported that the available buffer space (fluentd_output_status_buffer_available_space_ratio) was:

worker0 is 98% and worker1 is 0%.

So worker1 was obviously overworking. On the other hand, the current status is:

worker0 is 92.7% and worker1 is 99.2%.

so I consider this progress: it is better than 98% vs 0% available space.
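On the question of how to verify that the variable is set: one option on Linux is to inspect the environment the running process was started with. The following is a minimal sketch under that assumption. `env_value` is a hypothetical helper, not part of Fluentd or ServerEngine, and it only confirms that the variable reached the process, not that the socket option took effect.

```shell
#!/bin/sh
# Print the value of variable VAR from a NUL-separated environment dump,
# such as /proc/<pid>/environ on Linux.
# Returns non-zero if VAR is absent or empty.
# env_value is a hypothetical helper, not part of Fluentd or ServerEngine.
env_value() {
    environ_file=$1
    var=$2
    # Turn NUL separators into newlines, strip the "VAR=" prefix from the
    # matching entry, and let grep set the exit status (1 when nothing is left).
    tr '\000' '\n' < "$environ_file" | sed -n "s/^${var}=//p" | grep .
}

# Usage sketch (PID is a placeholder for a running fluentd process ID):
#   env_value "/proc/$PID/environ" SERVERENGINE_USE_SOCKET_REUSEPORT
```

Because the entrypoint script exports the variable before `exec "$@"`, the fluentd process and the workers it spawns should inherit it, so a printed `1` is the expected result.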

chikinchoi commented 3 years ago

@fujimotos I found that worker0 is 71% and worker1 is 0% today. It seems this is still progress, but do you think there is any way to make it better?

fujimotos commented 3 years ago

do you think there is any way to make it better?

@chikinchoi As far as I know, there is no other option that can improve the task distribution.

Edit: There is a fix being proposed at the Linux kernel level. But the kernel maintainers are not convinced by that patch.

So I believe SERVERENGINE_USE_SOCKET_REUSEPORT is currently the best Fluentd can achieve to distribute the task load evenly.

ankit21491 commented 1 year ago

Thanks for the resolution. I have tried it, but after adding "export SERVERENGINE_USE_SOCKET_REUSEPORT=1", the other workers (I am using 6 workers in my configuration) started using CPU only for a very short period, about 2 minutes, and after that everything reverted to the earlier behavior.

I am also sending the logs to NewRelic using Fluentd. For most of the servers/clusters it is working fine, but for a few of them the logs lag by 2 hours, and sometimes by more than 48 hours.

Surprisingly, the logs for one of the namespaces in my K8s cluster are streaming live in NewRelic, while another namespace has this issue. I have tried using the directive as well as the solution provided above, which reduced the latency from hours to around 10-15 minutes, but I am still not getting the logs without lag.

Any troubleshooting step would be appreciated.

jvs87 commented 3 months ago

I'm facing the same problem. Is there any other solution in addition to SERVERENGINE_USE_SOCKET_REUSEPORT?

daipom commented 3 months ago

So, the load is unbalanced even with SERVERENGINE_USE_SOCKET_REUSEPORT set? How much difference does it make?

jvs87 commented 3 months ago

A big difference. This is a picture of the buffers from yesterday:

image

As you can see, the worker 1 buffer is increasing and the others are "empty".

Thanks.

daipom commented 3 months ago

@jvs87 Thanks! Does this occur even with SERVERENGINE_USE_SOCKET_REUSEPORT set?

jvs87 commented 3 months ago

Yes, it is declared in the env:

image

daipom commented 3 months ago

Thanks. I see... I am surprised to see so much imbalance, even with reuseport. When I applied reuseport on nginx, the load was more distributed. We may need to investigate the cause.

Note: https://github.com/uken/fluent-plugin-elasticsearch/issues/1047

jvs87 commented 3 months ago

Yes, I'm a bit in the dark and don't know whether the problem is related to multi process workers or, on the other hand, to bad use of the buffer.

jvs87 commented 2 months ago

Hi. Do you need any other test?