jprante / elasticsearch-jdbc

JDBC importer for Elasticsearch
Apache License 2.0

Incremental inserts #365

Open ayyagari opened 10 years ago

ayyagari commented 10 years ago

I would like to use the river in feeder mode (pushing changes to Elasticsearch) by observing inserts into a database table. Here is the relevant configuration section from my feeder script:

```json
"jdbc" : {
    ...
    "url" : "jdbc:hsqldb:hsql://localhost:9001/xdb",
    "sql" : [
        {
            "statement" : "select * from public.observed_table where timestamp > ?",
            "parameter" : [ "$river.state.last_active_begin" ]
        }
    ],
    ...
}
```

I see multiple issues with this approach:

  1. The select statement has to rely on a timestamp column, which may not always exist, or may not store timestamps in UTC. How do we handle that?
  2. If the feeder has to be restarted, what value of last_active_begin will it use?
  3. I wish it offered a different strategy for detecting inserts: for example, a custom strategy that tracks the last synced value of the table's id column (or a non-UTC timestamp) and stores that value in Elasticsearch, so that when we restart the feeder we know exactly where it should resume reading.
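Point 3 describes a high-water-mark strategy. Below is a minimal sketch of that idea in Python. It assumes a monotonically increasing integer `id` column and uses `sqlite3` as a stand-in database. `load_checkpoint`, `save_checkpoint`, and `sync_once` are hypothetical helpers, not part of the river. To keep the example self-contained, the checkpoint is stored in a local JSON file instead of in Elasticsearch:

```python
import json
import sqlite3
from pathlib import Path
from typing import List


def load_checkpoint(path: Path) -> int:
    """Return the last synced id, or 0 if no checkpoint has been written yet."""
    if path.exists():
        return int(json.loads(path.read_text())["last_id"])
    return 0


def save_checkpoint(path: Path, last_id: int) -> None:
    """Persist the high-water mark so a restarted feeder resumes from it."""
    path.write_text(json.dumps({"last_id": last_id}))


def sync_once(conn: sqlite3.Connection, checkpoint: Path, sink: List[dict]) -> int:
    """Copy rows with id greater than the checkpoint into `sink`; return how many."""
    last_id = load_checkpoint(checkpoint)
    rows = conn.execute(
        "SELECT id, payload FROM observed_table WHERE id > ? ORDER BY id",
        (last_id,),
    ).fetchall()
    for row_id, payload in rows:
        # Stand-in for indexing a document into Elasticsearch under _id = row_id.
        sink.append({"_id": row_id, "payload": payload})
    if rows:
        # Rows are ordered by id, so the last one is the new high-water mark.
        save_checkpoint(checkpoint, rows[-1][0])
    return len(rows)
```

Because the checkpoint is written after each batch, a restarted process picks up from the last saved id rather than from an in-memory value. One caveat of this design: if concurrent transactions commit ids out of order, a row with a lower id can become visible after the checkpoint has already moved past it.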

Any insights/suggestions are greatly appreciated!

-Madhav

ayyagari commented 10 years ago

FWIW, I ended up using a feeder script like the one below to sync the data. The key was figuring out what "_id" should be: one or more column values that together form a unique key. The "schedule" setting below runs the feeder every 30 seconds.

```json
{
    "elasticsearch" : { "cluster" : "elk-cluster", "host" : "elkhost.company.com" },
    "type" : "jdbc",
    "schedule" : "0/30 0-59 0-23 ? * *",
    "jdbc" : {
        "url" : "jdbc:hsqldb:hsql://localhost:9001/xdb",
        "driver" : "org.hsqldb.jdbc.JDBCDriver",
        "user" : "sa",
        "password" : "",
        "sql" : "select column1+column3 as \"_id\", column1, column2, column3, column4 from public.observed_table",
        "strategy" : "simple",
        "autocommit" : true,
        "index" : "myindex",
        "type" : "mytype",
        "versioning" : "true"
    }
}
```
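With this setup, the feeder re-reads the whole table every 30 seconds. Because each document's `_id` is derived from column values, a re-read row overwrites its existing document instead of adding a duplicate. The Python sketch below illustrates that idea, using a plain dict as a stand-in for the Elasticsearch index. `composite_id` and `sync_run` are hypothetical helpers, and the `|` separator is an assumption:

```python
from typing import Any, Dict, Iterable, Tuple


def composite_id(*values: Any, sep: str = "|") -> str:
    """Join several column values into one document _id (hypothetical helper)."""
    return sep.join(str(v) for v in values)


def sync_run(index: Dict[str, dict], rows: Iterable[Tuple[Any, Any, Any, Any]]) -> None:
    """One scheduled run: re-read every row and write it under its derived _id."""
    for column1, column2, column3, column4 in rows:
        doc_id = composite_id(column1, column3)
        # Assigning to an existing key replaces the document, so re-reading
        # the whole table on every run does not create duplicates.
        index[doc_id] = {
            "column1": column1,
            "column2": column2,
            "column3": column3,
            "column4": column4,
        }
```

Calling `sync_run` twice over the same rows leaves the index size unchanged. The way the key is built also matters. If `column1` and `column3` are numeric, `column1+column3` in SQL is arithmetic addition, so the pairs (1, 3) and (2, 2) would both produce `_id` 4. The delimited keys `1|3` and `2|2` stay distinct, provided the separator cannot occur in the values.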

BigBrain-Industries commented 9 years ago

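You could have done something like this. It selects every row on the first run (counter = 1) and, on later runs, only rows newer than last_active_begin:

```json
{
    "statement" : "select * from public.observed_table where (? = 1) or (? > 1 and timestamp > ?)",
    "parameter" : [ "$river.state.counter", "$river.state.counter", "$river.state.last_active_begin" ]
}
```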
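The small Python sketch below shows how the `(? = 1) or (? > 1 and ...)` predicate behaves by running the same statement against an in-memory SQLite table. It assumes, as the suggestion does, that `$river.state.counter` is 1 on the first run. `run_feeder_query` is a hypothetical helper that binds the three placeholders in the order the `parameter` list would:

```python
import sqlite3
from typing import Any, List, Tuple

# Same predicate as the suggested statement. "timestamp" is quoted so SQLite
# treats it as a column name, and the "public." schema prefix is dropped.
STATEMENT = (
    "select * from observed_table "
    'where (? = 1) or (? > 1 and "timestamp" > ?)'
)


def run_feeder_query(
    conn: sqlite3.Connection, counter: int, last_active_begin: str
) -> List[Tuple[Any, ...]]:
    """Bind parameters in "parameter" list order: counter, counter, last_active_begin."""
    return conn.execute(STATEMENT, (counter, counter, last_active_begin)).fetchall()
```

With timestamps stored as ISO-8601 text (for example `2015-01-01 10:00:00`), string comparison matches chronological order. As a result, `run_feeder_query(conn, 1, ...)` returns every row regardless of the timestamp, while `run_feeder_query(conn, 2, "2015-01-01 11:00:00")` returns only rows stamped after 11:00.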