logstash-plugins / logstash-input-jdbc

Logstash Plugin for JDBC Inputs
Apache License 2.0

Problem with data enrichment using jdbc_default_timezone and fields of type "timestamp" #302

Open tiagopintof opened 6 years ago

tiagopintof commented 6 years ago

Hello,

I'm running into some problems when I'm trying to enrich data using input jdbc and filter jdbc_streaming.

My system:
- Logstash 6.4.0
- logstash-input-jdbc 4.3.11
- logstash-filter-jdbc-streaming 1.0.4
- PostgreSQL 9.6.2
- PostgreSQL driver 42.2.5 - https://jdbc.postgresql.org/download/postgresql-42.2.5.jar
- Reproduced on both Windows and Linux

My conf file:


input {
    jdbc {
        jdbc_connection_string => "connectionString"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_library => "postgresql-42.2.5.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        connection_retry_attempts => "3"
        connection_retry_attempts_wait_time => "10"
        statement => "SELECT offer_item_id ||'-'|| TO_CHAR(pubofr_dhs_deb,'YYYYMMDDHHMISS') AS id, offer_item_id AS offeritemid, pubofr_dhs_deb AS validitybegindate, pubofr_dhs_end AS validityenddate FROM t_offer_item"
        jdbc_default_timezone => "UTC"
    }
}

filter {
    jdbc_streaming {
        jdbc_connection_string => "connectionString"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_library => "postgresql-42.2.5.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => "SELECT offer_item_id AS relativeofferid, pubofr_dhs_deb as begindate FROM t_offer_item_assoc WHERE offer_item_id_assoc = :offerId AND pubofr_dhs_deb_assoc = :offerDate"
        parameters => {
            "offerId" => "offeritemid"
            "offerDate" => "validitybegindate"
        }
        target => "[relativeOffers]"
    }

}

output {
    elasticsearch {
        index => "activeoffers_idx"
        action => "create"
        document_type => "activeoffers"
        document_id => "%{id}"
        hosts => "elasticsearchHost"
    }
}

The Error:


Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. 
{:pipeline_id=>"main", "exception"=>"Missing Converter handling for full class name=org.jruby.RubyObjectVar3, simple name=RubyObjectVar3", "backtrace"=>
["org.logstash.Valuefier.fallbackConvert(Valuefier.java:97)",
"org.logstash.Valuefier.convert(Valuefier.java:75)", 
"org.logstash.ConvertedMap$1.visit(ConvertedMap.java:35)", 
"org.logstash.ConvertedMap$1.visit(ConvertedMap.java:29)", 
"org.jruby.RubyHash.visitLimited(RubyHash.java:662)", "org.jruby.RubyHash.visitAll(RubyHash.java:647)", 
"org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:69)", 
"org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:64)", 
"org.logstash.Valuefier.lambda$initConverters$11(Valuefier.java:142)", 
"org.logstash.Valuefier.convert(Valuefier.java:73)", 
"org.logstash.ConvertedList.newFromRubyArray(ConvertedList.java:44)", 
"org.logstash.Valuefier.lambda$initConverters$15(Valuefier.java:154)", 
"org.logstash.Valuefier.fallbackConvert(Valuefier.java:94)", 
"org.logstash.Valuefier.convert(Valuefier.java:75)", 
"org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:99)", 
"org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)", 
"org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:741)", 
"org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:145)"

How to reproduce:

If I use the jdbc_default_timezone property on the jdbc input and the jdbc_streaming filter query selects a field of type "timestamp", such as "pubofr_dhs_deb as begindate", I get the error. If I remove jdbc_default_timezone, or remove the timestamp field from the query's SELECT, events are processed successfully.
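
For reference, here is a minimal sketch of the input-side change that makes events flow again; everything not shown here is left exactly as in the config above:

input {
    jdbc {
        jdbc_connection_string => "connectionString"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_library => "postgresql-42.2.5.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        connection_retry_attempts => "3"
        connection_retry_attempts_wait_time => "10"
        statement => "SELECT offer_item_id ||'-'|| TO_CHAR(pubofr_dhs_deb,'YYYYMMDDHHMISS') AS id, offer_item_id AS offeritemid, pubofr_dhs_deb AS validitybegindate, pubofr_dhs_end AS validityenddate FROM t_offer_item"
        # With this line commented out (or, alternatively, with the timestamp column
        # removed from the jdbc_streaming SELECT), events are processed successfully.
        # jdbc_default_timezone => "UTC"
    }
}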

I opened this issue here after posting in the Discuss forum: https://discuss.elastic.co/t/problem-with-timestamp-and-timezone/148622

Best regards, Tiago Pinto

guyboertje commented 6 years ago

This is a weird one. Some observations:

  1. This looks like the jdbc_streaming filter causing the missing converter error, not the jdbc input (unless you get a slightly different stack trace), because a) it states Exception in pipelineworker and b) the trace includes org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field, and the set-field call happens in the jdbc_streaming filter. If this happened in the jdbc input, it would be in JrubyEventExtLibrary$RubyEvent.initialize when the event is first created.
  2. RubyObjectVar3 is a JRuby optimisation used in some cases instead of a 4-element array. I can't find anywhere in our code, or in the libraries we depend on, where a RubyObjectVar3 is created, so I presume it is JRuby itself choosing it instead of a 4-element array.
  3. You may not need to use a timezone at all. The timezone setting should only be used when the database locale/timezone is set to a non-UTC value and dates and timestamps come back from queries converted away from UTC; see the sketch after this list. Internally, I think the values are recorded as Unix epoch milliseconds.
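
A minimal sketch of that intended use (the Europe/Paris value and the statement below are placeholders for illustration, not taken from your config):

input {
    jdbc {
        jdbc_connection_string => "connectionString"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_library => "postgresql-42.2.5.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => "SELECT pubofr_dhs_deb AS validitybegindate FROM t_offer_item"
        # Only set this when the database stores timestamps in a non-UTC local zone;
        # the plugin then treats returned values as being in that zone and converts them to UTC.
        jdbc_default_timezone => "Europe/Paris"
    }
}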

We have not seen any issues with Postgres timestamp datatypes. I looked at the Ruby on Rails ActiveRecord adapter code for JDBC as well, and I can't see any specifically different mechanism it uses to convert the Java SQL timestamp to a Ruby-compatible time instance (usually Time) compared to the Sequel library that LS uses.

It is difficult to track down the source of this instance of a RubyObjectVar3 class. It certainly isn't one we see in the vast majority of cases. We can modify the Valuefier class code, but the change will not be released until LS 6.4.4.

Can you run without setting the jdbc timezone but with timestamps in the query?

guyboertje commented 6 years ago

@tiagopintof This is further complicated by the fact that, under the hood, setting jdbc_default_timezone sets the timezone globally in the Sequel library. Because the jdbc_streaming filter also uses Sequel, this causes unwanted side effects in that filter.

We have plans to remove our dependence on the Sequel library by writing some common Java code to handle the JDBC layer directly, but as this is a large, reinventing-the-wheel effort, we need a big block of time to do it.

tiagopintof commented 6 years ago

@guyboertje Hello, thank you for having a look at it.

"Can run without setting the jdbc timezone but with timestamps in the query?" Yes.

I managed to work around it. At first I used a to_char function in the query so it would not break, then converted the resulting string with a date filter. Later I didn't even need the date filter: once I set the field's mapping to date, Elasticsearch converted it.
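
A sketch of that workaround (the TO_CHAR format and the date pattern below are illustrative; they just have to match each other):

input {
    jdbc {
        jdbc_connection_string => "connectionString"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_library => "postgresql-42.2.5.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        # Return the timestamp as text so no raw timestamp object reaches the event.
        statement => "SELECT offer_item_id AS offeritemid, TO_CHAR(pubofr_dhs_deb, 'YYYY-MM-DD HH24:MI:SS') AS validitybegindate FROM t_offer_item"
    }
}

filter {
    # Optional: turn the string back into a timestamp inside Logstash.
    # Not needed if the Elasticsearch mapping already declares the field as type "date".
    # If the database stores local (non-UTC) times, also set timezone => "Europe/Paris" here.
    date {
        match => ["validitybegindate", "yyyy-MM-dd HH:mm:ss"]
        target => "validitybegindate"
    }
}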

About the locale/timezone: my database was set to the "France/Paris" timezone, which is why I needed the default timezone property.

The database has since been converted to UTC, so I'm no longer using the property and everything is working as expected.

Thank you for your time.

guyboertje commented 6 years ago

https://github.com/logstash-plugins/logstash-input-jdbc/issues/121#issuecomment-391540071

guyboertje commented 6 years ago

Although solved by the OP, I'm leaving this open for future readers.