fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0

Stage buffer sometimes sticks around and doesn't ever get queued #4662

Open · stanhu opened this issue 1 week ago

stanhu commented 1 week ago

Describe the bug

For the last week I've been trying to track down what looks like a memory leak: a stage buffer never gets cleared out, even though new data keeps arriving. In my latest attempt to isolate the problem, I noticed a jump to 8 MB in the fluentd_output_status_buffer_stage_byte_size Prometheus metric, which measures the total bytes held in the stage buffer:

[Screenshot: graph of fluentd_output_status_buffer_stage_byte_size showing a step jump to 8 MB that never drops back down]

This jump appears to persist indefinitely until I restart fluentd.
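
For context, these buffer metrics come from fluent-plugin-prometheus. A trimmed sketch of the scrape configuration follows; the bind address, port, and interval are illustrative placeholders rather than our exact values:

<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
</source>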

To Reproduce

I'm still working on this.

Expected behavior

No memory growth over time.

Your Environment

- Fluentd version: v1.16.5
- Package version: 5.0.4-1
- Operating system: Ubuntu 20.04.6
- Kernel version: 5.15.0-1051-gcp

Your Configuration

I don't have a clear reproduction step yet. Our config looks something like this:

<source>
  @type tail
  tag postgres.postgres
  path /var/log/postgresql/postgresql.log
  pos_file /var/log/fluent/postgres.log.pos
  format /(?<time>[^G]*) GMT \[(?<pg_id>\d+), (?<xid>\d+)\]: .* user=(?<pg_user>[^,]*),db=(?<pg_db>[^,]*),app=(?<pg_application>[^,]*),client=(?<pg_client>[^ ]*) (?<pg_message>.*)/
  time_format %Y-%m-%d %H:%M:%S.%N
</source>

<filter postgres.postgres_csv>
  @type postgresql_slowlog
</filter>

<filter postgres.postgres_csv>
  @type postgresql_redactor
  max_length 200000
</filter>

<match postgres.*>
  @type copy
  <store>
    @type google_cloud
    label_map {
      "tag": "tag"
    }
    buffer_type file
    buffer_path /opt/fluent/buffers/postgres/google_cloud
    buffer_chunk_limit 8MB
    buffer_queue_limit 1000
    flush_interval 30s
    log_level info
  </store>

  <store>
    @type cloud_pubsub
    topic pubsub-postgres-inf-gprd
    project my-project
    buffer_type file
    buffer_path /opt/fluent/buffers/postgres/cloud_pubsub
    buffer_chunk_limit 8MB
    buffer_queue_limit 1000
    flush_interval 30s
  </store>
</match>

Your Error Log

The stuck 8 MB stage buffer seems to have coincided with an EOF error:

  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `open'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/event.rb:318:in `each'
  2024-10-08 10:40:21 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/compat/output.rb:131:in `write'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1225:in `try_flush'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-08 10:40:21 +0000 [error]: #0 failed to purge buffer chunk chunk_id="623f4c358bd4b7cd7f63a4eb7410b459" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `unlink'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `purge'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:601:in `block in purge_chunk'
  2024-10-08 10:40:21 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2024-10-08 10:40:21 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:592:in `purge_chunk'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1110:in `commit_write'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1229:in `try_flush'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-08 10:40:21.470273004 +0000 fluent.error: {"chunk_id":"623f4c358bd4b7cd7f63a4eb7410b459","error_class":"Errno::ENOENT","error":"#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>","message":"failed to purge buffer chunk chunk_id=\"623f4c358bd4b7cd7f63a4eb7410b459\" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>","tag":"fluent.error","environment":"gprd","hostname":"example.com","fqdn":"example.com","stage":"main","shard":"backup","tier":"db","type":"patroni"}

Additional context

Note that previously, when log messages could be up to 3 MB, I would see more of these "step" jumps in memory usage. I've since altered our filters to truncate log messages to 200K (the max_length 200000 setting in the postgresql_redactor filter above), which seems to have stopped most of these stage buffer leaks. But I'm still wondering whether there is a corner case here where the file buffer gets cleared but the stage buffer does not.
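
For reference, the truncation is conceptually just the following. This is a simplified, hypothetical sketch, not our actual postgresql_redactor plugin; the pg_message field name is taken from the tail regexp above.

# Hypothetical sketch of the truncation step only; the real postgresql_redactor
# plugin is not shown here.
require 'fluent/plugin/filter'

module Fluent
  module Plugin
    class PostgresqlTruncateFilter < Filter
      Fluent::Plugin.register_filter('postgresql_truncate', self)

      # Maximum allowed length of the message field
      config_param :max_length, :integer, default: 200_000

      def filter(tag, time, record)
        msg = record['pg_message']
        if msg.is_a?(String) && msg.length > @max_length
          record['pg_message'] = msg[0, @max_length]
        end
        record
      end
    end
  end
end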

stanhu commented 1 day ago

I saw this error message again today:

2024-10-15 16:50:28 +0000 [error]: #0 unexpected error error="closed stream"
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `seek'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `open'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/event.rb:318:in `each'
  2024-10-15 16:50:28 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/compat/output.rb:131:in `write'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1225:in `try_flush'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-15 16:50:28.302744238 +0000 fluent.error: {"error":"closed stream","message":"unexpected error error=\"closed stream\"","tag":"fluent.error","environment":"gprd","hostname":"patroni-main-v14-02-db-gprd","fqdn":"patroni-main-v14-02-db-gprd.c.gitlab-production.internal","stage":"main","shard":"backup","tier":"db","type":"patroni"}
2024-10-15 16:50:28 +0000 [error]: #0 failed to purge buffer chunk chunk_id="62486bfe620a478fb7b97c5b27980db8" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `unlink'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `purge'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:601:in `block in purge_chunk'
  2024-10-15 16:50:28 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2024-10-15 16:50:28 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:592:in `purge_chunk'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1110:in `commit_write'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1229:in `try_flush'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-15 16:50:28.791864146 +0000 fluent.error: {"chunk_id":"62486bfe620a478fb7b97c5b27980db8","error_class":"Errno::ENOENT","error":"#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>","message":"failed to purge buffer chunk chunk_id=\"62486bfe620a478fb7b97c5b27980db8\" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>","tag":"fluent.error","environment":"gprd","hostname":"patroni-main-v14-02-db-gprd","fqdn":"patroni-main-v14-02-db-gprd.c.gitlab-production.internal","stage":"main","shard":"backup","tier":"db","type":"patroni"}

In this example, I suspect:

  1. The file buffer stream was closed for some reason, resulting in the closed stream error. Even if the file had been deleted, I'm not sure why that would produce a closed stream error unless it pertained to the upstream PubSub service.
  2. Fluentd then attempted to purge the buffer chunk, which resulted in the ENOENT error.
  3. As a result, the ENOENT prevented the metrics from updating:

https://github.com/fluent/fluentd/blob/403a28ff8d74adfeefb9842c8342292397ba84b7/lib/fluent/plugin/buffer.rb#L602

I don't know why this closed stream error occasionally happens, but I wonder:

  1. Should ENOENT be ignored in Buffer::FileChunk#purge?
  2. Should Buffer#purge_chunk catch ENOENT and ensure that @queue_size_metrics.sub(bytesize) still runs? (A rough sketch of this option is below.)
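
Something like this rough sketch for option 2. It is hypothetical, not the actual fluentd source; it only shows the shape of the change around Buffer#purge_chunk, using the names referenced above.

# Hypothetical sketch, not the actual fluentd source: rescue ENOENT inside
# purge_chunk so the size-metric bookkeeping still runs when the chunk file
# has already been removed from disk.
def purge_chunk(chunk_id)
  synchronize do
    chunk = @dequeued.delete(chunk_id)
    return nil unless chunk

    bytesize = chunk.bytesize
    begin
      chunk.purge   # FileChunk#purge unlinks the buffer file; raises ENOENT if it's already gone
    rescue Errno::ENOENT => e
      # Assumption: treat an already-removed file as successfully purged
      # instead of aborting, so the line below still executes.
      log.warn "buffer chunk file already removed", error: e
    end

    @queue_size_metrics.sub(bytesize)   # without the rescue, ENOENT skips this
  end
end
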
stanhu commented 1 day ago

It seems this intermittent closed stream error was reported a while ago: https://github.com/fluent/fluentd/issues/2391

stanhu commented 1 day ago

@ashie I suspect we need https://github.com/fluent/fluentd/pull/4336 after all. I'm seeing what looks to be a race condition on a single worker instance.
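
To illustrate the sort of interleaving I mean, here is a standalone toy script (not fluentd code): one thread closes and unlinks a file while another thread still holds the handle. The reader hits IOError ("closed stream") on seek, and a later unlink hits Errno::ENOENT, which matches the two errors in the logs above.

require 'tempfile'

# Toy reproduction of the suspected interleaving; not fluentd code.
file = Tempfile.create('chunk')
file.write('dummy chunk data')

closer = Thread.new do
  sleep 0.01
  file.close               # some other code path closes the chunk's IO...
  File.unlink(file.path)   # ...and removes the backing file
end

reader = Thread.new do
  sleep 0.05
  begin
    file.seek(0)           # raises IOError: closed stream
    file.read
  rescue IOError => e
    puts "reader: #{e.message}"
  end
end

[closer, reader].each(&:join)

begin
  File.unlink(file.path)   # purge path: raises Errno::ENOENT, the file is already gone
rescue Errno::ENOENT => e
  puts "purge: #{e.message}"
end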

ashie commented 1 day ago

It seems that your own plugin is involved in this stack trace:

 2024-10-15 16:50:28 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'

Since we can't see that code, I'm not sure of the cause yet.

ashie commented 1 day ago

It seems this intermittent closed stream error was reported a while ago: https://github.com/fluent/fluentd/issues/2391

We already found that #2391 is caused by a rollback, but no rollback appears in your error log, so it doesn't seem related to this issue.

stanhu commented 22 hours ago

@ashie out_cloud_pubsub.rb is this:

# Originally copied from https://github.com/yosssi/fluent-plugin-cloud-pubsub
# License: MIT
require 'google/cloud/pubsub'

module Fluent
  class CloudPubSubOutput < BufferedOutput
    MAX_REQ_SIZE = 10 * 1024 * 1024 # 10 MB
    MAX_MSGS_PER_REQ = 1000

    Plugin.register_output('cloud_pubsub', self)

    config_param :project,          :string,  :default => nil
    config_param :topic,            :string,  :default => nil
    config_param :key,              :string,  :default => nil
    config_param :max_req_size,     :integer, :default => MAX_REQ_SIZE
    config_param :max_msgs_per_req, :integer, :default => MAX_MSGS_PER_REQ

    unless method_defined?(:log)
      define_method("log") { $log }
    end

    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    def configure(conf)
      super

      raise Fluent::ConfigError, "'project' must be specified." unless @project
      raise Fluent::ConfigError, "'topic' must be specified." unless @topic
    end

    def multi_workers_ready?
      true
    end

    def start
      super

      pubsub = Google::Cloud::PubSub.new(project_id: @project, credentials: @key)
      @client = pubsub.topic @topic
    end

    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    def publish(msgs)
      log.debug "publish #{msgs.length} messages"

      @client.publish do |batch|
        msgs.each do |m|
          batch.publish m
        end
      end
    end

    def write(chunk)
      msgs = []
      msgs_size = 0

      chunk.msgpack_each do |tag, time, record|
        size = Yajl.dump(record).bytesize
        if msgs.length > 0 && (msgs_size + size > @max_req_size || msgs.length + 1 > @max_msgs_per_req)
          publish(msgs)
          msgs = []
          msgs_size = 0
        end
        msgs << record.to_json
        msgs_size += size
      end

      if msgs.length > 0
        publish(msgs)
      end
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end
  end
end

The error shows that it's happening during the seek in file_chunk.rb, so I don't think this plugin is at fault here.