confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector
Other
15 stars 435 forks source link

Connector is not auto retrying on Indexing Error #634

Open spencerdcarlson opened 2 years ago

spencerdcarlson commented 2 years ago

When attempting to write to Elasticsearch and an "Indexing record failed" error occurs, the connector dies without any retries. We do not even see the retry log execute.

Kafka Connect Image: confluentinc/cp-kafka-connect:7.1.1 Elasticsearch Connector: confluentinc/kafka-connect-elasticsearch:13.0.0 Elasticsearch Version: 7.8.0

Connector Config

{
      "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "connection.url" : "<REDACTED>",
      "connection.username" : "<REDACTED>",
      "connection.password" : "<REDACTED>",
      "tasks.max" : "1",
      "consumer.override.bootstrap.servers" : "<REDACTED>",
      "topics" : "confluent-audit-log-events",
      "name" : "erie-confluent-audit-logs",
      "linger.ms" : "5000",
      "connection.timeout.ms" : "15000",
      "read.timeout.ms" : "15000",
      "flush.synchronously" : "true",
      "max.retries" : "10",
      "retry.backoff.ms" : "1000",
      "consumer.override.sasl.jaas.config" : "<REDACTED>",
      "producer.override.sasl.jaas.config" : "<REDACTED>",
      "type.name" : "_doc",
      "behavior.on.malformed.documents" : "WARN",
      "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable" : "false",
      "schema.ignore" : "true",
      "key.ignore" : "true",
      "transforms" : "topicFromJsonPath, topicReformat, topicDateSuffix",
      "transforms.topicFromJsonPath.type" : "io.confluent.connect.transforms.ExtractTopic$Value",
      "transforms.topicFromJsonPath.field" : "$[\"type\"]",
      "transforms.topicFromJsonPath.field.format" : "JSON_PATH",
      "transforms.topicFromJsonPath.skip.missing.or.null" : "false",
      "transforms.topicReformat.type" : "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.topicReformat.regex" : "([^\\/]+)\\/([a-z]+)",
      "transforms.topicReformat.replacement" : "confluent-audit-logs-$1-$2",
      "transforms.topicDateSuffix.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms.topicDateSuffix.timestamp.format" : "yyyy-MM-dd",
      "transforms.topicDateSuffix.topic.format" : "$${topic}-$${timestamp}",
      "errors.tolerance" : "all",
      "errors.log.enable" : true,
      "errors.log.include.messages" : true,
      "errors.deadletterqueue.topic.name" : "kafka-connect-cluster.erie.usw2.prod.dlq.erie-confluent-audit-logs.v1",
      "errors.deadletterqueue.context.headers.enable" : "true"
    }

Stacktrace

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:173)
at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:402)
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:383)
at org.elasticsearch.action.ActionListener$RunAfterActionListener.onResponse(ActionListener.java:341)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
at java.base/java.lang.Thread.run(Thread.java:829)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:71)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2188)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:169)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.elasticsearch.action.bulk.Retry$RetryHandler.finishHim(Retry.java:168)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:128)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:184)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
Caused by: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of processing of [443935630][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[confluent-audit-logs-io.confluent.kafka.server-authorization-2022-05-25][0]] containing [27] requests, target allocation id: NJf_aiuQROqIBvKtXQr8Gg, primary term: 1 on EsThreadPoolExecutor[name = prod-usw2-cs-main-es-logging-esdata-10/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5782c33f[Running, pool size = 8, active threads = 8, queued tasks = 246, completed tasks = 202515737]]]]
at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:491)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
Caused by: org.apache.kafka.connect.errors.ConnectException: Indexing record failed.
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
... 5 more
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at io.confluent.connect.elasticsearch.ElasticsearchClient.handleResponse(ElasticsearchClient.java:565)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:59)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:112)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:56)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)
at io.confluent.connect.elasticsearch.ElasticsearchClient.handleResponse(ElasticsearchClient.java:565)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:56)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:59)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:128)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
org.apache.kafka.connect.errors.ConnectException: Indexing record failed.
at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:112)
... 5 more
at org.elasticsearch.action.ActionListener$RunAfterActionListener.onResponse(ActionListener.java:341)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:173)
at java.base/java.lang.Thread.run(Thread.java:829)
clescot commented 5 months ago

I've encountered a similar issue ("Indexing record failed") : I've upgraded the docker image to confluentinc/cp-kafka-connect:7.6.1-1-ubi8.amd64 , and the connector from 14.0.12 to 14.0.17, and the issue disappeared. hope it helps.