DarioBalinzo / kafka-connect-elasticsearch-source

Kafka Connect Elasticsearch Source
Apache License 2.0

fixed blacklist filter and task configs updated as the indexes list order changed #71

Closed: nacky012001 closed this 2 years ago

nacky012001 commented 2 years ago

Upgraded jackson-databind to 2.13.2.2, since 2.13.0.0 is vulnerable to a Denial of Service (DoS) attack: https://github.com/FasterXML/jackson-databind/issues/3426

Fixed blacklist filtering:

  1. It should return null (rather than the value) for a list/map whose key is in the blacklist.
  2. It should filter out the whole map/list, without visiting its nested values, if the map's key is in the blacklist.
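The intent of the two rules can be illustrated with a minimal sketch. It assumes documents are plain nested `Map<String, Object>` / `List<Object>` values and that blacklist entries are dotted paths such as `a.secret`; the class `BlacklistSketch` and its methods are hypothetical and are not the connector's own filter code. Both rules are applied at the point where a map key is examined: if the key's path is blacklisted, nothing is emitted for it and its nested map or list is never traversed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of path-based blacklist filtering; not the connector's own filter class. */
final class BlacklistSketch {

    private final Set<String> blacklist;

    BlacklistSketch(Set<String> blacklist) {
        this.blacklist = blacklist;
    }

    /** Returns a filtered copy of the document; the input is left untouched. */
    Map<String, Object> filter(Map<String, Object> document) {
        return filterMap("", document);
    }

    private Map<String, Object> filterMap(String prefix, Map<String, Object> map) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String path = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (blacklist.contains(path)) {
                // Blacklisted key: emit nothing for it and never descend into a nested map/list.
                continue;
            }
            result.put(entry.getKey(), filterValue(path, entry.getValue()));
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private Object filterValue(String path, Object value) {
        if (value instanceof Map) {
            return filterMap(path, (Map<String, Object>) value);
        }
        if (value instanceof List) {
            List<Object> filtered = new ArrayList<>();
            for (Object element : (List<Object>) value) {
                // Assumption: list elements share the list's path (no index segment).
                filtered.add(filterValue(path, element));
            }
            return filtered;
        }
        return value; // scalars are copied as-is
    }
}
```

With a blacklist of `a`, for example, the whole nested map under `a` disappears from the output without any of its fields being inspected.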
DarioBalinzo commented 2 years ago

Hi, thank you for the PR. I will try to review it this weekend.

nacky012001 commented 2 years ago

Sorry, I think I should have created a separate branch for the second commit.

Issue addressed by the second commit: the connections are closed because of a task config update, even though the indexes haven't changed:

[2022-05-18 15:37:36,726] INFO [Worker clientId=connect-1, groupId=kafka] Handling task config update by restarting tasks [elasticsearch-source-4, elasticsearch-source-3, elasticsearch-source-0, elasticsearch-source-2, elasticsearch-source-1] (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-05-18 15:37:36,726] INFO Stopping task elasticsearch-source-4 (org.apache.kafka.connect.runtime.Worker)
[2022-05-18 15:37:36,727] INFO Stopping task elasticsearch-source-3 (org.apache.kafka.connect.runtime.Worker)
[2022-05-18 15:37:36,728] INFO Stopping task elasticsearch-source-0 (org.apache.kafka.connect.runtime.Worker)
[2022-05-18 15:37:36,730] INFO Stopping task elasticsearch-source-2 (org.apache.kafka.connect.runtime.Worker)
[2022-05-18 15:37:36,730] INFO Stopping task elasticsearch-source-1 (org.apache.kafka.connect.runtime.Worker)
[2022-05-18 15:37:37,195] INFO found last value Cursor{primaryCursor='2022-05-18T07:33:04.487Z', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask)
[2022-05-18 15:37:37,195] INFO found last value Cursor{primaryCursor='2022-05-15T07:59:01.762Z', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask)
[2022-05-18 15:37:37,195] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask)
java.lang.RuntimeException: Request cannot be executed; I/O reactor status: STOPPED
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:904)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:288)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
    at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
    at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
    at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
    at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
    at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED
    at org.apache.http.util.Asserts.check(Asserts.java:46)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.ensureRunning(CloseableHttpAsyncClientBase.java:90)
    at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:123)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:284)
    ... 18 more

I think this is caused by the old and new index lists being in a different order. I updated the comparison between the two lists so that it ignores ordering and duplicates.
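An order- and duplicate-insensitive comparison amounts to comparing the two lists as sets. The sketch below assumes index names are plain strings; `IndexListComparison`, `sameIndexes` and `canonical` are hypothetical names, not the connector's code. In the connector, a check like this would presumably decide whether to ask Kafka Connect for a task reconfiguration (for example via `ConnectorContext.requestTaskReconfiguration()`).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical helpers for comparing index listings; not the connector's own code. */
final class IndexListComparison {

    private IndexListComparison() {
    }

    /** True when both listings contain the same index names, ignoring order and duplicates. */
    static boolean sameIndexes(Collection<String> oldIndexes, Collection<String> newIndexes) {
        return new HashSet<>(oldIndexes).equals(new HashSet<>(newIndexes));
    }

    /** Deduplicated, sorted copy, so anything built from it does not depend on listing order. */
    static List<String> canonical(Collection<String> indexes) {
        return new ArrayList<>(new TreeSet<>(indexes));
    }
}
```

Comparing as sets only avoids a spurious reconfiguration request. If the per-task configs are themselves derived from the raw listing order, building them from a canonical (sorted, deduplicated) list, as `canonical` does, would also keep the generated configs identical between listings; this is a design option, not something the PR states.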

Moreover, the AtomicBoolean variable "stopping" in ElasticSourceTask doesn't work as expected, since it does not lock poll(): a poll() that is already running can still issue a search after the client has been closed.
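A flag checked at the start of `poll()` cannot stop a `poll()` that is already past the check, so `stop()` can close the client while a search is in flight, which is consistent with the `I/O reactor status: STOPPED` error in the log. One way to close that gap, sketched here under assumptions, is to guard both the search and the client shutdown with the same lock. `PollGuardSketch` and the `SearchClient` interface are hypothetical stand-ins for the task and the Elasticsearch client, not the project's code.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the Elasticsearch client used by the task. */
interface SearchClient {
    List<String> search();

    void close();
}

/** Hypothetical sketch: poll() and stop() share a lock so the client is never closed mid-search. */
final class PollGuardSketch {
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final ReentrantLock clientLock = new ReentrantLock();
    private final SearchClient client;

    PollGuardSketch(SearchClient client) {
        this.client = client;
    }

    List<String> poll() {
        clientLock.lock();
        try {
            // Checked under the lock: stop() may have closed the client while we waited.
            if (stopping.get()) {
                return Collections.emptyList();
            }
            return client.search();
        } finally {
            clientLock.unlock();
        }
    }

    void stop() {
        stopping.set(true); // later polls return immediately once they acquire the lock
        clientLock.lock();  // waits for an in-flight poll() to finish its search
        try {
            client.close();
        } finally {
            clientLock.unlock();
        }
    }
}
```

The trade-off is that `stop()` now blocks until an in-flight search completes. Kafka Connect's `SourceTask.stop()` contract only requires signalling the task to stop (it may be called from a different thread than `poll()`), so an alternative is to let the polling thread close the client itself once it observes the flag.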