Aiven-Open / opensearch-connector-for-apache-kafka

Aiven's OpenSearch® Connector for Apache Kafka®
Apache License 2.0

Sink dying and not recovering #227

Closed elnoxgdl closed 10 months ago

elnoxgdl commented 11 months ago

Hi Team,

I am getting this exception from time to time, and the sink does not recover afterwards. When it happens I need to restart the sink connector.

[2023-08-08 20:16:20,793] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2122)
[2023-08-08 20:26:20,550] INFO [AdminClient clientId=adminclient-10] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient:937)
[2023-08-08 20:26:23,166] ERROR [ecom-topics|task-0] Failed to send bulk request from batch 53179 of 1 records (io.aiven.kafka.connect.opensearch.BulkProcessor:419)
org.apache.http.ConnectionClosedException: Connection is closed
    at org.opensearch.client.RestClient.extractAndWrapCause(RestClient.java:941)
    at org.opensearch.client.RestClient.performRequest(RestClient.java:332)
    at org.opensearch.client.RestClient.performRequest(RestClient.java:320)
    at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1911)
    at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1877)
    at org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1845)
    at org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:364)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.lambda$execute$0(BulkProcessor.java:389)
    at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:119)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.execute(BulkProcessor.java:386)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:370)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:351)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more
[2023-08-08 20:26:23,167] ERROR [ecom-topics|task-0] Failed to bulk processing after total of 1 attempt(s) (io.aiven.kafka.connect.opensearch.RetryUtil:136)
org.apache.kafka.connect.errors.ConnectException: org.apache.http.ConnectionClosedException: Connection is closed
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.lambda$execute$0(BulkProcessor.java:421)
    at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:119)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.execute(BulkProcessor.java:386)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:370)
    at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:351)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed

This is the configuration for the sink:

{
        "batch.size": "100",
        "behavior.on.null.values": "IGNORE",
        "behavior.on.malformed.documents": "IGNORE",
        "connection.compression": "true",
        "connection.password": "<REDACTED>",
        "connection.url": "<REDACTED>",
        "connection.username": "<REDACTED>",
        "connection.timeout.ms": "30000",
        "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
        "flush.timeout.ms": "60000",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore": "true",
        "max.retries": "5",
        "name": "ecom-topics",
        "read.timeout.ms": "30000",
        "retry.backoff.ms": "60000",
        "schema.ignore": "true",
        "topics.regex": "^ecom.*",
        "transforms.ConvertTimestamp.field": "@timestamp",
        "transforms.ConvertTimestamp.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
        "transforms.ConvertTimestamp.target.type": "string",
        "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.InsertField.offset.field": "offset",
        "transforms.InsertField.partition.field": "partition",
        "transforms.InsertField.static.field": "env",
        "transforms.InsertField.static.value": "production",
        "transforms.InsertField.topic.field": "topic",
        "transforms.InsertField.topic.value": "${topic}",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTimestamp.timestamp.field": "@timestamp",
        "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.TimestampRouter.timestamp.format": "YYYY.MM",
        "transforms.TimestampRouter.topic.format": "kafka-messages-prod.${timestamp}",
        "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms": "InsertField,InsertTimestamp,ConvertTimestamp,TimestampRouter",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "errors.deadletterqueue.context.headers.enable": "false",
        "errors.log.enable": "true",
        "errors.tolerance": "all",
        "errors.log.include.messages": "true",
        "errors.retry.timeout": "-1",
        "tasks.max": "1",
        "log.sensitive.data": "true",
        "max.in.flight.requests": "5",
        "max.buffered.records": "10000",
        "config.action.reload": "restart",
        "max.connection.idle.time.ms": "10000",
        "flush.synchronously": "true",
        "write.method": "INSERT"
}
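For context on the retry settings above: `max.retries` and `retry.backoff.ms` control how often a failed bulk request is retried and how long the connector waits between attempts (the `RetryUtil.callWithRetry` frame in the stack trace is where this happens). A minimal sketch of that retry-with-backoff pattern, assuming illustrative names and a fixed backoff; this is not the connector's actual implementation:

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    // Illustrative retry-with-backoff, loosely modeled on the
    // RetryUtil.callWithRetry frame visible in the stack trace.
    // Retries the task up to maxRetries times, sleeping backoffMs
    // between attempts, and rethrows the last failure if all fail.
    static <T> T callWithRetry(int maxRetries, long backoffMs, Callable<T> task)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    // Fixed backoff here; a real implementation may add
                    // jitter or exponential growth between attempts.
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulate a bulk request that fails twice ("Connection is closed")
        // and then succeeds; with max.retries=5 this recovers transparently.
        String result = callWithRetry(5, 10L, () -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("Connection is closed");
            }
            return "ok";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Under this model, a transient disconnect should be absorbed by the retries; the log line "Failed to bulk processing after total of 1 attempt(s)" despite `max.retries=5` is what points at a bug in the retry path rather than a configuration problem.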
willyborankin commented 11 months ago

Hi @elnoxgdl, thank you for the feedback. I think it is related to the https://github.com/Aiven-Open/opensearch-connector-for-apache-kafka/issues/179 problem, which was fixed recently. I'm preparing a release.

elnoxgdl commented 10 months ago

@willyborankin yes! It fixed it!! Wohooo! Thank you!