elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

JDBC/SQL Plugin Proposal #3429

Closed talevy closed 9 years ago

talevy commented 9 years ago

Link to Plugin: here

Purpose of the issue

Understand which features and databases to support in the jdbc plugin for Logstash as the community jdbc-river becomes deprecated.


What is the purpose of logstash-input-jdbc?

  1. A generic plugin supporting, as input, any database that has a JDBC connector. To be leveraged for creating database-specific plugins (for example: logstash-input-mysql).
  2. An integration point between Logstash and popular databases like MySQL, Postgres, Oracle, MongoDB.

What does logstash-input-jdbc do?

It runs a SQL query and generates an input event for each resulting row. The option to schedule the query is also available.

Important Fields

  1. statement
    • Any valid SQL statement will work.
  2. parameters
    • You are able to specify parameter values for use in the SQL statement using the :<fieldname> reference syntax.
  3. schedule
    • A valid cron schedule is accepted here for when to execute the statement.

Special Parameters

For now the only special parameter is :sql_last_start, which is the time at which the last query was executed. This can be used within the statement to limit the rows being processed to only new rows as queries are executed under their specified schedule.

How to Use

Installation

bin/plugin install logstash-input-jdbc

I will show how this is used with MySQL.

Download the JDBC driver library from here, then point to the library in the logstash config.

# logstash config
input {
    jdbc {
      "jdbc_driver_class" => "com.mysql.jdbc.Driver"
      "jdbc_driver_library" => "/path/to/mysql_jdbc_connector.jar"
      "jdbc_connection_string" => "jdbc:mysql://localhost:3306/foodb"
      "jdbc_user" => "myuser"
      "jdbc_password" => "mypassword"
      "statement" => "select * from foo where created_at > :sql_last_time
      "parameters" => { hello => 4}
      "schedule" => "* * * * *"
    }
}

For many of the popular databases, we can write wrapper plugins on top of logstash-input-jdbc as was done for logstash-input-mysql, which would make configuration more straightforward.

e.g.

# mysql logstash config
input {
  mysql {
      host => "localhost"
      port => 3306
      database => "testdb"
      user => "logstash"
      password => "logstash"
      statement => "SELECT * FROM foo where created_at >= :sql_last_start"
  }
}
talevy commented 9 years ago

in response to #2476

suyograo commented 9 years ago

@dadoonet any comments?

morgango commented 9 years ago
  1. The SQL statement will be large and typically multi-line, which could make this config unwieldy. It would be good to have an option to read the SQL from a file, similar to what is done with the translate plugin (a sketch of what that could look like follows this list).
  2. An advanced feature might be to have an SQL filter that would execute based on an event and is able to use the event values as part of the connection string. I have seen this used a lot with ETL tools like Ab Initio.
  3. Having the current date available as a parameter would be useful also.
  4. This scheduling is CRON-centric, but watcher uses a different style. It seems inconsistent; I would rather see it use the watcher style. It doesn't make sense to me to have an SQL script running in an event stream that would need the full granularity of CRON, except to avoid having to keep state information.
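For (1), a minimal sketch of what reading the statement from a file could look like; statement_filepath is the option name this assumes the plugin would expose (it is not settled in this proposal), and the paths are made up:

# hypothetical: load the (possibly multi-line) SQL from a file instead of inlining it
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/path/to/mysql_jdbc_connector.jar"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/foodb"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    statement_filepath => "/etc/logstash/queries/foo.sql"
    schedule => "* * * * *"
  }
}
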
robmckeown commented 9 years ago

Take a look at logstash-input-genjdbc - it is similar. It would be good to consolidate these various JDBC/SQL approaches into one plugin, and keep it simple. There is no end of interesting extensions here, but in my case at least, I just need basic SQL query execution on a schedule, with state (high-water marks) maintained.

jrgns commented 9 years ago

I've been thinking about a SQL plugin for a while, and thought that a Sequel (http://sequel.jeremyevans.net/) plugin would be the best way to ensure a simple implementation that covers a lot of different use cases. By using one parameter (database_uri) you can specify everything from the database type, adapter, username and password in one line. See http://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html for more about the URI.
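
To illustrate, a hypothetical configuration if the input were built on Sequel and exposed a single database_uri setting (the plugin and option names here are made up for the sake of the example; the URI format is Sequel's):

# hypothetical Sequel-based input: one URI carries adapter, credentials, host and database
input {
  sql {
    database_uri => "postgres://logstash:secret@localhost:5432/testdb"
    statement => "SELECT * FROM foo WHERE created_at >= :sql_last_start"
  }
}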

nellicus commented 9 years ago

@talevy @suyograo

1) Besides the timestamp watermark :sql_last_start, it would be great to have an ID-based watermark, e.g. :sql_last_id. It is quite common to have tables where no timestamp is present, or where one exists but does not indicate the order in which events were inserted relative to previous ones. Keeping the last_id or last_timestamp used by the query, ideally always persisting it to a file, and also being able to specify an arbitrary value to perform re-indexing, would be handy.

2) Regarding the :sql_last_start "watermark": as I read it, it would contain the last time a query was run. I think this should actually be the last watermark for which the returned result set contained more than 0 events; otherwise I see potential for losing events.

3) Lastly, it would be nice to auto-tune the number of events the plugin queries each time, or perhaps the frequency of the query, perhaps based on the database's last response time and the number of events returned. I imagine people specifying heavy queries with a fixed cron/watcher-style schedule, and scenarios with peaks of events where the plugin might start to lag behind.

elvarb commented 9 years ago

For reference here are the JDBC plugins I know of that are in the works

https://github.com/logstash-plugins/logstash-input-jdbc
https://github.com/IBM-ITOAdev/logstash-input-genjdbc

And I agree, having two or more for the same job is not ideal at the moment.

One feature that I would love to have in the end is some way to monitor its status: what happens if an error comes up in the pipeline, if you can't connect to the database, or if the query results in an error; how many records were found; what if the target mapping in Elasticsearch is all wrong; and so on.

tomrade commented 9 years ago

Would be nice to be able to select the database ID/key to use as the ES doc ID. You can likely do this with other filters (ruby comes to mind), but it would be useful if it was in one place.
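
A minimal sketch of how that can be wired up today, assuming the row exposes an id column and a recent elasticsearch output is used (field and index names are made up):

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "foo"
    # use the database primary key column as the Elasticsearch document id
    document_id => "%{id}"
  }
}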

talevy commented 9 years ago

@morgango filter exists here: https://github.com/wiibaa/logstash-filter-jdbc

talevy commented 9 years ago

@nellicus maybe we can store the field values of the last processed row in the sql response? I suppose this would only be useful when the sql queries have explicit ordering of the response.

nellicus commented 9 years ago

@talevy sorry, is this re point 1), 2) or 3)? Which field are you referring to? The watermarks? If so, what happens when you shut it down? How does it know which ID to resume from?

morgango commented 9 years ago

@talevy, this is a tough thing to do in SQL for any engine, because not every record will have a unique id in a well-defined format (every vendor has their own way of doing it). Also, it is entirely possible to have duplicate records in a database.

A user could have a single ID field that we could store, but there is no guarantee that it is an incrementing integer or anything easy; it could even be a key (multiple fields). They could have a date or a datetime, but we could get a situation where records are added with a date that is prior to the last date, so we could miss records.

When I have built my own ETL processes with SQL databases, the best way I have seen to approach the problem is to:

  1. Have each run of the process use a specific date or datetime parameter, usually either the current date/time or yesterday's date/time.
  2. Allow a manual run where the date can be easily overridden.
  3. Delete the existing records from the database that were previously loaded.
  4. Load the new data.
  5. Increment the run date/time so the process can be run again.

The challenge is that this is a really "batch" way of doing things, and each of these steps assumes a lot of knowledge about both the source and target systems.

Personally, I think it would make the most sense for us to approach it in the following way:

The whole "sincedb" concept is really nice, and very advanced compared to what is typically done with batch ETL. If we can try to get close to this functionality SQL people will be happy.

rbastian commented 9 years ago

One thing that jumps out at me here is that to do a bulk load of table/view data into Elasticsearch, the contents of the entire query would be pulled in in one go. For a very large table of, say, 500M records, will logstash pull that data in by chunk size, and can it manage the heap space necessary to hold that much data?

talevy commented 9 years ago

@rbastian agreed. I plan on pushing out a paging version any minute now to alleviate this issue with really large queries.
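
As a sketch of what the paging knobs could look like once that lands (this assumes the eventual jdbc_paging_enabled / jdbc_page_size option names; the table name and page size are made up):

input {
  jdbc {
    # ... connection settings as above ...
    statement => "SELECT * FROM big_table"
    # fetch the result set in pages instead of pulling all rows in one go
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
  }
}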

talevy commented 9 years ago

@nellicus @morgango for the first release, I think I will keep the watermarking of latest fields out. What are your thoughts? Watermarking being the recording of (min, max) values of fields. I played around with the idea because I really like being able to incrementally fetch things based on new id values. This will require separate filtering on top of the original sql statement because these fields are not set before the first query.

nellicus commented 9 years ago

@talevy does that include timestamped events too like the below?

statement => "SELECT * FROM foo where created_at >= :sql_last_start"

just to be clear.. the way I see this for ID based would be

statement => "SELECT * FROM foo where last_id >= :sql_last_id"

just wondering, if ID-based is left out of the first release, is the plugin going to deal with tables where no timestamp is present? e.g. how to avoid duplicate events?

morgango commented 9 years ago

@talevy, I like that feature, I would make sure that you can link it not just to an ID but to a key (especially the primary key). Not all relational tables have a specific ID, it might be a combination of things.

nellicus commented 9 years ago

@morgango how do you establish sequentiality from a combination of fields?

The implementation of the jdbc connector in standard SIEMs follows an ID or a timestamp; a combination of fields can be used to avoid losing events, say if you have two events with the same timestamp.

talevy commented 9 years ago

Version 1.0 Released here!

bin/plugin install logstash-input-jdbc

usage example:

input {
   jdbc {
     jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
     jdbc_driver_class => "com.mysql.jdbc.Driver"
     jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
     jdbc_user => "mysql"
     parameters => { "favorite_artist" => "Beethoven" }
     schedule => "* * * * *"
     statement => "SELECT * from songs where artist = :favorite_artist"
   }
 }
rajivmca2004 commented 9 years ago

Does anyone know how we can avoid duplicate inserts from Oracle using the jdbc plugin to MongoDB or any other output plugin? Whenever we try to sync at a 1 min interval, it syncs all the data rather than a partial update.

talevy commented 9 years ago

@rajivmca2004

partial retrievals must be managed from within the sql statement, since there is no inherent ordering of records in all relational tables.

If your data is ordered with a timestamp field, you can try applying a filtering clause in your SQL statement using the :sql_last_start plugin field, which represents the last time your query was run.

rajivmca2004 commented 9 years ago

Thanks @talevy . It works.

ChandanPR commented 9 years ago

Is there a way we could do type mapping as in jdbc river?

Using logstash-input-jdbc and logstash-output-elasticsearch plugins I am populating the data from mysql database to elastic search.

In the select query I have a column, which is of data type "datetime".

When the data is populated into elastic search I see the data type as string instead of timestamp.

I did go through the documentation, but couldn't find information about how to set the timestamp column. (If I remember correctly, with the jdbc river this data was populated as timestamp.)

For example, a configuration like the one below: jdbcTimeField => 'TimeStamp' # The column in your result table that contains a timestamp

is there any alternative approach/configuration which I can use?

Regards, Chandan

rajivmca2004 commented 9 years ago

Hi Chandan,

You can use filters to change the data type to a date. I would suggest converting the datetime to long milliseconds.

https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
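
For example, a minimal sketch assuming the datetime column arrives as a string field named created_at in the usual MySQL format; the date filter parses it into a real timestamp and writes it back to the same field:

filter {
  date {
    # parse the MySQL DATETIME string into a timestamp
    match => ["created_at", "yyyy-MM-dd HH:mm:ss"]
    target => "created_at"
  }
}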

markbaumgarten commented 9 years ago

Like others in this thread I wanted a way to load data from tables using an auto-increment column, so I forked the logstash-input-jdbc plugin to my own repo here: https://github.com/markbaumgarten/logstash-input-jdbc. Not sure if this is the right way to go about contributing to the logstash-input-jdbc project, but I'm hoping this might be useful for someone other than me.

purbon commented 8 years ago

@markbaumgarten why not also creating a PR to add this feature here? this would be an awesome addition.

markbaumgarten commented 8 years ago

Well, I think my version of the logstash-input-jdbc plugin is a bit more specific: my plugin does not revisit/reindex each row upon execution in order to synchronize rows. It only "tails" a table, assumes that rows are never changed over time (only appended), and tries to handle this specific case in the most efficient way.

My plugin also assumes that the input is an SQL table and the output is an Elasticsearch index, and as such it is a combination of both input and output.

Perhaps renaming my plugin to "input-jdbc-sqltail-to-es" would make sense...?

purbon commented 8 years ago

@markbaumgarten renaming to logstash-input-sqltail makes sense to me.

pranmitt commented 8 years ago

Is there any way we can apply logic over that data (i.e. massage the SQL output) before indexing into elasticsearch?

tomrade commented 8 years ago

I guess you can do anything with the data that you normally can, via logstash plugins at the filter stage, for example:

Input: SQL data in
Filter: your parsing here
Output: index to elasticsearch
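
As a minimal sketch of that shape (the filter shown is just a placeholder; any filter plugin fits there, and the names are made up):

input {
  jdbc {
    # ... jdbc connection settings and statement ...
  }
}

filter {
  mutate {
    # example massaging step: rename a column before indexing
    rename => { "artist" => "artist_name" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "songs"
  }
}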

pranmitt commented 8 years ago

@tomrade filters are limited in functionality; what I meant was to execute some application code (similar to the execution of a callback function).

tomrade commented 8 years ago

If you cannot find a filter to do the manipulation you want, you could use the Ruby filter and put any ruby code in there.
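
A minimal sketch of that ruby filter approach (the field name and transformation are made up; on older Logstash versions the event is accessed hash-style as event['field'] instead of event.get/event.set):

filter {
  ruby {
    # arbitrary Ruby runs per event; here we uppercase a column pulled from the SQL row
    code => "event.set('artist', event.get('artist').to_s.upcase)"
  }
}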

adarshgit commented 7 years ago

@rajivmca2004: Did :sql_last_start work for you? When I tried to use it I got a syntax error. Could you please guide me on how to use that plugin?

apotek commented 6 years ago

Did :sql_last_start work for you? When I tried to use it I got a syntax error.

I also get a syntax error on this.

adarshgit commented 6 years ago

@apotek The actual usage is :sql_last_value, not :sql_last_start. You need to specify the tracking column details in the JDBC config file; please refer to the link below, which has an example of a final working JDBC connection. https://discuss.elastic.co/t/how-should-i-use-sql-last-value-in-logstash/64595/7

Let's say you want to do it using the timestamp column: then there is no need to mention the tracking column. Just make sure record_last_run => "true" is set, and the last execution time will be captured in last_run_metadata_path => "/opt/logstash/lastrun/.logstash_jdbc_last_run", which will be referred to in the next run when you specify :sql_last_value.
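
For the ID-based case, a minimal sketch assuming an auto-incrementing id column (option names as in the released plugin: use_column_value, tracking_column, record_last_run, last_run_metadata_path; the table and path here are made up):

input {
  jdbc {
    # ... connection settings ...
    statement => "SELECT * FROM foo WHERE id > :sql_last_value ORDER BY id"
    use_column_value => true
    tracking_column => "id"
    record_last_run => true
    last_run_metadata_path => "/opt/logstash/lastrun/.logstash_jdbc_last_run"
    schedule => "* * * * *"
  }
}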

apotek commented 6 years ago

@adarshgit Thank you for your response. After posting, I did dig around long enough to figure out that :sql_last_start, while frequently appearing in docs and discussions around the web, is not the correct parameter to use.

For anyone else who is struggling with how to use :sql_last_value against multiple unix-second epoch timestamps, here's what I ended up doing (probably mysql specific):

SELECT * FROM mytable
  WHERE created > FLOOR(UNIX_TIMESTAMP(STR_TO_DATE(:sql_last_value, '%Y-%m-%d %T.%f000 Z'))) 
  OR access > FLOOR(UNIX_TIMESTAMP(STR_TO_DATE(:sql_last_value, '%Y-%m-%d %T.%f000 Z')))
  OR login > FLOOR(UNIX_TIMESTAMP(STR_TO_DATE(:sql_last_value, '%Y-%m-%d %T.%f000 Z')));

This converts the logstash/jdbc native sql_last_value date (stored as YYYY-mm-dd HH:MM:SS.000000000 Z) into a unix seconds epoch that you can then compare against your unix timestamp columns in your database.

Hope that helps anyone else who was struggling with comparing multiple unix timestamp columns against one logstash/jdbc plugin native timestamp.