confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

fail to sync data when one elasticsearch node crashed in cluster #420

Open azsane opened 4 years ago

azsane commented 4 years ago

We use Kafka Connect to sync data from Kafka to an Elasticsearch cluster.

Version used: 5.4.1

In our test environment we set up an ES cluster with 3 nodes. One node suddenly crashed, but the 2 remaining nodes kept the cluster able to serve requests. Even so, Connect failed to sync data.

Digging into the logs, we found the failure was fairly involved.

Log:

1. ERROR Failed to execute batch xxx of xx records after total of 6 attempts io.searchbox.client.config.exception.CouldNotConnectException caused by Connection refused .... many lines omitted

2. ERROR Can't convert record from topic/partition/offset xxx/xx/xxxx.Error message:io.searchbox.client.config.exception.CouldNotConnectException: Could not connect to http://es.xxxxx (io.confluent.connect.elasticsearch.ElasticsearchWriter)

ElasticsearchWriter uses Jest to connect to ES, and Jest starts a NodeChecker that periodically checks ES node availability.

When a node is unavailable, it is removed. Until the problematic node is removed, Connect retries failed requests, and the round robin can route a retry to the crashed node.

Unfortunately, the logs show that even with round robin a request can be unlucky and fail all 6 attempts. Worse, adding new records to the bulk then fails because BulkProcessor.failAndStop has been triggered: records fail in convertRecord and never get a chance to be submitted.

After the crashed node recovered, Connect still couldn't sync data. Is there any configuration or solution for this situation?
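One possible way a request can fail all 6 attempts despite round robin: if the round-robin cursor is shared by every call the client makes, other traffic advances it between retries, so the same request can keep landing on the dead node. The sketch below is a toy model of that effect, not Jest's actual code; the node names, the `DEAD` set, and both helper functions are made up for illustration.

```python
from itertools import cycle

NODES = ["es-1", "es-2", "es-3"]  # hypothetical 3-node cluster
DEAD = {"es-1"}                   # the crashed node, not yet removed from rotation


def nodes_tried(other_calls_between_attempts, max_attempts=6):
    """Nodes one request is routed to when all calls share a single cursor.

    Between two attempts of our request, `other_calls_between_attempts`
    unrelated calls each consume one slot of the shared round-robin cursor.
    """
    cursor = cycle(NODES)
    tried = []
    for _ in range(max_attempts):
        tried.append(next(cursor))  # our request's attempt
        for _ in range(other_calls_between_attempts):
            next(cursor)            # slots consumed by other traffic
    return tried


def all_attempts_fail(other_calls_between_attempts, max_attempts=6):
    """True if every attempt of our request landed on a dead node."""
    tried = nodes_tried(other_calls_between_attempts, max_attempts)
    return all(node in DEAD for node in tried)
```

In this model, `nodes_tried(0)` rotates through all three nodes, while `all_attempts_fail(2)` is true: with exactly two other calls between attempts, every retry lands on `es-1` again.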

Raphallal commented 4 years ago

I'm experiencing the same issue.

One of the Elasticsearch nodes (one out of three) was shut down for maintenance, and the connector was not able to index the files anymore.

I had to manually change the connector configuration to remove the node that was shut down.

Stack trace:

org.apache.kafka.connect.errors.ConnectException: java.net.NoRouteToHostException: No route to host (Host unreachable)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:224)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:230)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
    at java.base/java.net.Socket.connect(Socket.java:591)
    at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:133)
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:221)
    ... 25 more

Is there any way to make the connector resilient to node loss?

cbowdon commented 4 years ago

We experienced a similar issue. A fix or workaround for this would be very useful.

vikasillumina commented 3 years ago

@azsane @Raphallal @cbowdon we experienced a somewhat similar issue, as I reported here: https://github.com/confluentinc/kafka-connect-elasticsearch/issues/571 I couldn't dig out as many details as you were able to find, but I bet it's the same issue. Sorry, we had about 12M log entries to search through to find the instances for 96 errored files. What did you end up doing? I have changed our config to have:

  "max.retries"                                   : "20",
  "retry.backoff.ms"                              : "2000",

to see if this helps in our next load test. But from the details I learned here, I still feel there is a possibility that Kafka Connect will fail to index into Elasticsearch yet still commit the consumer offsets.

For us this is kind of a deal breaker if we can't guarantee delivery. We are forced to evaluate another option: https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/ It feels like a shame to abandon Kafka Connect, which served us so well until this point and still does.
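To judge whether these settings could ride out a node outage, it helps to estimate how long a batch keeps retrying before it is failed. The sketch below computes that window under two backoff models. Which one (if either) matches the connector version in use is an assumption to check against its documentation, and `retry_window_ms` is a hypothetical helper, not part of the connector.

```python
def retry_window_ms(max_retries, backoff_ms, doubling=False):
    """Total time spent waiting between attempts, in milliseconds.

    doubling=False: assumes a fixed wait of `backoff_ms` before each retry.
    doubling=True:  assumes the wait doubles on every retry (an upper bound
                    that ignores any jitter or cap the real client may apply).
    """
    if doubling:
        return sum(backoff_ms * 2 ** i for i in range(max_retries))
    return max_retries * backoff_ms


# Values from the config above: max.retries=20, retry.backoff.ms=2000.
fixed = retry_window_ms(20, 2000)
print(f"fixed backoff window: {fixed / 1000:.0f} s")
```

Under the fixed-wait model, 20 retries at 2000 ms give a 40 second window, so an outage longer than that would still exhaust the retries.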

Would love to hear thoughts on this. Regards, Vikas

cbowdon commented 3 years ago

@vikasillumina We haven't experienced it again since upgrading to the latest Kafka Connect, hope that helps you.

vikasillumina commented 3 years ago

Thanks @cbowdon for your response. Which version of Kafka Connect are you using? We are using 6.1.0, but I will look into upgrading to the latest version.

cbowdon commented 3 years ago

@vikasillumina We're running 6.2.0

vikasillumina commented 3 years ago

Thanks @cbowdon for the info, I have upgraded our version too. I also upgraded the confluentinc/kafka-connect-elasticsearch package from 11.0.3 to 11.0.6. We are running Kafka Connect as a Kubernetes pod. I'm not sure whether the healthcheck library we are using, https://github.com/devshawn/kafka-connect-healthcheck, is serving its purpose: I am seeing the pod restart a lot. I was curious what you are using. Thanks for your help on this.
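For comparison, a health probe can be built directly on the Kafka Connect REST API, whose `GET /connectors/{name}/status` endpoint reports the state of the connector and of each task. A minimal sketch, assuming Connect listens on `http://localhost:8083` (a placeholder); it is not how kafka-connect-healthcheck itself is implemented, and the helper names are made up.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Connect REST address


def fetch_status(connector, base_url=CONNECT_URL, timeout=5):
    """Fetch the status document for one connector from the Connect REST API."""
    url = f"{base_url}/connectors/{connector}/status"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def unhealthy_tasks(status):
    """Return the ids of tasks whose state is not RUNNING."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") != "RUNNING"]


def is_healthy(status):
    """Healthy: connector is RUNNING, has at least one task, all tasks RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and not unhealthy_tasks(status)
```

Whether a failed task should restart the whole pod is a design choice. Restarting only the failed task through the REST API (`POST /connectors/{name}/tasks/{id}/restart`) is another option.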

Raphallal commented 3 years ago

Hi @vikasillumina, I'm not working on the project anymore, so I can't give you any info regarding the version. Nonetheless, I remember we ended up putting a VIP in the connector config instead of the Elasticsearch nodes directly. This way, we are always "pointing" to a valid Elasticsearch node.
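In connector-config terms, that change is to the `connection.url` setting: a single stable address instead of the individual nodes. A sketch of both variants, printed as the JSON one would submit to Kafka Connect; all hostnames and the topic name are placeholders, and only the fields relevant here are shown.

```python
import json

# Before: individual nodes listed directly (hypothetical hostnames).
per_node_config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "my-topic",
    "connection.url": "http://es-1:9200,http://es-2:9200,http://es-3:9200",
}

# After: one virtual IP / load balancer address in front of the cluster.
vip_config = dict(per_node_config, **{"connection.url": "http://es-vip:9200"})

print(json.dumps(vip_config, indent=2))
```

With this setup the VIP or load balancer needs its own health checks, so that it stops routing to a node that is down.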

vikasillumina commented 3 years ago

Thanks @Raphallal for the info. We ended up putting the Elasticsearch cluster in a VPC and accessing it via a VPC endpoint, and that seems to have stabilized the connectivity with Elasticsearch: no more gateway errors. @cbowdon what version of confluentinc/kafka-connect-elasticsearch are you using? I had unexpected issues with 11.0.6 when I applied a lot of load. One of them was that the consumer group for Kafka Connect had no active group members. I had to kill the pods to restart the process.

cbowdon commented 3 years ago

@vikasillumina we're using kafka-connect-elasticsearch 11.1.0. HTH