fluent / fluent-plugin-s3

Amazon S3 input and output plugin for Fluentd
https://docs.fluentd.org/output/s3

When match_regexp not matched, message is discarded #418

Open · jasonreevessimmons opened this issue 1 year ago

jasonreevessimmons commented 1 year ago

Describe the bug

The aws_sdk_sqs/queue_poller module does delete a message only after the block passed to it while polling has finished (assuming :skip_delete is false). However, next unless @match_regexp.match?(key) only short-circuits the block. The block still returns normally, so the poller goes on to delete the message even though the event was never processed.
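To make the difference concrete, here is a simplified Ruby model of the contract described above. A message is deleted once the block returns normally, and throwing :skip_delete skips the delete. MiniPoller is a hypothetical stand-in, not the aws-sdk-sqs class. It yields one message at a time, whereas the real poller may also yield batches.

```ruby
# Hypothetical MiniPoller: models "delete after the block returns,
# unless the block throws :skip_delete". Not the aws-sdk-sqs class.
class MiniPoller
  attr_reader :deleted

  def initialize(messages)
    @messages = messages
    @deleted = []
  end

  # Yields each message and deletes it only if the block returns normally.
  def poll
    @messages.each do |msg|
      catch(:skip_delete) do
        yield msg
        @deleted << msg # skipped when the block throws :skip_delete
      end
    end
  end
end

MATCH = %r{/logs/host-a/}

# Original in_s3 style: `next` just ends the block, so the message is deleted.
with_next = MiniPoller.new(["/logs/host-a/1.json", "/logs/host-b/2.json"])
with_next.poll do |key|
  next unless MATCH.match?(key)
  # process(body) would run here
end

# Patched style: throwing :skip_delete leaves the message on the queue.
with_throw = MiniPoller.new(["/logs/host-a/1.json", "/logs/host-b/2.json"])
with_throw.poll do |key|
  throw :skip_delete unless MATCH.match?(key)
end

p with_next.deleted  # => ["/logs/host-a/1.json", "/logs/host-b/2.json"]
p with_throw.deleted # => ["/logs/host-a/1.json"]
```

In the `next` variant, the host-b message is deleted by a poller that never processed it. That is the data loss described in this issue.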

My setup involves multiple Fluentd nodes reading from one SQS queue. Fluentd stores events in S3 with the hostname as part of the path, and a regex matching the hostname is used to pull events back into Fluentd. I want the node that originally processed an event to process it a second time when sending it to OpenSearch.

To Reproduce

Using multiple Fluentd nodes, send events to S3 using the unique hostname as part of the path.

Ingest the events using the S3 input plugin and an appropriately configured SQS queue, using match_regexp to match the hostname part of the path.

For example, if the SQS message for an event written by host A is received by host B, host B does not process the event but still deletes the message. The event never makes its way to your ultimate destination.

Expected behavior

The message should not be deleted. It should be left on the queue for the appropriate host to process.

Your Environment

- Fluentd version: 1.5.13
- fluent-plugin-s3 version: 1.7.2
- aws-sdk-s3 version: 1.119.1
- aws-sdk-sqs version: 1.53.0
- Operating system: Ubuntu 20.04.6 LTS (Focal Fossa)
  NAME="Ubuntu"
  VERSION="20.04.6 LTS (Focal Fossa)"
  ID=ubuntu
  ID_LIKE=debian
  PRETTY_NAME="Ubuntu 20.04.6 LTS"
  VERSION_ID="20.04"
  HOME_URL="https://www.ubuntu.com/"
  SUPPORT_URL="https://help.ubuntu.com/"
  BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
  PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
  VERSION_CODENAME=focal
  UBUNTU_CODENAME=focal
- Kernel version: 5.15.0-1031-aws

Your Configuration

## ingest from UDP 25001 on localhost
<source>
  @type udp
  port 25001
  bind localhost
  tag port_25001
  <parse>
    @type json
  </parse>
</source>

## ingest from S3 via SQS
<source>
  @type s3
  tag from_s3
  s3_bucket BUCKET
  s3_region REGION
  add_object_metadata true
  store_as json
  match_regexp /logs/HOSTNAME/.*
  <sqs>
    queue_name QUEUE_NAME
  </sqs>
</source>

## send traffic from port 25001 to S3, storing it by using HOSTNAME as part of the key
<match port_25001>
  @type s3
  s3_bucket BUCKET
  s3_region REGION
  store_as json
  path /logs/HOSTNAME/${tag}/%Y-%m-%d/%H/%M%S
  s3_object_key_format %{path}-%{uuid_flush}.%{file_extension}
  auto_create_bucket false
  check_object false
  check_bucket false
  slow_flush_log_threshold 120s
  utc true

  <buffer tag,time>
    @type file
    timekey 5m 
    timekey_wait 1m 
    timekey_use_utc true
    path /data/var/s3_buffer
    flush_thread_count 16
    chunk_limit_size 32M 
    queue_limit_length 16
    retry_max_interval 30
    retry_forever true
  </buffer>
</match>

## send items coming from S3 to stdout
<match from_s3>
  @type stdout
  format pretty_json
</match>
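For reference, here is a sketch of checking a per-host pattern against object keys shaped like the output path above. It mirrors the patch's unescape-then-match step. host_pattern and matches? are hypothetical helpers. The sketch builds the Regexp directly in Ruby rather than going through Fluentd's config parser, and the example keys, including the EC2-style hostname, are made up.

```ruby
require "cgi"

# Hypothetical helper: a per-host pattern like the match_regexp above.
# Regexp.escape stops dots in names such as "ip-10-0-0-1.ec2.internal"
# from acting as "any character" wildcards.
def host_pattern(hostname)
  %r{/logs/#{Regexp.escape(hostname)}/}
end

# Hypothetical helper: unescape the raw key (as in_s3 does with
# CGI.unescape) before matching; CGI.unescape also turns "+" into a space.
def matches?(pattern, raw_key)
  pattern.match?(CGI.unescape(raw_key))
end

# Made-up keys following path + "-" + uuid + ".json" from the output config.
key_a = "/logs/ip-10-0-0-1.ec2.internal/port_25001/2023-03-29/18/4500-1a2b.json"
key_b = "/logs/ip-10-0-0-1Xec2.internal/port_25001/2023-03-29/18/4500-3c4d.json"

pattern = host_pattern("ip-10-0-0-1.ec2.internal")
puts matches?(pattern, key_a) # true
puts matches?(pattern, key_b) # false; an unescaped "." would have matched "X"
```

Escaping matters whenever hostnames contain dots. Otherwise one node's pattern could match, and then delete, another node's keys.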

Your Error Log

There is no error log. The events simply never show up in the output when another host receives the message from the queue.

Additional context

I have a patch that prevents this issue. You may prefer a more nuanced approach, but this works for me:

--- lib/fluent/plugin/in_s3_orig.rb     2023-03-29 18:46:58.772216442 +0000
+++ lib/fluent/plugin/in_s3.rb  2023-03-29 18:19:44.867117653 +0000
@@ -211,7 +211,7 @@
             if @match_regexp
               raw_key = get_raw_key(body)
               key = CGI.unescape(raw_key)
-              next unless @match_regexp.match?(key)
+              throw :skip_delete unless @match_regexp.match?(key) 
             end
             process(body)
           rescue => e
jasonreevessimmons commented 1 year ago

A caveat about the strategy I used above: at high log volumes, you may find that many events get "stuck" in SQS. As unprocessed messages pile up in SQS, the performance of individual Fluentd nodes drops. This is likely because each poll of the queue returns a large number of messages that the node cannot process efficiently.

You probably won't see this effect in a single-node architecture. My architecture has three Fluentd nodes polling a single SQS queue. Without my patch I was losing data. With my patch I still lost data, because I had to purge the SQS queue periodically.
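Here is a rough back-of-the-envelope model of why messages pile up once several nodes share the queue. It assumes that each receive of a message is equally likely to land on any node and that a non-matching node leaves the message hidden for one visibility timeout. It also ignores batching. These are simplifying assumptions, not measurements from this setup.

```ruby
# Toy model, not measured data. Assumptions: each receive lands on one of
# `nodes` pollers uniformly at random, and a non-matching node leaves the
# message invisible for one visibility timeout before it returns.

# Chance a message has still not reached its own node after k receives.
def still_waiting(nodes, k)
  ((nodes - 1).fdiv(nodes))**k
end

# Receives per message are geometric with p = 1/nodes, so the mean is
# `nodes`; all but the last of those receives are wasted.
def expected_wasted_receives(nodes)
  nodes - 1
end

# Expected extra latency per message caused by the wasted receives.
def expected_delay_seconds(nodes, visibility_timeout)
  expected_wasted_receives(nodes) * visibility_timeout
end

nodes = 3
puts still_waiting(nodes, 5).round(3)   # 0.132
puts expected_wasted_receives(nodes)    # 2
puts expected_delay_seconds(nodes, 30)  # 60 (30 s is the SQS default visibility timeout)
```

Under these assumptions, only about one receive in three does useful work with three nodes. That fits the slowdown described above, although the model does not prove it is the cause.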

This patch may be useful in some situations, and should probably be implemented as a configuration option, named something like toss_event_if_no_regexp_match.
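Here is one hedged sketch of how such an option might behave, in Ruby. The parameter name toss_event_if_no_regexp_match is this comment's placeholder and does not exist in fluent-plugin-s3. KeyGate and simulate_poll are hypothetical, and the catch loop only stands in for the poller's delete-unless-:skip_delete behavior.

```ruby
# Hypothetical gate for the suggested option (not part of fluent-plugin-s3).
class KeyGate
  def initialize(match_regexp, toss_event_if_no_regexp_match: true)
    @match_regexp = match_regexp
    @toss = toss_event_if_no_regexp_match
  end

  # true  -> process the object.
  # false -> skip it; the poller still deletes the message (current behavior).
  # Throws :skip_delete to leave the message on the queue instead.
  def process?(key)
    return true if @match_regexp.nil? || @match_regexp.match?(key)
    throw :skip_delete unless @toss
    false
  end
end

# Hypothetical stand-in for the poller: delete unless :skip_delete is thrown.
def simulate_poll(gate, keys)
  processed = []
  deleted = []
  keys.each do |key|
    catch(:skip_delete) do
      processed << key if gate.process?(key)
      deleted << key
    end
  end
  [processed, deleted]
end

keys = ["/logs/host-a/1.json", "/logs/host-b/2.json"]
pattern = %r{/logs/host-a/}

p simulate_poll(KeyGate.new(pattern), keys)
# => [["/logs/host-a/1.json"], ["/logs/host-a/1.json", "/logs/host-b/2.json"]]
p simulate_poll(KeyGate.new(pattern, toss_event_if_no_regexp_match: false), keys)
# => [["/logs/host-a/1.json"], ["/logs/host-a/1.json"]]
```

Defaulting the option to true preserves the current behavior for existing users. Only multi-node setups would need to opt in to leaving unmatched messages on the queue, and they would take on the backlog risk described above.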