fluent / fluent-plugin-s3

Amazon S3 input and output plugin for Fluentd
https://docs.fluentd.org/output/s3
314 stars 218 forks source link

process of determining 'index' is not thread safe #326

Closed tatsu-yam closed 3 years ago

tatsu-yam commented 4 years ago

In fluent-plugin-s3, if the value of flush_thread_count is greater than 1 , the data on S3 will be missing. I think that it is because Fluent::Plugin::S3Output#write method is not thread safe.

td-agent.conf

<source>
  @type tail
  format ltsv
  keep_time_key true
  read_from_head true
  path /tmp/td-agent-failure-sample/tmp/1500000.log
  pos_file /var/log/td-agent/1500000.log-ng.pos
  tag app.test_log
  time_format %d/%b/%Y:%H:%M:%S %z
</source>

<match app.**>
  @type s3
  @id out_s3

  s3_bucket "#{ENV['S3_BUCKET']}"
  s3_region "#{ENV['S3_REGION']}"

  path "20200512_1647/ng"
  s3_object_key_format "%{path}/%Y%m%d.%{index}.dat.%{file_extension}"

  check_apikey_on_start false
  check_bucket false

  <buffer tag,time>
    @type file
    path /var/log/td-agent/buffer/ng
    timekey 1d
    timekey_wait 10m
    chunk_limit_size 256m
    flush_thread_count 4

    flush_at_shutdown true
    flush_mode interval
    flush_interval 5 # test setting
  </buffer>
  <format>
    @type json
  </format>
</match>

Test data (/tmp/td-agent-failure-sample/tmp/1500000.log) was transferred to s3 with td-agent. As a result, the number of records in the original data and the data on s3 do not match. Of course, there are no errors in td-agent.log.

# source data
$ cat /tmp/td-agent-failure-sample/tmp/1500000.log |wc -l
1500000

# s3
$ aws s3 sync s3://$S3_BUCKET/20200512_1647/ ans/20200512_1647/
$ zcat ans/20200512_1647/ng/* |wc -l
1156632

So, I made the following modifications to fluentd and fluent-plugin-s3 and transferred it.

# fluentd code diff
$ git diff
diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb
index 3424aa9..76c09c1 100644
--- a/lib/fluent/log.rb
+++ b/lib/fluent/log.rb
@@ -319,6 +319,7 @@ module Fluent
       return if skipped_type?(type)
       args << block.call if block
       time, msg = event(:info, args)
+      msg = "[#{Thread.current.object_id}] #{msg}"
       puts [@color_info, @formatter.call(type, time, LEVEL_INFO, msg), @color_reset].join
     rescue
     end

# fluent-plugin-s3 code diff
$ git diff
diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb
index 8b173c7..9694025 100644
--- a/lib/fluent/plugin/out_s3.rb
+++ b/lib/fluent/plugin/out_s3.rb
@@ -351,6 +351,7 @@ module Fluent::Plugin
             put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
           end
         end
+        log.info "put object tmp:#{tmp} to s3path:#{s3path}"
         @bucket.object(s3path).put(put_options)

         @values_for_s3_object_chunk.delete(chunk.unique_id)

The resulting td-agent.log.

2020-05-12 16:47:06 +0900 [info]: [70057850679140] gem 'fluentd' version '1.4.2'
2020-05-12 16:47:06 +0900 [info]: [70057850679140] adding match pattern="app.**" type="s3"
2020-05-12 16:47:06 +0900 [info]: [70057850679140] adding source type="tail"
2020-05-12 16:47:06 +0900 [info]: #0 [70057850679140] starting fluentd worker pid=8749 ppid=8744 worker=0
2020-05-12 16:47:06 +0900 [info]: #0 [70057850679140] following tail of /tmp/td-agent-failure-sample/tmp/1500000.log
2020-05-12 16:47:17 +0900 [info]: #0 [70057784455520] [out_s3] put object tmp:#<File:0x00007f6f327c9a68> to s3path:20200512_1647/ng/20200512.0.dat.gz
2020-05-12 16:47:20 +0900 [info]: #0 [70057784455080] [out_s3] put object tmp:#<File:0x00007f6f32786df8> to s3path:20200512_1647/ng/20200512.0.dat.gz
2020-05-12 16:47:25 +0900 [info]: #0 [70057784454700] [out_s3] put object tmp:#<File:0x00007f6f3232ec88> to s3path:20200512_1647/ng/20200512.1.dat.gz
2020-05-12 16:47:30 +0900 [info]: #0 [70057784455940] [out_s3] put object tmp:#<File:0x00007f6f324464b8> to s3path:20200512_1647/ng/20200512.2.dat.gz
2020-05-12 16:47:34 +0900 [info]: #0 [70057850679140] fluentd worker is now running worker=0
2020-05-12 16:47:35 +0900 [info]: #0 [70057784455520] [out_s3] put object tmp:#<File:0x00007f6f324c0f88> to s3path:20200512_1647/ng/20200512.3.dat.gz
2020-05-12 16:47:37 +0900 [info]: #0 [70057784455940] [out_s3] put object tmp:#<File:0x00007f6f32478b48> to s3path:20200512_1647/ng/20200512.4.dat.gz

Two threads uploaded to filename 20200512.0.dat.gz. I think the process of determining %{index} is not thread safe.

I think using uuid_flush will probably work around this problem. However, since the default value of s3_object_key_format is "%{path}%{time_slice}_%{index}. %{file_extension}", so I think this will affect a lot of users.

I think this issue is relevant. https://github.com/fluent/fluent-plugin-s3/issues/315

repeatedly commented 4 years ago

Yes. This problem is similar to process wide conflict. This problem happens when S3 processing is slower than next chunk flush. To avoid this problem, show warning message like "Buffer configuration uses multiple flush threads. Recommend to use chunk_id or uuid_flush in object path to avoid object conflict". How about this?

withgod commented 4 years ago

I encountered this bug and consulted with tatsu-yama. I think it would be better to change the default to %{uuid_flush} or %{chunk_flush} and describe the risk of using %{index} in the s3_object_key_format in the documentation.

repeatedly commented 4 years ago

Changing default value affects existing users. So need 2 steps for it.

  1. Add warning for multiple threads and release it in v1
  2. Release v2 with default change
withgod commented 4 years ago

I certainly forgot about the impact on existing users ;)

repeatedly commented 4 years ago

Patch for https://github.com/fluent/fluent-plugin-s3/pull/327

github-actions[bot] commented 3 years ago

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days

github-actions[bot] commented 3 years ago

This issue was automatically closed because of stale in 30 days