jprante / elasticsearch-jdbc

JDBC importer for Elasticsearch
Apache License 2.0

data loss when elasticsearch data node can't be reached in time #743

Open msimons opened 8 years ago

msimons commented 8 years ago

We are currently using the feeder 1.7 implementation, and we are suffering data loss when the feeder is connected to a master node and no data node is reachable. If the data node can't be reached within the given timeout, the feeder implementation simply moves on to the next bulk request.

In the example below, nothing is guaranteed:

{
    "type" : "jdbc",
    "name" : "[UPDATE] [$ES_INDEX_TOC] Test feeder",
    "jdbc" : {
        "elasticsearch.cluster" : "$ES_CLUSTERNAME",
        "elasticsearch.host" : "$ES_HOST:$ES_PORT",
        "strategy" : "simple",
        "interval" : "5s",
        "max_bulk_actions" : 500,
        "autocommit" : true,
        "max_concurrent_bulk_requests" : 1,
        "threadpoolsize" : 1,
        "url" : "$JDBC_URL",
        "user" : "$JDBC_USER",
        "password" : "$JDBC_PASSWORD",
        "sql" :  [
            {
                "statement" : "select * from \"test_feeder\" ORDER BY \"_job\"",
            },
            {
                 "statement" : "delete from \"test_feeder\" where \"_job\" = ?",
            }
         ]
    }
}
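
To make the failure mode concrete, roughly the following happens on each cycle with this configuration (an assumed flow; parameter binding details elided):

-- rows are read and queued into bulk index requests
select * from "test_feeder" ORDER BY "_job";

-- executed afterwards in any case, even if the bulk request
-- never reached a data node before the timeout
delete from "test_feeder" where "_job" = ?;

The rows are gone from the database as soon as the delete runs, whether or not they were ever indexed.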

Earlier implementations of the feeder/river-jdbc had support for data acknowledgement to handle this case. I recently built support for data acknowledgement into my own fork of 1.7 in this way:

{
    "type" : "jdbc",
    "name" : "[UPDATE] [$ES_INDEX_TOC] Test feeder",
    "jdbc" : {
        "elasticsearch.cluster" : "$ES_CLUSTERNAME",
        "elasticsearch.host" : "$ES_HOST:$ES_PORT",
        "strategy" : "simple",
        "interval" : "5s",
        "max_bulk_actions" : 500,
        "autocommit" : true,
        "max_concurrent_bulk_requests" : 1,
        "threadpoolsize" : 1,
        "url" : "$JDBC_URL",
        "user" : "$JDBC_USER",
        "password" : "$JDBC_PASSWORD",
        "sql" :  [
            {
                "statement" : "select * from \"test_feeder\" ORDER BY \"_job\"",
                "acknowledge" : true,
                "acknowledge-full-sql" :
                {
                    "statement" : "delete from \"test_feeder\" where \"_job\" >= ? and \"_job\" <= ?",
                    "parameter" : ["$job_min","$job_max"]
                },
                "acknowledge-single-sql" :
                {
                    "statement" : "delete from \"test_feeder\" where \"_job\" = ?",
                    "parameter" : ["$job"]
                }
            }
         ]
    }
}

The 'acknowledge-full-sql' delete statement is used when all documents within the select succeed.

The 'acknowledge-single-sql' delete statement is used for each succeeded document when some documents fail. The failed documents will be caught by the following select. This implementation isn't perfect yet, but it catches enough for now.
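
For illustration, with hypothetical job values 1000 to 1499 in one batch, the two acknowledge modes would issue statements like these (values are made up):

-- all documents of the batch were indexed: one ranged delete
delete from "test_feeder" where "_job" >= 1000 and "_job" <= 1499;

-- some documents failed: one delete per succeeded document;
-- failed rows remain and are picked up by the next select
delete from "test_feeder" where "_job" = 1000;
delete from "test_feeder" where "_job" = 1002;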

I quickly scanned the latest implementation of the feeder and I didn't notice any support for data acknowledgement. The ingest implementation has also undergone some heavy changes, so I don't think I can reuse my current implementation.

We can't use the feeder implementation without a guaranteed-delivery mechanism.

jprante commented 8 years ago

Thanks for pointing out deficiencies of the JDBC importer.

Yes, acknowledge: true was removed a long time ago. It was more a hack than a clean design. The idea was OK, but the implementation was far from good.

I plan to add event notifications, so SQL statements can be executed on success/error/exception in a clean way. SQL statements could hook into certain events such as "before", "after", "on_success", "on_error", and "on_failure", so it would be possible to record the state of ES indexing on the DB side. Alternatively, it will be possible to execute log4j2 statements on events.
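
A hypothetical sketch of how such hooks might look in the source definition (none of these keys exist yet; the names and statements are illustrative only):

{
    "type" : "jdbc",
    "jdbc" : {
        "sql" : [
            {
                "statement" : "select * from \"test_feeder\" ORDER BY \"_job\"",
                "on_success" : "delete from \"test_feeder\" where \"_job\" = ?",
                "on_error" : "update \"test_feeder\" set \"state\" = 'error' where \"_job\" = ?",
                "on_failure" : "update \"test_feeder\" set \"state\" = 'failed' where \"_job\" = ?"
            }
        ]
    }
}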

A more radical plan would be to use Groovy scripts for importing data and event notifications, including JDBC. But that would be a drastic change to the JDBC importer, and possibly even a new project. These scripts could implement any logic to handle data or events, beyond getting them indexed into ES.

Another point is failover, or throttling. I will add a line to the code so that the JDBC importer waits for yellow cluster health before fetching, in the hope that this is sufficient.
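
In the Elasticsearch 1.x/2.x Java client this could look roughly like the following (a minimal sketch, not the importer's actual code; the class name, method name, and 30s timeout are assumptions):

import java.io.IOException;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public final class HealthGate {

    // Sketch: block until the cluster is at least yellow before fetching,
    // so bulk requests are not built while no data node is reachable.
    static void waitForYellow(Client client) throws IOException {
        ClusterHealthResponse health = client.admin().cluster()
                .prepareHealth()
                .setWaitForYellowStatus()
                .setTimeout(TimeValue.timeValueSeconds(30)) // assumed timeout
                .execute().actionGet();
        if (health.isTimedOut()) {
            throw new IOException("cluster not healthy, skipping fetch");
        }
    }
}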

msimons commented 8 years ago

Jorg, the plan to add event notifications sounds good. In which timeframe can you implement it? If you need some assistance with the implementation: I'm here to help! :-)