logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0

Feature Request: The ability to refresh topic list in case of 'topics_pattern' option usage #192

Closed: nfsec closed this issue 5 years ago

nfsec commented 7 years ago

Hi,

I would like to suggest a logstash-input-kafka option related to 'topics_pattern'. Suppose someone creates new topics on the fly with the logstash-output-kafka plugin (for example, depending on the value of some field). To consume messages from those topics, they can use 'topics_pattern'. But for Logstash to see newly created topics (that is, to refresh the list of topics matching the pattern), it must be reloaded or restarted by some kind of wrapper or supervisor that watches for new topics. I think an option like 'topics_pattern_refresh' would be a nice addition to automate this flow.
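The requested refresh comes down to re-running the pattern against the current topic list and diffing the result against what is already subscribed. A minimal Python sketch, assuming a hypothetical helper `refresh_pattern_subscription` and that the caller already has every topic name from cluster metadata (neither is part of the plugin). The sketch also assumes full-string matching, so "topic-.*" must match the whole topic name:

```python
import re


def refresh_pattern_subscription(pattern, cluster_topics, current):
    """Re-evaluate a topic regex against the latest topic list (hypothetical helper).

    `pattern` is a regex string such as "topic-.*"; `cluster_topics` is every
    topic name currently in cluster metadata; `current` is the set of topics
    already subscribed. Returns (new_subscription, added, removed).
    """
    regex = re.compile(pattern)
    # Assumption: full-string match, so "other" does not match "topic-.*".
    matched = {t for t in cluster_topics if regex.fullmatch(t)}
    added = matched - current      # newly created topics that now match
    removed = current - matched    # subscribed topics that no longer exist
    return matched, added, removed


sub, added, removed = refresh_pattern_subscription(
    "topic-.*",
    ["topic-a", "topic-b", "other"],
    {"topic-a", "topic-old"},
)
print(sorted(sub), sorted(added), sorted(removed))
# ['topic-a', 'topic-b'] ['topic-b'] ['topic-old']
```

The `removed` set matters as much as `added`. As the rest of this thread shows, topics that disappear cause trouble too.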

yfoelling commented 6 years ago

We automatically generate and delete topics on a regular basis and read them with the 'topics_pattern' option. When we create a new topic, it is not consumed. Even worse, if we delete a topic that Logstash already knows about, it throws errors every few seconds until it is restarted:

2018-03-26T15:24:18.026484481Z [ERROR] 2018-03-26 15:24:18.026 [Ruby-0-Thread-24: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.4/lib/logstash/inputs/kafka.rb:241] ConsumerCoordinator - [Consumer clientId=<CLIENT_ID>-4, groupId=<GROUP_ID>] Offset commit failed on partition <TOPIC>-39 at offset 5: This server does not host this topic-partition.

Please fix this behavior.

kroov commented 6 years ago

There is an option called "metadata_max_age_ms". If you set it to 60000, the topic list will be refreshed every minute.
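With that setting, a new matching topic can go unnoticed for up to roughly one `metadata_max_age_ms` interval. A simplified Python model of that arithmetic follows. The function name is hypothetical, and the model assumes refreshes happen only on a fixed schedule. A real Kafka client may refresh metadata sooner, for example after errors or rebalances:

```python
def discovery_time_ms(created_at_ms, last_refresh_ms, max_age_ms):
    """Earliest time a newly created topic is seen, under a simplified model
    where metadata refreshes happen exactly every `max_age_ms` after
    `last_refresh_ms` and at no other time (hypothetical helper)."""
    if created_at_ms <= last_refresh_ms:
        return last_refresh_ms
    elapsed = created_at_ms - last_refresh_ms
    # Ceiling division: round up to the next refresh boundary.
    periods = -(-elapsed // max_age_ms)
    return last_refresh_ms + periods * max_age_ms


# Topic created 1 s after a refresh, metadata_max_age_ms => 60000.
print(discovery_time_ms(1_000, 0, 60_000))   # 60000
print(discovery_time_ms(60_001, 0, 60_000))  # 120000
```

In this model, a topic created one second after a refresh is picked up at the 60 s mark, about 59 s later. Lowering the value shortens that window at the cost of more frequent metadata requests.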

ysn2233 commented 6 years ago

Is there any solution for this issue?

kroov commented 6 years ago

Here is a working solution:

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics_pattern => "topic-.*"
    group_id => "groupid"
    auto_offset_reset => "earliest"
    metadata_max_age_ms => 60000   # refresh topic metadata (and the pattern match) every 60 s
    codec => "json"
  }
}

yfoelling commented 6 years ago

That option only helps when topics are added on the fly. If you delete a topic that Logstash is consuming, it will still crash.
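To avoid the crash on deletion, the consumer would have to stop committing offsets for partitions whose topic has disappeared. The error in the log quoted earlier is exactly such a failed commit. The plugin does not expose a hook for this. The following is only a hypothetical Python sketch of the filtering step (`prune_commits` and its inputs are illustrative names), not how the Kafka client or the plugin actually behaves:

```python
def prune_commits(pending, existing_topics):
    """Split pending offset commits into those that are still valid and
    those whose topic no longer exists (hypothetical helper).

    `pending` maps (topic, partition) -> offset to commit; `existing_topics`
    is the set of topic names in the latest cluster metadata.
    """
    kept = {}
    dropped = []
    for (topic, partition), offset in pending.items():
        if topic in existing_topics:
            kept[(topic, partition)] = offset
        else:
            # Committing here would fail with an error like
            # "This server does not host this topic-partition".
            dropped.append((topic, partition))
    return kept, dropped


kept, dropped = prune_commits(
    {("topic-a", 0): 5, ("topic-gone", 39): 5},
    {"topic-a"},
)
print(kept, dropped)
```

A real fix would also need to revoke the deleted partitions from the consumer's assignment, not just skip their commits.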

nfsec commented 6 years ago

Recently, this option did not work for logstash-output-kafka: https://github.com/logstash-plugins/logstash-output-kafka/issues/197

ysn2233 commented 6 years ago

We hit the crash on deletion as well, unless we use "topics" instead of "topics_pattern". If this cannot be solved, could I contribute a config option like "topics-ruby" that lets users define the topics array with Ruby code?
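Until an option like that exists, the explicit `topics` list could be computed outside Logstash and written into the pipeline config before a reload, for example by the kind of wrapper mentioned in the original request. Below is a minimal Python sketch of the rendering step only. `render_topics_setting` is a hypothetical helper, and obtaining the topic list and triggering the Logstash reload are left out:

```python
import re

# Legal Kafka topic names use only ASCII letters, digits, '.', '_' and '-',
# so they can be placed inside double quotes without escaping.
LEGAL_TOPIC = re.compile(r"[A-Za-z0-9._-]+")


def render_topics_setting(topics):
    """Render a Logstash `topics => [...]` line from a computed topic list."""
    names = sorted(set(topics))  # deduplicate and keep output stable
    for name in names:
        if not LEGAL_TOPIC.fullmatch(name):
            raise ValueError(f"illegal Kafka topic name: {name!r}")
    quoted = ", ".join(f'"{name}"' for name in names)
    return f"topics => [{quoted}]"


print(render_topics_setting(["topic-b", "topic-a", "topic-a"]))
# topics => ["topic-a", "topic-b"]
```

Sorting the names keeps the generated config stable between runs, so a wrapper can skip the reload when the list has not changed.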