fluent / fluent-plugin-s3

Amazon S3 input and output plugin for Fluentd
https://docs.fluentd.org/output/s3

S3 output filling up /tmp #416

Open waqarsky opened 1 year ago

waqarsky commented 1 year ago

Describe the bug

I have these files in /tmp filling up disk space, and they are not removed by Fluentd (see the check after the listing for where this naming comes from):

30M s3-20230115-3429444-1n0gm68
34M s3-20230118-4051437-8mi5ts
36M s3-20230105-1633445-ymzdql
44M s3-20230118-4082680-1v29q88
56M s3-20230111-2775877-5b3vmz
63M s3-20230109-2467022-n12d0g
65M s3-20230121-437995-1sq1mt9
75M s3-20230106-1845823-cd9qij
76M s3-20230111-2738261-1a3zss4
77M s3-20230108-2205141-scyh2z
83M s3-20230116-3580308-le9f4y
98M s3-20230108-2273951-1fwklwx
132M    s3-20230119-4177644-1b82v95
151M    s3-20230109-2341735-1p706j2
198M    s3-20230120-229601-t0fspm
232M    s3-20230113-3179791-qhwem0
244M    s3-20230105-1555034-fj7pmg
244M    s3-20230110-2699071-y6dgc7
244M    s3-20230112-3003537-m4vg83
244M    s3-20230117-3937170-1s4izqw
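The naming is exactly what Ruby's Tempfile produces for a "s3-" prefix (prefix, date, PID, base36 random). A minimal check, assuming any recent MRI Ruby:

    require "tempfile"

    # Tempfile.new("s3-") names files as s3-YYYYMMDD-<pid>-<base36 random>,
    # the same pattern as the leftover files above.
    tmp = Tempfile.new("s3-")
    puts File.basename(tmp.path)  # e.g. "s3-20230121-437995-1sq1mt9"
    tmp.close(true)               # close(true) also unlinks the file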

To Reproduce

Set up the plugin with the config below. The leftover files appear only occasionally; most temporary files are deleted as they should be, but some remain in /tmp.

Expected behavior

Temporary files should be removed after upload, or an error should be logged explaining why they were left behind.
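As a stop-gap (a sketch only, assuming the leaked files are no longer held open by the fluentd process), a periodic cleanup along these lines could be cron'd:

    require "tmpdir"

    # Remove leftover "s3-" temp files older than one day. The glob prefix and
    # cutoff are assumptions; adjust for your environment.
    cutoff = Time.now - 24 * 60 * 60
    Dir.glob(File.join(Dir.tmpdir, "s3-*")).each do |path|
      begin
        File.unlink(path) if File.file?(path) && File.mtime(path) < cutoff
      rescue Errno::ENOENT
        # already removed elsewhere; ignore
      end
    end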

Your Environment

- Fluentd version: fluentd (1.15.2)
- fluent-plugin-s3 version: 1.7.1
- Operating system: Ubuntu 20.04
- Kernel version: 5.15.0-1022-aws #26~20.04.1-Ubuntu

Your Configuration

  </store>
  <store ignore_error>
    @type s3
    @log_level info
    s3_bucket "BUCKET"
    s3_region "REGION"
    acl bucket-owner-full-control
    store_as "json"
    <format>
      @type json
    </format>
    use_server_side_encryption "ENCRYPTION"
    path %Y/%m/%d/
    time_slice_format %Y%m%d-%H%M%S
    s3_object_key_format %{path}%{uuid_flush}-%{hex_random}-%{time_slice}-%{hms_slice}.%{file_extension}
    check_object false
    slow_flush_log_threshold 100.0
    <buffer REDACTED>
      retry_wait 20s
      retry_exponential_backoff_base 2
      retry_type exponential_backoff
      retry_max_interval 300s
      disable_chunk_backup true
      @type file
      path /s3/REDACTED
      # timekey 30 # 30 seconds partition
      # timekey_wait 1m
      timekey_use_utc true # use utc
      flush_mode interval
      flush_interval 30s
      flush_thread_interval 5.0
      flush_thread_count 8
      chunk_limit_size 256M
      total_limit_size   137G
    </buffer>
  </store>

Your Error Log

There are no errors in the logs related to this.

Additional context

It looks like this is the code creating the files (the plugin's write method). There is also no way to specify a custom temp directory, so / keeps filling up when it is small. A possible TMPDIR workaround is sketched after the excerpt.

      tmp = Tempfile.new("s3-")
      tmp.binmode
      begin
        @compressor.compress(chunk, tmp)
        tmp.rewind
        log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"

        put_options = {
          body: tmp,
          content_type: @compressor.content_type,
          storage_class: @storage_class,
        }
        put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
        put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
        put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
        put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
        put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
        put_options[:acl] = @acl if @acl
        put_options[:grant_full_control] = @grant_full_control if @grant_full_control
        put_options[:grant_read] = @grant_read if @grant_read
        put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
        put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp
        put_options[:tagging] = @tagging if @tagging

        if @s3_metadata
          put_options[:metadata] = {}
          @s3_metadata.each do |k, v|
            put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
          end
        end
        @bucket.object(s3path).put(put_options)

        @values_for_s3_object_chunk.delete(chunk.unique_id)

        if @warn_for_delay
          if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
            log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
          end
        end
      ensure
        tmp.close(true) rescue nil
      end
    end
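Since the plugin calls Tempfile.new, the files go to Ruby's Dir.tmpdir, which honors the TMPDIR environment variable. Until a dedicated option exists, exporting TMPDIR for the fluentd process (systemd unit, container env, etc.) should move these files off the root volume. This is a sketch of Ruby's behavior, not a plugin feature, and the target path below is a hypothetical example:

    require "fileutils"
    require "tmpdir"
    require "tempfile"

    # Dir.tmpdir returns ENV["TMPDIR"] when it points at an existing writable
    # directory, so the plugin's Tempfile.new("s3-") files land there instead
    # of /tmp. The path is an assumption; pick a volume with enough free space.
    dir = "/data/fluentd-tmp"
    FileUtils.mkdir_p(dir)
    ENV["TMPDIR"] = dir
    puts Dir.tmpdir               # => "/data/fluentd-tmp"
    tmp = Tempfile.new("s3-")
    puts tmp.path                 # => "/data/fluentd-tmp/s3-<date>-<pid>-<rand>"
    tmp.close(true)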
waqarsky commented 1 year ago

Anyone here? Is this plugin still supported?

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove the stale label or comment, or this issue will be closed in 30 days.