prashanttct07 opened this issue 9 years ago
what about the "How to select incremental data from a table?" section? Is that any good?
Hi Skundrik,
Sorry, I didn't get you. Can you explain in more detail?
Prashant, if you have a look at the readme: https://github.com/jprante/elasticsearch-jdbc#how-to-select-incremental-data-from-a-table you will find an example.
Hi Micheee.
If I go to this line:

```json
{ "statement" : "select * from \"products\" where \"mytimestamp\" > ?", "parameter" : [ "$metrics.lastexecutionstart" ] }
```

then mytimestamp should exist as a field in the table, holding the timestamp of when each record was inserted.
But here I am reading the data from a view that has fields like id, tag, description, and name, but no timestamp, and none of the columns has unique values.
So what could be the way forward in this case?
And along with this, I need to run it with the scheduler, right?
Yes, I think you would need the scheduler. You might also update your data ("How to update a table?") and add specific columns. I am also new to this project, so I am just adding my five cents here; there might be better solutions.
Actually I am using a view to index data, and a view has no mechanism for a field that can be made auto-increment (per various forums). So I am not able to add a unique incremental field to the view.
Hi Prashant. Sorry I wasn't clearer, but micheee already pointed you in the right direction. For most databases (excluding kdb+ and the like), the only way is to poll the data using a query that returns only what changed since the last time you polled. So you have to come up with a query that allows you to do just that. Usually the easiest approach is to have a "last modified" column with a timestamp. You can also have an auto-generated modification-id value that increases monotonically and whose latest value is stored in a record's column every time a change is made.
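For reference, the "last modified" column described above could be added like this. This is a hypothetical sketch in MySQL syntax (the config posted later in this thread uses MySQL); the table and column names are illustrative, and it only works on a base table, not on a view:

```sql
-- Hypothetical example: a column the importer can poll on.
-- DEFAULT sets the timestamp on insert; ON UPDATE refreshes it on every change.
ALTER TABLE products
  ADD COLUMN last_modified TIMESTAMP
  DEFAULT CURRENT_TIMESTAMP
  ON UPDATE CURRENT_TIMESTAMP;
```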
Actually I am using a view, and since the view uses joins, no field with a unique value is available. That's where we are stuck, and we were hoping the plugin has a built-in mechanism to keep track of what has been read and what has not.
Hi, while using the scheduler I am getting this exception:

```
[21:03:00,814][ERROR][importer.jdbc.context.standard][pool-3-thread-1] java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).
java.io.IOException: java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).
    at org.xbib.elasticsearch.jdbc.strategy.standard.StandardSource.fetch(StandardSource.java:617) ~[elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.elasticsearch.jdbc.strategy.standard.StandardContext.fetch(StandardContext.java:215) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.elasticsearch.jdbc.strategy.standard.StandardContext.execute(StandardContext.java:190) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.tools.JDBCImporter.process(JDBCImporter.java:118) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.tools.Importer.newRequest(Importer.java:241) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.tools.Importer.newRequest(Importer.java:57) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.pipeline.AbstractPipeline.call(AbstractPipeline.java:86) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at org.xbib.pipeline.AbstractPipeline.call(AbstractPipeline.java:17) [elasticsearch-jdbc-1.7.2.0-uberjar.jar:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_79]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_79]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_79]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
```
And my conf is:

```shell
echo '{
  "type" : "jdbc",
  "jdbc" : {
    "url" : "jdbc:mysql://192.168.0.112:3306/esdb",
    "user" : "uname",
    "password" : "pwd",
    "sql" : {
      "statement" : "select id, description, tagname, traveldistance, latitude as \"location.lat\", longitude as \"location.lon\" from service;",
      "parameter" : [ "$metrics.lastexecutionstart" ]
    },
    "schedule" : "0 0-59 0-23 ? * ",
    "statefile" : "statefile.json",
    "elasticsearch" : {
      "cluster" : "elasticsearch",
      "host" : "192.168.0.110",
      "port" : 9300
    },
    "index" : "myjdbc",
    "type" : "mytype",
    "index_settings" : {
      "index" : { "number_ofshards" : 5 }
    },
    "metrics" : { "enabled" : true }
  }
}' | java \
  -cp "/home/es/Prashant/elasticsearch-1.7.2/plugins/elasticsearch-jdbc-1.7.2.0/lib/" \
  -Dlog4j.configurationFile=/home/es/Prashant/elasticsearch-1.7.2/plugins/elasticsearch-jdbc-1.7.2.0/bin/log4j2.xml \
  org.xbib.tools.Runner \
  org.xbib.tools.JDBCImporter
```
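Note on the exception above: "Parameter index out of range (1 > number of parameters, which is 0)" means the importer is trying to bind one parameter, but the statement contains no `?` placeholder to bind it to. A sketch of a corrected `sql` block is below; the `last_modified` column is hypothetical (the `service` table as described has no timestamp column), so if no such column is available, the `parameter` array should be removed instead:

```json
"sql" : [
  {
    "statement" : "select id, description, tagname, traveldistance, latitude as \"location.lat\", longitude as \"location.lon\" from service where last_modified > ?",
    "parameter" : [ "$metrics.lastexecutionstart" ]
  }
]
```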
But how would you propose the plugin keep track of what was and wasn't processed? Even the plugin would have to look at some ID or timestamp to know it had already seen a row, so that row could be skipped.
Actually I also use flume-ng-sql (for SQL-Flume-Spark-Elasticsearch), and that plugin has a feature where it monitors the number of rows in the DB, checks each time which row number was read last, and on the next run fetches from the next row onward.
So is there something like that here as well?
Looking at flume-ng-sql-source, it does indeed have functionality that lets it track how many rows have been processed so far, using Hibernate's setFirstResult(rowNumber) method. This of course only works on tables that are append-only and never modified or deleted.
Elasticsearch JDBC doesn't seem to support this directly in the same sense. However, there is a parameter called $metrics.totalrows that keeps track of the number of rows retrieved over multiple invocations. If your SQL dialect allows you to specify the starting row, you might be able to use this.
Incremental changes can only be synchronized on timestamps, as in this script (written for Oracle, but the approach is also valid for other DBs): https://github.com/jprante/elasticsearch-jdbc/wiki/Oracle-schedule-example-script
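A condensed sketch of such a timestamp-driven, scheduled importer definition might look like the following. All names (table, column, hosts, index) are illustrative, and the Quartz schedule shown runs every 5 minutes; see the Oracle wiki script linked above for a complete, tested version:

```json
{
  "type" : "jdbc",
  "jdbc" : {
    "url" : "jdbc:mysql://localhost:3306/esdb",
    "user" : "uname",
    "password" : "pwd",
    "sql" : [
      {
        "statement" : "select * from \"products\" where \"mytimestamp\" > ?",
        "parameter" : [ "$metrics.lastexecutionstart" ]
      }
    ],
    "schedule" : "0 0/5 * * * ?",
    "statefile" : "statefile.json",
    "index" : "myjdbc"
  }
}
```

The statefile is what lets the importer carry the last execution time across restarts, so the `?` placeholder only ever sees rows newer than the previous run.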
$metrics.totalrows is just a steady counter to notify about activity, it is not designed to detect any changes.
I don't know how to track changes in a DB table / SQL result set via JDBC. There is no such support in JDBC, and DBs do not expose this via an API either. Most applications I know of do this with custom extensions or add-ons like triggers, which are neither efficient nor portable, nor part of JDBC.
Also note that incremental changes produce different kinds of deltas, which do not always just append new data as in tail -f. (In that very special case, streaming-replication support for following statements would be a solution, but that is proprietary to many DBs, if available at all.)
In an old version of the JDBC river I tried to checksum result-set fetches myself to detect differences, but that does not reveal whether a result set has grown, shrunk, or merely had existing values modified. So it was impossible to execute the appropriate indexing/updating/deleting actions in ES. A naive approach would have to store the state of all doc IDs of a run, and that can be millions and millions.
I have a very similar issue, but with a DB table containing application logs. In this case I am collecting data series, and at polling time not all logs may be in the table yet, so when I use metrics.lastexecution* I might lose some data on the next run. What about the possibility of the JDBC importer saving the timestamp of the last row fetched (the actual value) and using it as the parameter for the next run? I have opened it here: https://github.com/logstash-plugins/logstash-input-jdbc/issues/46
JDBC importer saves the last row fetched by default. It is available as rows.<columnname>.
metrics.lastexecution* is only for metrics; it is not a value that is present in the JDBC source.
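If the last fetched row is exposed that way, a binding might look like the following. This is an untested sketch (the table and column names are hypothetical), and note that a commenter further down in this thread reports that this binding did not work for them:

```json
{
  "statement" : "select * from \"logs\" where \"timestamp\" > ?",
  "parameter" : [ "$rows.timestamp" ]
}
```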
OK, so if I have a timestamp field in my query, the last row is saved, and during the next run I can use its value as a parameter in the where clause? If so, what about formats (timestamp? date?)? Is there any documentation on this? I haven't found any. Thanks; if this is possible I will be very happy 😊
I was about to try it (incremental fetch of data based on actual values within the result of the query), and I just want to make sure: if I have a parameter in my SQL query of the form "where timestamp > ?", and I want the parameter to be the timestamp value of the last row fetched in the last run, do I use it like this?
"parameter" : [ "$rows.timestamp" ]
Am I right? Thanks.
"parameter" : [ "$rows.timestamp" ] not working :-( any suggestions please?
Please help me :-) How do I put an SQL parameter into the where clause with the latest field value from the last run? Thank you.
> How to put an SQL parameter into the where clause with the latest field value from the last run?

Same question.
Hi Team,
I have a question about how we can fetch and index data from MS SQL to ES using this JDBC importer plugin and create the batch file.
I ran this bat file when the DB had 100 records. Now, after 5 minutes, there are 20 more entries in the DB, so how do I add/update the records in the existing index?
Thanks, Rajesh
Is it possible that every time a scheduled run of the JDBC importer takes place, the index in ES is refreshed or flushed first and then re-filled with the entire dataset from SQL?
Please help me :-) How do I put an SQL parameter into the where clause with the latest field value from the last run?
Can anyone please help me with how to add/update an existing index in ES from a SQL Server DB? Urgent.
Hi Team,
I have a question about how we can fetch and index data from MySQL to ES using this plugin in the scenario below.
I ran this plugin when the DB had 100 records. Now, after 5 minutes, there are 20 more entries in the DB. How can I run the plugin so that it fetches only the 20 newly added records and indexes them to ES?
I was going through the scheduler, but that seems to re-index the complete table, not just the newly added rows.
So let me know if there is a provision for this in the plugin, as I was not able to find it in the GitHub docs.