jprante / elasticsearch-jdbc

JDBC importer for Elasticsearch
Apache License 2.0

Getting CancellationException #398

Open sawickil opened 9 years ago

sawickil commented 9 years ago

Hey,

We are currently testing our solution based on ES 1.3.4 and jdbc-river 1.3.4.4. The river is executed every minute. The first hour of tests using the column strategy went OK, but after that we started getting problems: 18 CancellationExceptions within 30 minutes. Here is an excerpt from the logs:

[2014-11-27 12:57:59,715][INFO ][river.jdbc.SimpleRiverMouth] stopping bulk mode for index transactions_unclassified and refreshing...

[2014-11-27 12:57:59,727][INFO ][index.shard.service      ] [TRANSACTIONAL_ELASTIC_4] [transactions_unclassified][1] updating refresh_interval from [-1] to [1s]

[2014-11-27 12:57:59,727][INFO ][index.shard.service      ] [TRANSACTIONAL_ELASTIC_4] [transactions_unclassified][2] updating refresh_interval from [-1] to [1s]

[2014-11-27 12:57:59,743][INFO ][river.jdbc.RiverMetrics  ] pipeline org.xbib.elasticsearch.plugin.jdbc.RiverPipeline@2b7bdd49 complete: river jdbc/transaction_delta_river metrics: 19079783 rows, 4044.675412803032 mean, (4199.471495233331 4427.7102218398295 4713.615281284495), ingest metrics: elapsed 1 hour 17 minutes 59 seconds, 9.78 GB bytes, 550.0 bytes avg, 2.14 MB/s

[2014-11-27 12:57:59,745][ERROR][river.jdbc.RiverThread   ] 
java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    at org.xbib.elasticsearch.plugin.jdbc.pipeline.executor.SimplePipelineExecutor.waitFor(SimplePipelineExecutor.java:125)
    at org.xbib.elasticsearch.plugin.jdbc.pipeline.executor.MetricSimplePipelineExecutor.waitFor(MetricSimplePipelineExecutor.java:61)
    at org.xbib.elasticsearch.plugin.jdbc.RiverThread.run(RiverThread.java:115)
    at java.lang.Thread.run(Thread.java:745)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

[2014-11-27 12:58:00,527][DEBUG][monitor.jvm              ] [TRANSACTIONAL_ELASTIC_4] [gc][young][4794][7096] duration [30ms], collections [1]/[1s], total [30ms]/[4.4m], memory [7gb]->[7.1gb]/[15.9gb], all_pools {[young] [314.8mb]->[444.4mb]/[532.5mb]}{[survivor] [35.8mb]->[31.3mb]/[66.5mb]}{[old] [6.7gb]->[6.7gb]/[15.3gb]}

[2014-11-27 12:58:00,771][ERROR][river.jdbc.BulkNodeClient] after bulk [1375] error
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1301)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:293)
    at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:266)
    at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:248)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:235)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:231)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:217)
    at org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient.bulkIndex(BulkNodeClient.java:279)
    at org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient.bulkIndex(BulkNodeClient.java:53)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverMouth.index(SimpleRiverMouth.java:236)
    at org.xbib.elasticsearch.plugin.jdbc.util.RiverMouthKeyValueStreamListener.end(RiverMouthKeyValueStreamListener.java:57)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource$ColumnKeyValueStreamListener.end(ColumnRiverSource.java:204)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource$ColumnKeyValueStreamListener.end(ColumnRiverSource.java:191)
    at org.xbib.elasticsearch.plugin.jdbc.util.PlainKeyValueStreamListener.values(PlainKeyValueStreamListener.java:139)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.processRow(SimpleRiverSource.java:807)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.nextRow(SimpleRiverSource.java:760)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.merge(SimpleRiverSource.java:493)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource.fetch(ColumnRiverSource.java:133)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource.fetch(ColumnRiverSource.java:70)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverFlow.fetch(ColumnRiverFlow.java:84)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow.execute(SimpleRiverFlow.java:139)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.request(RiverPipeline.java:88)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.call(RiverPipeline.java:66)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.call(RiverPipeline.java:30)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

[2014-11-27 12:58:00,775][ERROR][river.jdbc.SimpleRiverFlow] client is closed
org.elasticsearch.ElasticsearchIllegalStateException: client is closed
    at org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient.flushIngest(BulkNodeClient.java:347)
    at org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient.flushIngest(BulkNodeClient.java:53)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverMouth.flush(SimpleRiverMouth.java:284)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverMouth.afterFetch(SimpleRiverMouth.java:120)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow.afterFetch(SimpleRiverFlow.java:247)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow.execute(SimpleRiverFlow.java:141)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.request(RiverPipeline.java:88)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.call(RiverPipeline.java:66)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.call(RiverPipeline.java:30)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

[2014-11-27 12:58:00,799][ERROR][river.jdbc.RiverPipeline ] org.elasticsearch.ElasticsearchIllegalStateException: client is closed
java.io.IOException: org.elasticsearch.ElasticsearchIllegalStateException: client is closed
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource.fetch(ColumnRiverSource.java:135)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource.fetch(ColumnRiverSource.java:70)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverFlow.fetch(ColumnRiverFlow.java:84)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverFlow.execute(SimpleRiverFlow.java:139)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.request(RiverPipeline.java:88)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.call(RiverPipeline.java:66)
    at org.xbib.elasticsearch.plugin.jdbc.RiverPipeline.call(RiverPipeline.java:30)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchIllegalStateException: client is closed
    at org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient.bulkIndex(BulkNodeClient.java:269)
    at org.xbib.elasticsearch.plugin.jdbc.client.node.BulkNodeClient.bulkIndex(BulkNodeClient.java:53)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverMouth.index(SimpleRiverMouth.java:236)
    at org.xbib.elasticsearch.plugin.jdbc.util.RiverMouthKeyValueStreamListener.end(RiverMouthKeyValueStreamListener.java:57)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource$ColumnKeyValueStreamListener.end(ColumnRiverSource.java:204)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource$ColumnKeyValueStreamListener.end(ColumnRiverSource.java:191)
    at org.xbib.elasticsearch.plugin.jdbc.util.PlainKeyValueStreamListener.values(PlainKeyValueStreamListener.java:139)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.processRow(SimpleRiverSource.java:807)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.nextRow(SimpleRiverSource.java:760)
    at org.xbib.elasticsearch.river.jdbc.strategy.simple.SimpleRiverSource.merge(SimpleRiverSource.java:493)
    at org.xbib.elasticsearch.river.jdbc.strategy.column.ColumnRiverSource.fetch(ColumnRiverSource.java:133)
    ... 10 more

[2014-11-27 12:58:00,801][INFO ][river.jdbc.RiverMetrics  ] pipeline org.xbib.elasticsearch.plugin.jdbc.RiverPipeline@4a9bfc57 complete: river jdbc/transaction_delta_river metrics: 19083436 rows, 4044.541917836354 mean, (4199.471495233331 4427.7102218398295 4713.615281284495), ingest metrics: elapsed 1 hour 15 minutes, 8.70 GB bytes, 489.0 bytes avg, 1.98 MB/s

[2014-11-27 12:58:00,803][INFO ][river.jdbc.RiverThread   ] scheduled metrics thread at 60 seconds

[2014-11-27 12:58:00,803][INFO ][river.jdbc.RiverThread   ] scheduled suspend check thread at 1 seconds

[2014-11-27 12:58:00,804][INFO ][river.jdbc.RiverThread   ] scheduled metrics thread at 60 seconds

[2014-11-27 12:58:00,804][INFO ][river.jdbc.RiverThread   ] scheduled suspend check thread at 1 seconds

[2014-11-27 12:58:00,894][INFO ][index.shard.service      ] [TRANSACTIONAL_ELASTIC_4] [transactions_unclassified][1] updating refresh_interval from [1s] to [-1]

[2014-11-27 12:58:00,895][INFO ][index.shard.service      ] [TRANSACTIONAL_ELASTIC_4] [transactions_unclassified][2] updating refresh_interval from [1s] to [-1]

[2014-11-27 12:58:00,898][INFO ][river.jdbc.ColumnRiverSource] fetch state={"name":"transaction_delta_river","type":"jdbc","started":"2014-11-27T10:40:00.037Z","last_active_begin":"2014-11-27T11:58:00.809Z","last_active_end":"2014-11-27T11:58:00.793Z","map":{"aborted":false,"counter":6,"last_run_time":"2014-11-27T10:40:00.540Z","suspended":false}}

[2014-11-27 12:58:00,908][INFO ][river.jdbc.ColumnRiverSource] fetch state={"name":"transaction_delta_river","type":"jdbc","started":"2014-11-27T10:40:00.037Z","last_active_begin":"2014-11-27T11:58:00.809Z","last_active_end":"2014-11-27T11:58:00.793Z","map":{"aborted":false,"counter":6,"last_run_time":"2014-11-27T10:40:00.540Z","suspended":false}}

During this period, however, the metrics changed as shown below (even though the number of documents in the index was not really increasing):

[2014-11-27 13:27:32,436][INFO ][river.jdbc.RiverMetrics  ] pipeline org.xbib.elasticsearch.plugin.jdbc.RiverPipeline@91a8f4d complete: river jdbc/transaction_delta_river metrics: 21023062 rows, 3239.324064042488 mean, (4199.471495233331 4427.7102218398295 4713.615281284495), ingest metrics: elapsed 3 minutes 39 seconds, 24.62 MB bytes, 1.0 bytes avg, 0.112 MB/s

[2014-11-27 13:27:30,253][INFO ][river.jdbc.RiverThread   ] scheduled metrics thread at 60 seconds

[2014-11-27 13:27:30,253][INFO ][river.jdbc.RiverThread   ] scheduled metrics thread at 60 seconds

[2014-11-27 13:27:30,252][INFO ][river.jdbc.RiverMetrics  ] pipeline org.xbib.elasticsearch.plugin.jdbc.RiverPipeline@2eb60ce6 complete: river jdbc/transaction_delta_river metrics: 21019558 rows, 3239.8745445649693 mean, (4199.471495233331 4427.7102218398295 4713.615281284495), ingest metrics: elapsed 3 minutes 37 seconds, 24.62 MB bytes, 1.0 bytes avg, 0.113 MB/s

[2014-11-27 13:27:29,509][INFO ][river.jdbc.RiverMetrics  ] pipeline org.xbib.elasticsearch.plugin.jdbc.RiverPipeline@2eb60ce6 is running: river jdbc/transaction_delta_river metrics: 21016971 rows, 3239.846728246118 mean, (4199.471495233331 4427.7102218398295 4713.615281284495), ingest metrics: elapsed 3 minutes 37 seconds, 18.73 MB bytes, 0.0 bytes avg, 0.086 MB/s

Some questions from me here:

  1. What are the possible scenarios in which this can happen?
  2. We are not sure whether it's related, but the CancellationExceptions appeared when we increased the total load on our DB, so the river queries were taking about 1 minute (more or less). Keep in mind that the scheduler was also set to 1 minute. Could these two things be connected?
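On the second question: the RiverThread trace shows runs being driven by a `ScheduledThreadPoolExecutor`. For a task registered with `scheduleAtFixedRate`, the JDK documents that an execution taking longer than the period makes the next one start late, but never concurrently. The sketch below checks this with a 50 ms period and 120 ms runs. Whether jdbc-river registers each river cycle exactly this way is an assumption not confirmed in this thread; if it does, any overlap would have to come from how runs are dispatched to a separate pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateOverlap {
    // Schedules a task whose run time exceeds its period and returns the highest
    // number of executions that were ever in progress at the same moment.
    static int maxConcurrentRuns(long periodMs, long runMs, int runs) {
        // Several threads on purpose: overlap would be possible if the JDK allowed it.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(runs);
        scheduler.scheduleAtFixedRate(() -> {
            max.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(runMs); // stands in for a query slower than the period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
                finished.countDown();
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            finished.await(); // wait until `runs` executions have completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // 50 ms period, 120 ms runs: later runs start late instead of overlapping.
        System.out.println(maxConcurrentRuns(50, 120, 3));
    }
}
```

A slow query therefore stretches the effective interval rather than stacking runs on top of each other, as long as a single periodic task performs the whole run.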

Thanks, Lukasz

jprante commented 9 years ago

Is it possible that you closed the river while it was active?

You can ignore the CancellationException on the metrics thread; it only means the thread is terminating early.
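The JDK mechanics behind this are easy to reproduce: once a `Future` has been cancelled, any thread calling `get()` on it receives `CancellationException`, which matches the `FutureTask.get` / `FutureTask.report` frames at the top of the RiverThread trace. The sketch below uses a plain sleeping task as a stand-in for a river run; it assumes, without confirmation from this thread, that the river cancels its pipeline future when it is stopped or relocated.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {
    // Submits a long-running task, cancels it, and reports which exception
    // the waiting thread gets from Future.get().
    static String exceptionFromGetAfterCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> run = pool.submit(() -> {
            try {
                Thread.sleep(10_000); // stands in for a long river run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the run ends early
            }
        });
        run.cancel(true); // interrupts the worker if it has already started
        try {
            run.get(); // the "waitFor" side of the trace
            return "none";
        } catch (CancellationException e) {
            return "CancellationException";
        } catch (InterruptedException | ExecutionException e) {
            return e.getClass().getSimpleName();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(exceptionFromGetAfterCancel()); // prints CancellationException
    }
}
```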

The InterruptedException on the bulk action is more serious, since some data may not get indexed.
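The InterruptedException plausibly follows from the same cancellation: `cancel(true)` interrupts the worker thread, and a thread blocked in `Semaphore.acquire` (the frame under `BulkProcessor.execute` in the second trace) gives up immediately, so the batch it was about to send is never submitted. The class below is a hypothetical model of that situation, not `BulkProcessor` itself: one permit stands for one in-flight bulk request, and the helper name is made up for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptedBulkDemo {
    // Hypothetical model: a submitter may only send the next batch once the
    // single in-flight bulk request has released its permit.
    static boolean batchLostOnInterrupt() {
        Semaphore inFlight = new Semaphore(1);
        inFlight.acquireUninterruptibly(); // one bulk request is already outstanding
        AtomicBoolean sent = new AtomicBoolean(false);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                inFlight.acquire(); // blocks: no permit is free
                sent.set(true);     // the next batch would be sent here
                inFlight.release();
            } catch (InterruptedException e) {
                interrupted.set(true); // the batch is dropped, never indexed
            }
        });
        worker.start();
        worker.interrupt(); // what cancel(true) does to a pipeline thread
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get() && !sent.get();
    }

    public static void main(String[] args) {
        System.out.println(batchLostOnInterrupt()); // prints true
    }
}
```

Unlike the CancellationException, which only affects the waiting thread, this path discards data that had already been read from the database.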

sawickil commented 9 years ago

Yeah, I also thought that someone was trying to stop/close the river, but I believe nobody touched it. We will perform further tests.

jprante commented 9 years ago

It could also be that the cluster moved the river instance to a new node. This scenario is heavily under-tested, and service interruption is to be expected...

sawickil commented 9 years ago

Interesting note. Is it possible that the cluster moved the river instance for no reason?

sawickil commented 9 years ago

FYI, we were able to reproduce this issue in our development environment with a single-node ES instance. In general, if the next river run starts while the previous one has not finished yet, the exceptions described above show up. This happens when the thread pool size is greater than 1. If it is set to 1, river runs wait in a queue and nothing bad happens. What's the difference?
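The difference described above matches how a fixed-size Java thread pool behaves: with one thread, runs submitted while another is in progress wait in the executor's queue; with more threads, they execute side by side. The sketch below models only that behaviour with `Executors.newFixedThreadPool`; it assumes, without confirmation from jdbc-river's source, that the river's thread pool size setting controls a pool of this kind.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizeOverlap {
    // Submits `runs` slow jobs at once to a fixed pool of `poolSize` threads and
    // returns how many of them were ever active at the same time.
    static int maxConcurrentRuns(int poolSize, int runs, long runMs) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        for (int i = 0; i < runs; i++) {
            pool.execute(() -> {
                max.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(runMs); // simulated run: slow query plus bulk indexing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // queued runs still execute, then the pool stops
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("pool size 1: " + maxConcurrentRuns(1, 3, 300));
        System.out.println("pool size 2: " + maxConcurrentRuns(2, 3, 300));
    }
}
```

With a pool size of 1 the reported maximum can never exceed 1, because queued runs only start after the previous one returns. With a pool size of 2 the first two runs overlap, which is the situation in which the exceptions above appeared.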