fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd
Other
303 stars 176 forks source link

Implement `use_default_for_unknown_topic` for rdkafka2 output plugin #489

Closed raytung closed 1 year ago

raytung commented 1 year ago

Is your feature request related to a problem? Please describe.

Currently the rdkafka2 output plugin has a default_topic configuration but that's only for when the message itself does not contain a topic field. If the default_topic parameter is not set, and that the plugin tries to write to a topic that does not exist in Kafka brokers, the plugin will produce a log Local: Unknown topic (unknown_topic) and then stops processing.

Describe the solution you'd like

Implement use_default_for_unknown_topic for rdkafka2 output plugin, similar to the parameter for the kafka2 output plugin, where it will write to the default topic if the desired topic is not found in Kafka brokers.

If the default topic does not exist in Kafka brokers, we should then log a warning message. (Possibly allow configuring whether to throw exception or block or drop the message).

A naive implementation could be identical to the solution in kafka2 output:

  1. We first attempt to write to the desired topic
  2. Rescue the producer produce attempt, and if the rescued error is unknown topic, we set override the topic name of the message and retry. (This is done in this code https://github.com/fluent/fluent-plugin-kafka/blob/c21236f20e23226c166a93e38d2c24401fa59d90/lib/fluent/plugin/out_rdkafka.rb#L309-L320, and check whether the error code equals to :unknown_topic

A potential optimization could be: First check whether the topic exist in the topic metadata before attempting produce. I'm not familiar enough with librdkafka to know whether it does this already.

Describe alternatives you've considered

N/A

Additional context

No response

ashie commented 1 year ago

We'll keep this opened untill https://github.com/fluent/fluent-plugin-kafka/pull/490#discussion_r1193900681 is resolved.

raytung commented 1 year ago

Documenting for visibility, #490 implemented the logic to publish to the default topic if topic is not found in Kafka brokers. However, we have not considered publishing to the default topic if we receive a UNKNOWN_PARTITION error (the out_kafka2 plugin does this).

I can think of a couple of cases where we would receive an UNKNOWN_PARTITION error (potentially more):

  1. Kafka broker had corrupted metadata (or topic data has not propagated to all brokers yet)
  2. The topic was removed in Kafka and then a new topic with the same name is re-created but with less number of partitions
  3. Producer does not have proper read access

In all cases, I think it makes sense to publish to the default topic, but perhaps we should warn log, and maybe require a separate config parameter for this. What do you think @ashie? I'm also happy if we retry and then drop messages in these 3 cases by default.

ashie commented 1 year ago

Thanks for clarification for this.

In all cases, I think it makes sense to publish to the default topic, but perhaps we should warn log, and maybe require a separate config parameter for this.

I completely agree with you.