fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0

Fluentd uses the Kafka output plugin; after the topic is manually deleted, it gets recreated. #4613

Open 15013752489 opened 3 weeks ago

15013752489 commented 3 weeks ago

Describe the bug

Fluentd tails container logs with the tail plugin and sends them to a Kafka topic with the Kafka output plugin. After a container stops, I manually delete its topic; when a new container then pushes logs to a new topic, the previously deleted topic is recreated alongside the new one. My configuration is shown below.

To Reproduce

  1. Create a container that pushes logs to Kafka.
  2. After the container stops, delete its topic.
  3. Create a new container that pushes logs to a new Kafka topic.
  4. List the topics with kafka-topics.sh; the previously deleted topic has been recreated (see the command sketch below).
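For reference, a minimal sketch of the commands involved, assuming a broker reachable at localhost:9092 and the hypothetical topic names old-topic and new-topic:

    # delete the old topic once the first container has stopped
    kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic old-topic

    # after the new container starts shipping logs, list the topics;
    # the deleted topic reappears alongside new-topic
    kafka-topics.sh --bootstrap-server localhost:9092 --list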

Expected behavior

I expected that the topic would not be recreated after being deleted.
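Likely relevant here: with the broker default auto.create.topics.enable=true, Kafka recreates any topic that a producer writes to, so a single stray event routed to the old topic name is enough to bring it back. A minimal broker-side sketch that prevents this, assuming you can edit server.properties (note this disables auto-creation cluster-wide, which may not be desirable):

    # server.properties (Kafka broker side)
    # default is true: any produce request to a missing topic recreates it
    auto.create.topics.enable=false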

Your Environment

fluentd (1.16.2)
fluent-plugin-kafka (0.19.3, 0.19.0)
Linux fluentd-4b7ml 4.15.0-197-generic #208-Ubuntu SMP Tue Nov 1 17:23:37 UTC 2022 x86_64 GNU/Linux

Your Configuration

<source>
  @type tail
  @id in_tail_container_logs
  @label @KUBERNETES
  path /var/log/containers/*.log
  pos_file /var/log/fluentd/fluentd-containers-tail.pos
  tag kubernetes.*
  read_from_head false
  refresh_interval 5s
  emit_unmatched_lines true
  <parse>
    @type multi_format
    # parse JSON-formatted lines
    <pattern>
      format json
      time_key time
      time_type string
      time_format '%Y-%m-%dT%H:%M:%S.%NZ'
      keep_time_key false
    </pattern>

    # regexp parsing, e.g.: 2023-07-27 16:04:38,004 [Thread-261] INFO  [com.polarizon.gendo3.plugin.faas.MutationFass] MutationFass.java:26 - received change message
    <pattern>
      format regexp
      expression '/^(?<time>[^ ]+ [^ ]+) \[(?<thread>[^\]]+)\] (?<level>[^ ]+)  \[(?<class>[^\]]+)\] (?<message>.*)$/'
      # matches the sample timestamp above (2023-07-27 16:04:38,004)
      time_format '%Y-%m-%d %H:%M:%S,%L'
      keep_time_key false
    </pattern>

    # fallback: keep the line as-is, unparsed
    <pattern>
      format none
      message_key log
      time_key time
      time_type string
      time_format '%Y-%m-%dT%H:%M:%S.%NZ'
      keep_time_key false
    </pattern>
  </parse>
</source>
<label @ZLIC-FILE_KAFKA_OUTPUT>
  <filter **>
    @type record_transformer
    enable_ruby true
    <record>
      output_log_path ${record.dig("kubernetes", "annotations", "outputLogPath")}
      kafka_topic_key ${record.dig("kubernetes", "annotations", "outputLogTopic")}
    </record>
  </filter>
  <match **>
    @type copy
    # store a local copy
    <store>
      @type file
      append true
      #compress gzip
      # Note: to avoid leaking file descriptors, the variables used in path
      # must be a subset of the buffer chunk keys!
      path ${$.output_log_path}/%Y-%m-%d.%H%M

      # formatting: emit only the log text
      <format>
        @type single_value
        message_key log
        add_newline false
      </format>

      # buffer settings
      <buffer time,$.output_log_path>
        @type memory
        flush_thread_count 5
        flush_mode interval
        flush_interval 1s
      </buffer>
    </store>

    # send to Kafka
    <store>
      @type kafka2
      brokers "#{ENV['ZLIC_BOOTSTRAP_SERVER']}"
      use_event_time true

      # topic settings
      topic_key $.kafka_topic_key
      default_topic fluentd-topic

      # producer settings
      required_acks -1

      # formatting: emit only the log text
      <format>
        @type single_value
        message_key log
        add_newline false
      </format>

      # buffer settings
      <buffer $.kafka_topic_key>
        @type memory
        flush_thread_count 5
        flush_mode interval
        flush_interval 1s
      </buffer>
    </store>
  </match>
</label>
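Because topic_key $.kafka_topic_key routes every event by the topic name recorded in its pod annotation, any event that still carries the old name (for example, a line re-read from an old container's log file that remains under /var/log/containers) is produced to that topic and, with broker auto-creation enabled, recreates it. A possible config-side guard, assuming the retired topic name is known (old-topic is hypothetical), would be a grep filter placed before the <match **> block:

    # hypothetical guard: drop events destined for a retired topic
    <filter **>
      @type grep
      <exclude>
        key kafka_topic_key
        pattern /^old-topic$/
      </exclude>
    </filter>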

Your Error Log

None.

Additional context

No response

Athishpranav2003 commented 2 weeks ago

@15013752489 It could also be that buffered logs are sent on restart. Did you check whether the recreated topic is empty or contains events?
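One way to check, assuming the hypothetical broker address and topic name from above:

    # consume from the beginning; if messages appear, buffered or re-read
    # events were flushed into the recreated topic
    kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic old-topic --from-beginning --timeout-ms 5000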