fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd
Other
303 stars 176 forks source link

How to set topic_key with record's field in kafka2 output #492

Closed YuWan1117 closed 1 year ago

YuWan1117 commented 1 year ago

My question has been solved.

Here is the solution in my case.

  1. I add a new key named 'k8s_topic' in record_transformer firstly
      <filter servicelog.**>
         @type record_transformer
         @id filter_record_transformer
         enable_ruby
         <record>
            message ${record.dig("log_message")}
            node_name ${record.dig("kubernetes", "host")}
            pod_name ${record.dig("kubernetes", "pod_name")}
            # add k8s_topic key
            k8s_topic ${record.dig("kubernetes", "labels", "kafkaTopic")} 
         </record>
         remove_keys log_message,kubernetes
      </filter>
  2. I set topic_key with the $.k8s_topic format in kafka2 output, and add the buffer block like the follow config.
        topic_key $.k8s_topic
        <buffer $.k8s_topic>
           @type file
           path /var/log/td-agent/buffer/td
           flush_interval 10s
        </buffer>

I alse have annother solution in my case.

I add a new key named topic in record_transformer. And don‘t set topic、topic_key、default_topic in kafka2 output.

Reference: https://groups.google.com/g/fluentd/c/rtSjA1oP3KI