jprante / elasticsearch-jdbc

JDBC importer for Elasticsearch
Apache License 2.0
2.84k stars 709 forks source link

$metrics.lastexecutionstart is null #790

Open jackieit opened 8 years ago

jackieit commented 8 years ago

importer version 2.2.0.1 and elasticSearch Version 2.2.1 Hi, I used this json file

{

"type" : "JDBC",
"jdbc" : {
    "strategy" : "standard",
    "locale" : "zh_CN",
    "timezone": "Asia/Shanghai",
    "url" : "jdbc:mysql://127.0.0.1:3306/pinpai361",
    "user" : "root",
    "password" : "",
    "sql" : [
        {
            "statement":"select  *,item_id as _id, from \"pp_goods\" where \"updated_at\" > ? ",
             "parameter" : ["$metrics.lastexecutionstart"]
        }
    ],
    "index" : "goods",
    "type" : "wfhao",
    "elasticsearch":
    {
            "cluster":"wfhao-app",
            "host":"127.0.0.1",
            "port":9300
    }

}

}

but when i see the log it shows com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'from "pp_goods" where "updated_at" > null' at line 1 java.io.IOException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'from "pp_goods" where "updated_at" > null' at line 1 at org.xbib.elasticsearch.jdbc.strategy.standard.StandardSource.fetch(StandardSource.java:631) ~[elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.elasticsearch.jdbc.strategy.standard.StandardContext.fetch(StandardContext.java:188) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.elasticsearch.jdbc.strategy.standard.StandardContext.execute(StandardContext.java:163) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.tools.JDBCImporter.process(JDBCImporter.java:179) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.tools.JDBCImporter.newRequest(JDBCImporter.java:165) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.tools.JDBCImporter.newRequest(JDBCImporter.java:51) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.pipeline.AbstractPipeline.call(AbstractPipeline.java:50) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at org.xbib.pipeline.AbstractPipeline.call(AbstractPipeline.java:16) [elasticsearch-jdbc-2.2.0.0-uberjar.jar:?] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_77] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_77] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_77] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

I know I get the $metrics.lastexecutionstart null value

dengshilong commented 8 years ago

You must have the statefile.json like this "statefile" : "statefile.json".

#!/bin/sh

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/blog",
        "statefile" : "statefile.json",
        "user" : "blog",
        "password" : "12345678",
        "sql" : [{
                "statement": "select id as _id, post_title as title, post_content as content from wp_posts where post_status = ? and post_modified > ? ",
                "parameter": ["publish", "$metrics.lastexecutionstart"]}
            ],
        "index" : "customer",
        "type" : "blog",
        "metrics": {
            "enabled" : true
        },
        "elasticsearch" : {
             "cluster" : "elasticsearch",
             "host" : "localhost",
             "port" : 9300 
        }   
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

The first time you run the script, it will generate the statefile.json file, like this

{
  "type" : "jdbc",
  "jdbc" : { 
    "password" : "12345678",
    "elasticsearch" : { 
      "host" : "localhost",
      "cluster" : "elasticsearch",
      "port" : "9300"
    },  
    "index" : "customer",
    "statefile" : "statefile.json",
    "metrics" : { 
      "lastexecutionstart" : "2016-03-27T06:20:37.775Z",
      "lastexecutionend" : "2016-03-27T06:20:38.142Z",
      "counter" : "2",
      "enabled" : "true"
    },  
    "type" : "blog",
    "user" : "blog",
    "url" : "jdbc:mysql://localhost:3306/blog",
    "sql" : [ { 
      "statement" : "select id as _id, post_title as title, post_content as content from wp_posts where post_status = ? and post_modified > ? ",
      "parameter" : [ "publish", "$metrics.lastexecutionstart" ]
    } ] 
  }
}

Later, the script can read $metrics.lastexecutionstart from statefile.json.

jackieit commented 8 years ago

Thanks ,I have a try.

dengshilong commented 8 years ago

$metrics.lastexecutionend must change to $metrics.lastexecutionstart to avoid lost data.

chsnake commented 8 years ago

Did you realize of implementing incremental index? what the type of "updated_at"?

dengshilong commented 8 years ago
Antoniossss commented 8 years ago

This is already addressed by 0f6b18069df124d0e0e8a859b24f7d3c989870f1 and this branch has been merged into master.