fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd
Other
303 stars 176 forks source link

Enhancement : Add "discard_kafka_delivery_failed_regex" option #496

Closed kubotat closed 11 months ago

kubotat commented 11 months ago

When with Forwarder and Aggregator architecture, forwarders sometimes send invalid data which cause delivery failure at aggregators. out_rdkafka2 plugin hasdiscard_kafka_delivery_failed but it potentially discards not only unnecessary events but also required events, e.g. users expect Fluentd to keep events in buffer files during the outage of Kafka cluster (due to maintenance for instance) but all events are discarded when with discard_kafka_delivery_failed. This enhancement request proposes having discard_kafka_delivery_failed_regex option on out_rdkafka2 plugin to nullify invalid data by checking the error message with given regexp pattern.

Here is a sample use case: In the following configuration, dummy events are generated with tag test-topic0001 and Fluentd try to ship message to test-topic0001. If test-topic0001 does not exist in Kafka cluster, out_rdkafka2 emits a Local: Unknown topic (unknown_topic) warning message. With discard_kafka_delivery_failed_regex /Unknown topic/, out_rdkafka2 discards events which emits a Local: Unknown topic (unknown_topic) warning message.

<source>
  @type dummy
  sample {"Hello":"World"}
  tag test-topic0001
</source>

<match dummy>
    @type rdkafka2
    @id "rdkafka2"
    bokers kafka01.demo.local:9093, kafka02.demo.local:9093
    discard_kafka_delivery_failed_regex /Unknown topic/
    required_acks 1
    ack_timeout 20
    share_producer true
    ## Buffer settings
    <buffer tag>
      @type file
      path ./buffer/
      flush_interval 5s
    </buffer>
    <format>
      @type json
    </format>
</match>
kubotat commented 11 months ago

@ashie Thanks for reviewing. I've done two items you mentioned. I will keep working on a test code.

It seems some of the check processes were failed after updating the contents. I would appreciate it if you could give me an advice on how to pass the check process.

ashie commented 11 months ago

It seems some of the check processes were failed after updating the contents. I would appreciate it if you could give me an advice on how to pass the check process.

Don't worry, they aren't caused by this change. They are always unstable, need to rerun when failed. We should tackle on this as another issue.

I will keep working on a test code.

Please let me know if it's hard to add. I'll merge & release it without test for now.

kubotat commented 11 months ago

@ashie Thanks. I would appreciate it if could merge my request.

ashie commented 11 months ago

Thanks!

ashie commented 10 months ago

I've released v0.19.2