jprante / elasticsearch-jdbc

JDBC importer for Elasticsearch
Apache License 2.0

river.state.last.active_begin not working as expected #494

Open soacom opened 9 years ago

soacom commented 9 years ago

I'm trying to build an incremental index by "tailing" a table, but what seems to be happening is that the river is using the current time for the river.state.last.active_begin variable, rather than the time the river last ran. I have the following config:

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://192.168.59.3:3306/xxx",
        "user" : "xxx",
        "password" : "xxx",
        "sql" : [
            {
                "statement" : "select * from XXX where REQUESTDTS > ?",
                "parameter" : [ "$river.state.last_active_begin" ]
            }
        ],
        "index" : "xxx_index",
        "type" : "xxx_type",
        "schedule" : "0 0-59 0-23 ? * *"
    }
}

But viewing my mysql-query.log I can see that the ? is being replaced by the current timestamp, not the time the river last ran. This of course means that it's looking for data that hasn't been added yet, and isn't seeing any data that was added in the last interval.

Am I configuring this wrong?

sandesh2014 commented 9 years ago

I'm facing the same issue: river.state.last.active_begin is not working as expected.

sandesh2014 commented 9 years ago

Team, can someone help out with this?

sandesh2014 commented 9 years ago

@jprante could you help us out with this issue? The scheduled rivers are not working, so no new data is getting into Elasticsearch. Please let us know if there is an alternative that works around this. Your early help would be highly appreciated.

soacom commented 9 years ago

The schedules are working, i.e. the river runs on the scheduled intervals. The problem is that it doesn't pull new data. The other issue I had is that I was hoping that river.state.last.active_begin would be zero on first execution of a new river, so that it would get all data up to that point, and then start importing incremental data. It seems to always be the current timestamp, which obviously will never work.

sandesh2014 commented 9 years ago

Yes, the schedules are working. The problem is only with river.state.last.active_begin: the query should return the rows newer than the time the river last began, but that is not what happens. Hence new data is not being synced to Elasticsearch.

Hoping someone from the team can provide a solution or an alternative for this.

jprante commented 9 years ago

The river.state variable issue will be fixed in the next few days. This is a spare-time project, so please understand that the time I can spend on issues is very limited. Patches, bugfixes, etc. are warmly welcome.

soacom commented 9 years ago

I think the issue is that in src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java at line 193 the code:

    riverContext.setRiverState(riverState.setCounter(counter)
                .setLastActive(currentTime != null ? currentTime : new DateTime(0), null)
                .setCurrentActive(new DateTime()));

is always going to set the lastActive to the currentTime in the beforeFetch. It seems like this should be in the afterFetch function, and that beforeFetch should maybe be using riverContext.getRiverState().getLastActiveBegin() instead of riverContext.getRiverState().getCurrentActiveBegin(). I don't see currentTime used anywhere else in the beforeFetch function, so you can probably just replace this, but I may be missing something here. I am not confident enough in my understanding of the way the river works to try and make the changes myself, sorry about that.

Maybe try replacing this:

    DateTime currentTime = riverContext.getRiverState().getCurrentActiveBegin();
    riverContext.setRiverState(riverState.setCounter(counter)
            .setLastActive(currentTime != null ? currentTime : new DateTime(0), null)
            .setCurrentActive(new DateTime()));

with this:

    DateTime lastActiveBeginTime = riverContext.getRiverState().getLastActiveBegin();
    riverContext.setRiverState(riverState.setCounter(counter)
            .setLastActive(lastActiveBeginTime != null ? lastActiveBeginTime : new DateTime(0), null)
            .setCurrentActive(new DateTime()));

jprante commented 9 years ago

This is new code from a contribution that has not been released yet: https://github.com/jprante/elasticsearch-river-jdbc/commit/9a2fdba56db361d0ae29ce56cfdfdb9d8abc30dc. I hope it fixes some issues, but I'm not sure it works in all cases. More eyeballs on the code are welcome too :)

soacom commented 9 years ago

I don't think the code in that contribution will work. It looks like it will always set the lastActive to the current time in the beforeFetch.

soacom commented 9 years ago

I tested this out locally. With the changes I suggested above it seems to work as expected. One thing to note is that each time the river starts it goes back to the beginning of time and fetches everything in the table, because the riverState lastActiveBegin does not appear to be persisted anywhere. This could be a problem for users with lots of older data.

StevenLeRoux commented 9 years ago

@soacom I think currentTime is being misunderstood here. currentTime is the time taken from the last currentActiveBegin state, not the current time. We could rename it to be more explicit (s/currentTime/currentActive/, for example).

See #489, which is working for me.

CurrentActiveBegin is an intermediate state just made to initialize the LastActiveBegin that is set in the afterFetch too (I don't get why btw).

So basically we have:

1st run: currentActiveBegin = null => last_active_begin is set to 0, which implies a first dump of the table; currentActiveBegin is then set to new DateTime().

Next run: currentActiveBegin is not null => last_active_begin is set to the previous currentActiveBegin; currentActiveBegin is then set to new DateTime().

etc...
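
The run-by-run behaviour described above can be sketched roughly as follows. This is only an illustration, not the project's code: RiverStateSketch and its beforeFetch method are hypothetical names, and java.time.Instant stands in for the Joda DateTime used in the river so that the example runs without extra dependencies.

```java
import java.time.Instant;

/** Hypothetical stand-in for the river state; NOT the project's RiverState class. */
public class RiverStateSketch {
    private Instant currentActiveBegin; // stays null until the first run
    private Instant lastActiveBegin;

    /**
     * Rotates the state as described in the comment above and returns the value
     * that would be bound to the "?" parameter for this run.
     */
    public Instant beforeFetch(Instant now) {
        // First run: nothing recorded yet, so fall back to the epoch (a full dump).
        lastActiveBegin = (currentActiveBegin != null) ? currentActiveBegin : Instant.EPOCH;
        // Remember when this run began so the next run can start from here.
        currentActiveBegin = now;
        return lastActiveBegin;
    }

    public static void main(String[] args) {
        RiverStateSketch state = new RiverStateSketch();
        Instant firstRun = Instant.parse("2015-03-01T10:00:00Z");
        Instant secondRun = Instant.parse("2015-03-01T10:01:00Z");
        System.out.println(state.beforeFetch(firstRun));  // 1970-01-01T00:00:00Z
        System.out.println(state.beforeFetch(secondRun)); // 2015-03-01T10:00:00Z
    }
}
```

In this sketch the state lives only in memory, so a restart begins again from the epoch, which matches the persistence concern raised earlier in the thread.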

ad5248 commented 8 years ago

@jprante we can use this

{
    "type": "jdbc",
    "jdbc": {
        "driver": "com.mysql.jdbc.Driver",
        "url": "jdbc:mysql://localhost:3306/myspringjdbcdb",
        "user": "root",
        "password": "root",
        "sql": [
            {
                "statement": "SELECT id AS _id ,comments.* FROM comments where mytimestamp >= DATE_SUB(?, INTERVAL 5 SECOND)",
                "parameter": [ "$river.state.last_active_begin" ]
            }
        ],
        "index": "profile",
        "type": "comments",
        "bulk_size": 100,
        "max_bulk_requests": 30,
        "bulk_timeout": "10s",
        "flush_interval": "5s",
        "schedule": "0/5 0-59 0-23 ? * *"
    }
}

"statement": "SELECT id AS _id ,comments.* FROM comments where mytimestamp >= DATE_SUB(?, INTERVAL 5 SECOND)",

The 5 in INTERVAL 5 SECOND is the schedule interval, so it has to match "schedule": "0/5 0-59 0-23 ? * *" (a run every 5 seconds).
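
For anyone who wants to see the arithmetic behind this workaround, here is a minimal sketch, assuming the bound parameter is the time the current run began (the behaviour reported at the top of this issue). OverlapWindowSketch and lowerBound are hypothetical names, and java.time is used in place of SQL so the example is self-contained.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration of DATE_SUB(?, INTERVAL n SECOND); not part of the importer. */
public class OverlapWindowSketch {

    /** Shifts the bound timestamp back by one schedule interval. */
    public static Instant lowerBound(Instant boundParameter, Duration scheduleInterval) {
        return boundParameter.minus(scheduleInterval);
    }

    public static void main(String[] args) {
        // Assumption: "?" was replaced by the start time of the current run.
        Instant boundParameter = Instant.parse("2015-03-01T10:00:05Z");
        // Must match the cron schedule, here one run every 5 seconds.
        Duration scheduleInterval = Duration.ofSeconds(5);
        System.out.println(lowerBound(boundParameter, scheduleInterval)); // 2015-03-01T10:00:00Z
    }
}
```

Because the comparison is >=, a row stamped exactly on the boundary can be fetched by two consecutive runs. Selecting id AS _id should make that harmless, since a repeated row is indexed under the same document id rather than as a new document.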