logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0

Kafka metadata timestamp issue #331

Closed. Tsukiand closed this issue 4 years ago.

Tsukiand commented 4 years ago

First of all, I will describe our data pipeline:

  1. We deploy Logstash on servers to ingest data from files and send all of it to Kafka.
  2. We use logstash-input-kafka to read the data from Kafka and then write it to Elasticsearch.

Recently we wanted to record the time interval that data spends in the Kafka cluster, so we need a start_time and an end_time. Since we have enabled decorate_events, we already get the start_time from the Kafka metadata; what is missing is the end_time. So I have added a field to track it, as shown below:

File: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.2.1/lib/logstash/inputs/kafka.rb, line 264. I have added a new field named "kafka_delay_ms" to track the time interval.

258               if @decorate_events
259                 event.set("[@metadata][kafka][topic]", record.topic)
260                 event.set("[@metadata][kafka][consumer_group]", @group_id)
261                 event.set("[@metadata][kafka][partition]", record.partition)
262                 event.set("[@metadata][kafka][offset]", record.offset)
263                 event.set("[@metadata][kafka][key]", record.key)
264                 event.set("[@metadata][kafka][timestamp]", record.timestamp)
265                 event.set("[kafka_delay_ms]", LogStash::Timestamp.now.time.to_i*1000 - record.timestamp)

After all of this was done, I got data with kafka_delay_ms, but some of the values are negative. That does not seem reasonable, and I do not know how it happened.

Here is the sample data:

[Screenshot: sample data, Screen Shot 2019-10-21 at 15:49:00]

    input {
      kafka {
        bootstrap_servers => "***:9092"
        client_id => "***"
        group_id => "***"
        topics_pattern => ["***"]
        consumer_threads => 1
        decorate_events => true
        codec => "json"
        request_timeout_ms => "110000"
        session_timeout_ms => "100000"
        max_poll_records => "500"
        heartbeat_interval_ms => "20000"
      }
    }

    filter {
      if ( [eventtype] == 'metrics' ) {
        mutate {
          add_field => { "[@metadata][dailyindex]" => "metrics-**-%{component}-%{+YYYY.MM.dd}" }
        }
      } else {
        mutate {
          add_field => { "[@metadata][dailyindex]" => "logs-**-%{component}-%{+YYYY.MM.dd}" }
        }
      }
      if [@metadata][ELK_UUID] == "1" {
        fingerprint {
          target => "[@metadata][fingerprint]"
          base64encode => true
          method => "UUID"
        }
      } else {
        fingerprint {
          source => ["message", "metrics", "beat.name", "host", "logsource"]
          concatenate_sources => true
          target => "[@metadata][fingerprint]"
          base64encode => true
          method => "MD5"
          key => "***"
        }
      }
      mutate {
        lowercase => [ "[@metadata][dailyindex]" ]
        add_field => { "[kafka_meta]" => "%{[@metadata][kafka]}" }
      }
    }

    output {
      else {
        elasticsearch {
          hosts => ["https://***:9200"]
          cacert => "/etc/logstash/***.crt"
          index => "%{[@metadata][dailyindex]}"
          user => "***"
          password => "${***}"
          document_id => "Ls%{[@metadata][fingerprint]}"
          timeout => 180
        }
      }
    }
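
As a side note, the same delay could also be computed inside the pipeline instead of patching the gem, since decorate_events => true makes [@metadata][kafka][timestamp] visible to filters. This is only a rough sketch of that idea; the kafka_delay_ms field name matches the one added above and is not part of the original config:

    filter {
      ruby {
        # milliseconds between now and the Kafka record timestamp set by decorate_events
        code => "ts = event.get('[@metadata][kafka][timestamp]'); event.set('kafka_delay_ms', (Time.now.to_f * 1000).round - ts) if ts"
      }
    }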

yaauie commented 4 years ago

The Timestamp#to_i method is lossy, and rounds down to the nearest whole second:

        @JRubyMethod(name = "to_i")
        public IRubyObject ruby_to_i(ThreadContext context)
        {
            return RubyFixnum.newFixnum(context.runtime, this.timestamp.getTime().getMillis() / 1000);
        }

-- logstash-core/src/main/java/org/logstash/ext/JrubyTimestampExtLibrary.java@v6.7.2:84-88

If you use Timestamp#to_f, you will not lose the sub-second precision:

-             event.set("[kafka_delay_ms]", LogStash::Timestamp.now.time.to_i*1000 - record.timestamp)
+             event.set("[kafka_delay_ms]", LogStash::Timestamp.now.time.to_f*1000 - record.timestamp)
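
To make the rounding concrete, here is a small plain-Ruby sketch; the epoch values are invented purely for illustration:

    # Record produced at .500 of a second, read back 250 ms later at .750:
    record_timestamp_ms = 1_571_644_140_500           # Kafka record.timestamp, in milliseconds
    now = Time.at(1_571_644_140.750)                   # consumer wall clock, 750 ms into the same second

    now.to_i * 1000 - record_timestamp_ms              # => -500  (to_i truncates away the 750 ms)
    (now.to_f * 1000 - record_timestamp_ms).round      # => 250   (the actual delay in ms)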
Tsukiand commented 4 years ago

Thank you for your help. This solution works for me.