jprante / elasticsearch-knapsack

Knapsack plugin is an import/export tool for Elasticsearch
Apache License 2.0

Import: bulk error, NoNodeAvailableException and missing content #49

Open rtkjbillo opened 10 years ago

rtkjbillo commented 10 years ago

Using Elasticsearch 1.1, we are seeing a problem with the import mechanism: the plugin throws stack traces and does not successfully insert all of the data from the exported .tar.gz file.

At the end of the bulk import process, Elasticsearch throws NoNodeAvailableExceptions, one for each thread configured by the maxBulkConcurrency setting. The number of records missing from the destination Elasticsearch instance appears to stay below (maxBulkConcurrency * maxActionsPerBulkRequest), so on a 4-threaded configuration with 1000 actions per bulk request we could see up to 4000 missing entries.

Stacktrace:

[2014-06-12 20:51:47,637][INFO ][BulkTransportClient      ] closing bulk processor...
[2014-06-12 20:51:47,685][INFO ][BulkTransportClient      ] shutting down...
[2014-06-12 20:51:47,722][ERROR][BulkTransportClient      ] bulk [5626] error
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
    at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:263)
    at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:89)
    at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:316)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
[2014-06-12 20:51:47,722][ERROR][BulkTransportClient      ] bulk [5628] error
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
    at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:263)
    at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:89)
    at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:316)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
[2014-06-12 20:51:47,732][ERROR][BulkTransportClient      ] bulk [5625] error
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
    at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:263)
    at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:89)
    at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:316)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
[2014-06-12 20:51:47,746][ERROR][BulkTransportClient      ] bulk [5627] error
org.elasticsearch.client.transport.NoNodeAvailableException: No node available
    at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:263)
    at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:89)
    at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:316)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
[2014-06-12 20:51:47,747][INFO ][BulkTransportClient      ] shutting down completed

Tracking this down through the source for elasticsearch-knapsack and elasticsearch-support, I believe we are running into a "failure to flush" condition when closing out the import: not all records are written out before each thread is terminated.

In src/main/java/org/xbib/elasticsearch/action/RestImportAction.java, after the logger.info("end of import: {}", status); call, the plugin calls bulkClient.shutdown(). This in turn calls BulkTransportClient's super.shutdown() method, which runs the following code in BaseTransportClient:

    public synchronized void shutdown() {
        if (client != null) {
            // closes the transport client and its thread pool right away;
            // nothing here waits for bulk requests that are still in flight
            client.close();
            client.threadPool().shutdown();
            client = null;
        }
        addresses.clear();
    }

At no point does the plugin or the support library appear to wait for pending bulk requests. My suggestion would be either to add a line in RestImportAction.java that calls bulkClient.flush() prior to the shutdown call, or to update the elasticsearch-support plugin so that shutdown waits for pending actions to complete. A minimal sketch of the first option is shown below.
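
For illustration only (not a tested patch), the flush-before-shutdown ordering could look roughly like this in RestImportAction.java, reusing the bulkClient and status names from above:

    // sketch of the suggested ordering at the end of the import
    logger.info("end of import: {}", status);

    // push any partially filled bulk request to the cluster
    // before the client is torn down
    bulkClient.flush();

    // only then close the transport client and its thread pool
    bulkClient.shutdown();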

jprante commented 10 years ago

Thanks for the report; your analysis is correct.

I bumped the elasticsearch-support version to 1.2.1.0, which comes with an extra waitForResponses() method to prevent premature closing of the BulkProcessor.
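
As a rough sketch of how the import path might use the new method before shutting down (the timeout argument and exact signature of waitForResponses() are assumptions here, not taken from the release):

    // sketch only: flush, wait for outstanding bulk responses, then shut down
    bulkClient.flush();
    bulkClient.waitForResponses(TimeValue.timeValueSeconds(60)); // signature assumed
    bulkClient.shutdown();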

rtkjbillo commented 10 years ago

Thanks for the quick fix! I see that http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-support/1.2.1.0/elasticsearch-support-1.2.1.0.zip is now available. Is a compiled 1.2.1 version of elasticsearch-knapsack forthcoming as well so that we can install with the plugin command?

rtkjbillo commented 10 years ago

We ended up patching this further in elasticsearch-support by waiting up to 60 seconds for all threads to complete, along with some other changes to elasticsearch-knapsack that were useful for our purposes.
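
The patch itself is not reproduced here, but as an illustration only, the "wait up to 60 seconds" part amounts to something like the following on top of the BaseTransportClient.shutdown() quoted above (assuming the client's ThreadPool exposes awaitTermination; requires java.util.concurrent.TimeUnit):

    public synchronized void shutdown() {
        if (client != null) {
            client.close();
            client.threadPool().shutdown();
            try {
                // illustrative only: give in-flight bulk work up to
                // 60 seconds to drain before discarding the client
                client.threadPool().awaitTermination(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            client = null;
        }
        addresses.clear();
    }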