fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0

fluentd v1 CPU usage is higher than v0.12. #1801

Closed · wyukawa closed this issue 6 years ago

wyukawa commented 6 years ago

Check the CONTRIBUTING guideline first; here is the information to help investigate the problem.

gemfile

...
gem "fluentd", "1.0.2"

gem "fluent-plugin-config-expander", "1.0.1"
gem "fluent-plugin-record-modifier", "1.0.1"
gem "fluent-plugin-suppress", "0.0.7"
gem "fluent-plugin-prometheus", "1.0.0"

conf

  <source>
    @type forward
    port ...
  </source>
  <source>
    @type monitor_agent
    port ...
  </source>
  <source>
    @type prometheus
    port ...
  </source>
  <source>
    @type prometheus_monitor
    <labels>
      host ${hostname}
      name ...
    </labels>
  </source>
  <match {...}>
    @type null
  </match>
  <match ...>
    @type null
  </match>
  <match {...}.**>
    @type config_expander
    <config>
      @type forward
      @id ...
      flush_mode immediate
      buffer_chunk_limit 4M
      heartbeat_type transport
      buffer_queue_limit 1024
      num_threads 30
      <for node in ...>
        <server>
          host ${node}
          port 20225
        </server>
      </for>
    </config>
  </match>
  <match failed>
    @type file
    path ...
    <buffer time>
      path ...
    </buffer>
  </match>
  <match fluent.**>
    @type record_modifier
    tag "fluentd.raw"
    include_tag_key yes
    tag_key loglevel
    hostname ...
    <inject>
      tag_key loglevel
    </inject>
  </match>
  <match fluentd.raw>
    @type suppress
    interval 15
    num 1
    attr_keys "hostname,portnum,loglevel,message"
    remove_tag_suffix .raw
  </match>
  <match fluentd>
    @type forward
    flush_interval 5s
    heartbeat_type transport
    <server>
      host "..."
      port ...
    </server>
    <buffer tag>
      flush_interval 5s
    </buffer>
  </match>

Any comments would be appreciated.

The data flow is: kafka -> kafka-fluentd-consumer -> this fluentd -> fluent-plugin-elasticsearch -> elasticsearch

fluentd v1 stackprof

$ stackprof /tmp/fluent-stackprof.dump --text
==================================
  Mode: cpu(1000)
  Samples: 22047 (0.33% miss rate)
  GC: 6049 (27.44%)
==================================
     TOTAL    (pct)     SAMPLES    (pct)     FRAME
     13783  (62.5%)       13777  (62.5%)     Fluent::MessagePackEventStream#ensure_unpacked!
      6049  (27.4%)        6049  (27.4%)     (garbage collection)
       642   (2.9%)         626   (2.8%)     Fluent::Plugin::ForwardOutput::Node#send_data_actual
     14608  (66.3%)         498   (2.3%)     Fluent::Plugin::ForwardInput#read_messages
       331   (1.5%)         331   (1.5%)     IO#read_nonblock
       187   (0.8%)         186   (0.8%)     Fluent::Plugin::Buffer::MemoryChunk#concat
     15134  (68.6%)         129   (0.6%)     Coolio::Loop#run
        62   (0.3%)          54   (0.2%)     Fluent::Plugin::Buffer#purge_chunk
        37   (0.2%)          35   (0.2%)     Fluent::Plugin::Buffer#add_metadata
        43   (0.2%)          26   (0.1%)     Fluent::PluginHelper::Socket#socket_create_tcp
        30   (0.1%)          24   (0.1%)     Fluent::Plugin::ForwardOutput::Node#heartbeat
        18   (0.1%)          18   (0.1%)     Fluent::Plugin::ForwardOutput::FailureDetector#phi
        16   (0.1%)          16   (0.1%)     MonitorMixin#mon_enter
        15   (0.1%)          15   (0.1%)     #<Module:0x0000563e68c6f208>.pack
       840   (3.8%)          13   (0.1%)     Fluent::Plugin::Output#flush_thread_run
        11   (0.0%)          11   (0.0%)     Fluent::Plugin::OwnedByMixin#log
        11   (0.0%)          11   (0.0%)     Fluent::PluginHelper::SocketOption#socket_option_set_one
        10   (0.0%)          10   (0.0%)     Coolio::IO#disable_write_watcher
        10   (0.0%)          10   (0.0%)     Fluent::Plugin::Buffer::MemoryChunk#rollback
        10   (0.0%)          10   (0.0%)     #<Module:0x0000563e68c1e218>.now_raw
        10   (0.0%)          10   (0.0%)     Fluent::Log#trace
     13846  (62.8%)           9   (0.0%)     #<Fluent::PluginHelper::Server::EventHandler::TCPServer:0x0000563e68cfacb8>.on_read_without_connection
        21   (0.1%)           9   (0.0%)     Fluent::Plugin::Buffer#dequeue_chunk
         9   (0.0%)           9   (0.0%)     Coolio::IO#enable_write_watcher
         9   (0.0%)           9   (0.0%)     #<Module:0x0000563e6938c7c0>.generate
     13948  (63.3%)           7   (0.0%)     Fluent::Plugin::Output#handle_stream_simple
        18   (0.1%)           7   (0.0%)     WEBrick::HTTPServer#run
         9   (0.0%)           6   (0.0%)     Fluent::Plugin::Buffer#enqueue_chunk
         6   (0.0%)           6   (0.0%)     Fluent::Plugin::ForwardOutput::FailureDetector#add
       807   (3.7%)           5   (0.0%)     Fluent::Plugin::Output#try_flush
$ stackprof /tmp/fluent-stackprof.dump --method MessagePackEventStream#ensure_unpacked!
Fluent::MessagePackEventStream#ensure_unpacked! (/.../local/ruby-2.4/lib/ruby/gems/2.4.0/gems/fluentd-1.0.2/lib/fluent/event.rb:233)
  samples:  13777 self (62.5%)  /   13783 total (62.5%)
  callers:
    13783  (  100.0%)  Fluent::MessagePackEventStream#size
    13777  (  100.0%)  Fluent::MessagePackEventStream#ensure_unpacked!
  callees (6 total):
    13777  (229616.7%)  Fluent::MessagePackEventStream#ensure_unpacked!
       6  (  100.0%)  Fluent::MessagePackFactory::Mixin#msgpack_unpacker
  code:
                                  |   233  |     def ensure_unpacked!
                                  |   234  |       return if @unpacked_times && @unpacked_records
                                  |   235  |       @unpacked_times = []
                                  |   236  |       @unpacked_records = []
 13783   (62.5%)                   |   237  |       msgpack_unpacker.feed_each(@data) do |time, record|
                                  |   238  |         @unpacked_times << time
 13777   (62.5%) /  13777  (62.5%)  |   239  |         @unpacked_records << record
                                  |   240  |       end

The v0.12 environment is the following:

CentOS 6.6, Ruby 2.1

gemfile

...
gem "msgpack"
gem "fluentd", "0.12.39"

gem "fluent-plugin-config-expander", "0.2.1"
gem "fluent-plugin-ping-message", "0.2.1"
gem "fluent-plugin-record-modifier", "0.6.0"
gem "fluent-plugin-suppress", "0.0.7"
gem "fluent-plugin-prometheus", "0.3.0"

conf

<source>
  type forward
  port ...
</source>

<source>
  type monitor_agent
  port ...
</source>

<source>
  @type prometheus
  port ...
</source>

<source>
  @type prometheus_monitor
  <labels>
     host ${hostname}
     name ...
  </labels>
</source>

<match ...>
   type null
</match>

<match ...>
   type null
</match>

<match ...>
   type config_expander
   <config>
     type forward
     id ...
     flush_interval 1s
     buffer_chunk_limit 4M
     heartbeat_type tcp
     buffer_queue_limit 1024
     num_threads 30
     <for node in ...>
       <for port in 20225 20226 20227 20228 20229 20230 20231 20232>
         <server>
           host "${node}"
           port ${port}
         </server>
       </for>
     </for>
   </config>
</match>

<match ...>
   type config_expander
   <config>
     type forward
     id ...
     flush_interval 1s
     buffer_chunk_limit 4M
     heartbeat_type tcp
     buffer_queue_limit 1024
     num_threads 30
     <for node in ...>
       <for port in 20225 20226 20227 20228 20229 20230 20231 20232>
         <server>
           host "${node}"
           port ${port}
         </server>
       </for>
     </for>
   </config>
</match>

# Error record in fluentd-consumer
<match failed>
  type file
  path ...
</match>

<match ping.**>
  type forward
  num_threads 30
  flush_interval 1s
  buffer_chunk_limit 16M
  buffer_queue_limit 256 # 16MB * 256 -> 4GB
  heartbeat_type tcp
  id ping_message
  <server>
    host "..."
  </server>
  <server>
    host "..."
    standby yes
  </server>
</match>

<source>
  type ping_message
  tag "ping"
  data "..."
</source>

<match fluent.**>
  type record_modifier
  tag "fluentd.raw"
  include_tag_key yes
  tag_key "loglevel"
  hostname "..."
</match>
<match fluentd.raw>
  type suppress
  interval 15
  num 1
  attr_keys "hostname,portnum,loglevel,message"
  remove_tag_suffix ".raw"
</match>
<match fluentd>
  type forward
  flush_interval 5s
  heartbeat_type tcp
  <server>
    host "..."
    port 24225
  </server>
</match>
repeatedly commented 6 years ago

Thanks for the report. I haven't checked deeply yet, but I assume Fluency (used in kafka-fluentd-consumer) doesn't set the size option in the payload. That seems to cause the ensure_unpacked! call inside size for each buffer chunk. I will test the performance with/without the size option in the forward protocol.

repeatedly commented 6 years ago

I tested a dummy plugin with and without size. With size, CPU usage is 1%; without it, 4-5%. The larger the message body, the bigger the impact. So supporting size in Fluency may resolve the problem.

https://github.com/komamitsu/fluency/issues/85

repeatedly commented 6 years ago

Released kafka-fluentd-consumer v0.3.2: https://github.com/treasure-data/kafka-fluentd-consumer/releases/tag/v0.3.2 This should resolve the problem.