logstash-plugins / logstash-filter-aggregate

The aim of this filter is to aggregate information available across several events (typically log lines) belonging to the same task, and finally push the aggregated information into a final task event.
Apache License 2.0

Logstash 5.6.1 incremental refresh is failing, but clean refresh works fine! #72

Closed vishnyk closed 7 years ago

vishnyk commented 7 years ago

Hi all;

I am facing this Logstash 5.6.1 issue in the Prod environment, but it works fine in QA.

1- Incremental refresh (Oracle drivers) using the `:sql_last_value` parameter is failing. It loads only one record, as per the debug log:

[2017-09-25T10:01:15,081][INFO ][logstash.inputs.jdbc ] (5.768000s) SELECT count(*) "COUNT" FROM (SELECT * FROM ESWATERV5 WHERE device_consumption_epoc > 1506309300) "T1" FETCH NEXT 1 ROWS ONLY


2- Incremental refresh using the same configuration file works fine in QA. When I started testing this in Prod, it fails. The setup is the same and the Java version is also the same.

3- Complete refresh using the Logstash configuration parameter `clean_run => true` works fine in production and QA.

4- One more piece of info: I have manually edited the `logstash-ESWATER.lastrun` file and put in the value for incremental refresh. It works fine in QA but fails in PROD.
5- Configuration, debug log and command are mentioned below. Please help.
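Regarding point 4, one thing worth double-checking: as far as I understand, the jdbc input serializes the tracked value to `last_run_metadata_path` as a YAML scalar, so a hand-edited file has to keep that exact format or the plugin may fall back to a default baseline. A minimal sketch (scratch path and the epoch value from the logs above, not the real production file):

```shell
# Hypothetical scratch path standing in for last_run_metadata_path.
LASTRUN=/tmp/logstash-ESWATER.lastrun

# For a numeric tracking_column the file should hold a single YAML value,
# i.e. the "---" document marker followed by the epoch number:
printf -- '--- 1506309300\n' > "$LASTRUN"

# Verify exactly what Logstash will read back on the next run:
cat "$LASTRUN"
```

Comparing the byte-for-byte content of this file between QA and PROD (e.g. with `od -c`) would show whether the manual edit left a stray character or a different value in production.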

Logstash configuration file
===========================
input {
    jdbc {
        jdbc_validate_connection => true
        jdbc_connection_string => "jdbc:oracle:thin:@server40:1521/RptPWAMI"
        jdbc_user => "reporting"
        jdbc_password => "trapRABaWa756uvE"
        jdbc_driver_library => "/home/oracle/logstash/logstash-5.4.0/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        statement => "SELECT * FROM ESWATERV5 WHERE device_consumption_epoc > :sql_last_value"
        use_column_value => true
        tracking_column => "device_consumption_epoc"
        tracking_column_type => "numeric"
        record_last_run => true
        last_run_metadata_path => "/home/oracle/logstash/logstash-5.4.0/config/water/logstash-ESWATER.lastrun"
#        schedule => "0 6 * * *"
#         clean_run => true
    }
}
filter {
    if [mlatitude] and [mlongitude] {
        mutate {
            add_field => ["[geoip][meterlocation]", "%{mlongitude}"]
            add_field => ["[geoip][meterlocation]", "%{mlatitude}"]
        }
    }
    mutate {
        convert => [ "[geoip][meterlocation]", "float" ]
    }
    if [glatitude] and [glongitude] {
        mutate {
            add_field => ["[geoip][gatewaylocation]", "%{glongitude}"]
            add_field => ["[geoip][gatewaylocation]", "%{glatitude}"]
        }
    }
    mutate {
        convert => [ "[geoip][gatewaylocation]", "float" ]
    }
}
output {
    #stdout { codec => rubydebug }
    elasticsearch {
        index => "water_consumption"
        document_type => "meter_reads"
    }
}

Logstash debug Command
=======================
./logstash -f /home/oracle/logstash/logstash-5.4.0/config/water/logstashwater.conf --debug

Logstash debug log
=======================
..
[2017-09-25T10:01:08,586][DEBUG][logstash.outputs.elasticsearch] Found existing Elasticsearch template. Skipping template management {:name=>"logstash"}
[2017-09-25T10:01:08,587][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1"]}
[2017-09-25T10:01:08,589][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>10, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1250}
[2017-09-25T10:01:08,668][INFO ][logstash.pipeline        ] Pipeline main started
[2017-09-25T10:01:08,675][DEBUG][logstash.agent           ] Starting puma
[2017-09-25T10:01:08,676][DEBUG][logstash.agent           ] Trying to start WebServer {:port=>9600}
[2017-09-25T10:01:08,678][DEBUG][logstash.api.service     ] [api-service] start
[2017-09-25T10:01:08,696][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-09-25T10:01:13,670][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-25T10:01:15,081][INFO ][logstash.inputs.jdbc     ] (5.768000s) SELECT count(*) "COUNT" FROM (SELECT * FROM ESWATERV5 WHERE device_consumption_epoc > 1506309300) "T1" FETCH NEXT 1 ROWS ONLY
[2017-09-25T10:01:15,083][DEBUG][logstash.inputs.jdbc     ] Executing JDBC query {:statement=>"SELECT * FROM ESWATERV5 WHERE device_consumption_epoc > :sql_last_value", :parameters=>{:sql_last_value=>1506309300}, :count=>30}
[2017-09-25T10:01:18,670][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-09-25T10:01:21,191][INFO ][logstash.inputs.jdbc     ] (6.107000s) SELECT * FROM ESWATERV5 WHERE device_consumption_epoc > 1506309300
[2017-09-25T10:01:21,281][DEBUG][logstash.inputs.jdbc     ] closing {:plugin=>"LogStash::Inputs::Jdbc"}
[2017-09-25T10:01:21,281][DEBUG][logstash.pipeline        ] Input plugins stopped! Will shutdown filter/output workers.
[2017-09-25T10:01:21,282][DEBUG][logstash.pipeline        ] filter received {"event"=>{"devicefirmware"=>nil, "buildingname"=>"Mazaya-4", "devicedailyconsumptionobiscode"=>"8-0:2.8.0*255", "devicescount"=>#..
..
..
"gatewaytimestamplastonline"=>2017-09-24T23:43:28.000Z, "devicetypename"=>"Diehl/Hydrometer Hydrus DN 15-20", "devicegroupname"=>"643-WADI AL SAFA 2", "billingcycleid"=>"D31"}}
[2017-09-25T10:01:21,447][DEBUG][logstash.pipeline        ] Shutdown waiting for worker thread #<Thread:0x49fcc629>
..
..
<Thread:0x5533263d>
[2017-09-25T10:01:21,487][DEBUG][logstash.pipeline        ] Shutdown waiting for worker thread #<Thread:0x53715b8e>
[2017-09-25T10:01:21,487][DEBUG][logstash.pipeline        ] Shutdown waiting for worker thread #<Thread:0x46bb120e>
[2017-09-25T10:01:21,488][DEBUG][logstash.filters.mutate  ] closing {:plugin=>"LogStash::Filters::Mutate"}
[2017-09-25T10:01:21,489][DEBUG][logstash.filters.mutate  ] closing {:plugin=>"LogStash::Filters::Mutate"}
[2017-09-25T10:01:21,489][DEBUG][logstash.outputs.elasticsearch] closing {:plugin=>"LogStash::Outputs::ElasticSearch"}
[2017-09-25T10:01:21,489][DEBUG][logstash.outputs.elasticsearch] Stopping sniffer
[2017-09-25T10:01:21,489][DEBUG][logstash.outputs.elasticsearch] Stopping resurrectionist
[2017-09-25T10:01:21,552][DEBUG][logstash.outputs.elasticsearch] Waiting for in use manticore connections
[2017-09-25T10:01:21,552][DEBUG][logstash.outputs.elasticsearch] Closing adapter #<LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter:0x4fd5d428>
[2017-09-25T10:01:21,553][DEBUG][logstash.pipeline        ] Pipeline main has been shutdown
[2017-09-25T10:01:21,682][DEBUG][logstash.instrument.periodicpoller.os] PeriodicPoller: Stopping
[2017-09-25T10:01:21,683][DEBUG][logstash.instrument.periodicpoller.jvm] PeriodicPoller: Stopping
[2017-09-25T10:01:21,684][DEBUG][logstash.instrument.periodicpoller.persistentqueue] PeriodicPoller: Stopping
[2017-09-25T10:01:21,684][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] PeriodicPoller: Stopping
[2017-09-25T10:01:21,693][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}
[2017-09-25T10:01:21,694][DEBUG][logstash.pipeline        ] Closing inputs
[2017-09-25T10:01:21,695][DEBUG][logstash.inputs.jdbc     ] stopping {:plugin=>"LogStash::Inputs::Jdbc"}
[2017-09-25T10:01:21,700][DEBUG][logstash.pipeline        ] Closed inputs

Without debug 
==================
oracle@bin]$ ./logstash -f /home/oracle/logstash/logstash-5.4.0/config/water/logstashwater.conf
..
[2017-09-25T10:26:46,034][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1"]}
[2017-09-25T10:26:46,036][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>10, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1250}
[2017-09-25T10:26:46,113][INFO ][logstash.pipeline ] Pipeline main started
[2017-09-25T10:26:46,141][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-09-25T10:26:52,951][INFO ][logstash.inputs.jdbc ] (6.124000s) SELECT * FROM ESWATERV5 WHERE device_consumption_epoc > 1506310200
[2017-09-25T10:26:53,131][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
vishnyk commented 7 years ago

How can I fix this issue?

fbaligand commented 7 years ago

Hmm... looking at your Logstash configuration, this seems to be a jdbc input issue, given that you have no aggregate filter. So I invite you to open an issue here: https://github.com/logstash-plugins/logstash-input-jdbc/issues