DarioBalinzo / kafka-connect-elasticsearch-source

Kafka Connect Elasticsearch Source
Apache License 2.0

source connector elasticsearch failed #75

Closed: SarindraTherese closed this issue 2 years ago.

SarindraTherese commented 2 years ago

Hey, I get some errors in the log when I try to use the Elasticsearch source connector:

[2022-08-19 11:53:46,179] ERROR [elastic-source|worker] error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository:193)
[2022-08-19 11:53:46,180] ERROR [elastic-source|worker] Error while trying to get updated topics list, ignoring and waiting for next table poll interval (com.github.dariobalinzo.elastic.ElasticIndexMonitorThread:92)

Opening https://localhost:9200/_cat/indices?v returns:

health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   kibana_sample_data_ecommerce tRcxyLrsSKuWcmOx3ZNKew   1   0       4675            0        4mb            4mb
yellow open   index-elastic                CvedHZsxTB6aGeP9cUVedw   1   1          0            0       225b           225b
yellow open   test                         R9t_ToRpQGyi_tTw6MdkSA   1   1         15            0     15.5kb         15.5kb
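For reference, the connector issues the equivalent of this request from inside the Connect worker (ElasticRepository.catIndices in the stack trace further down). Below is a minimal Java 11+ sketch that reproduces the call outside Kafka Connect, using the host, port and credentials from the connector configuration. CatIndicesProbe and its methods are hypothetical names. The scheme is left as a parameter because it is exactly what differs between the browser (https) and the connector configuration, which sets no scheme and is assumed here to default to plain http.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical diagnostic: issue the same GET /_cat/indices?v request the browser made.
class CatIndicesProbe {

    // HTTP Basic auth header value built from es.user / es.password.
    static String basicAuth(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    // The scheme ("http" or "https") must match what Elasticsearch serves on this port.
    static URI catIndicesUri(String scheme, String host, int port) {
        return URI.create(scheme + "://" + host + ":" + port + "/_cat/indices?v");
    }

    static HttpRequest buildRequest(String scheme, String host, int port,
                                    String user, String password) {
        return HttpRequest.newBuilder(catIndicesUri(scheme, host, port))
                .header("Authorization", basicAuth(user, password))
                .GET()
                .build();
    }

    // Sends the request and returns "status\nbody". Plain http against a TLS-only port
    // typically fails with an IOException (the server closes the connection); https
    // against an untrusted CA typically fails with an SSLHandshakeException.
    static String fetch(HttpClient client, HttpRequest request)
            throws IOException, InterruptedException {
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + "\n" + response.body();
    }
}
```

If the cluster only serves HTTPS on port 9200, the http variant (for example fetch(HttpClient.newHttpClient(), buildRequest("http", "127.0.0.1", 9200, "elastic", "password"))) would be expected to fail with the server closing the connection, which would be consistent with the ConnectionClosedException in the trace. The https variant is more likely to fail certificate validation instead, if the cluster uses an auto-generated CA that is not in the JVM's default trust store.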

And this is my connector configuration:

{
  "name": "elastic-source",
  "config": {
    "name": "elastic-source",
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "es.host": "127.0.0.1",
    "es.port": "9200",
    "es.user": "elastic",
    "es.password": "password",
    "index.prefix": "test",
    "topic.prefix": "test"
  }
}
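The configuration above should select exactly one of the listed indices, assuming index.prefix works as a plain starts-with match on index names. That is an assumption about the connector's behavior and is not taken from its source. The hypothetical helper below applies that rule to the _cat/indices?v output. With prefix test it returns only test, which suggests the prefix is not the problem and the index lookup itself is failing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: which indices would an "index.prefix" value select, given the
// plain-text output of GET /_cat/indices?v? Assumes simple starts-with matching
// (not taken from the connector's source).
class IndexPrefixFilter {

    static List<String> matching(String catIndicesOutput, String prefix) {
        String[] lines = catIndicesOutput.strip().split("\\R");
        // Locate the "index" column from the header row instead of hard-coding its position.
        int indexCol = Arrays.asList(lines[0].trim().split("\\s+")).indexOf("index");
        if (indexCol < 0) {
            throw new IllegalArgumentException("no 'index' column in header");
        }
        List<String> result = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            // Whitespace splitting assumes no empty cells (true for the output above).
            String[] cols = lines[i].trim().split("\\s+");
            if (cols.length > indexCol && cols[indexCol].startsWith(prefix)) {
                result.add(cols[indexCol]);
            }
        }
        return result;
    }
}
```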

As you can see, the test index already exists, so what's wrong with my approach?

DarioBalinzo commented 2 years ago

Hi, thanks for reporting this.

Can you share the full stack trace that follows [2022-08-19 11:53:46,180] ERROR [elastic-source|worker] Error while trying to get updated topics list, ignoring and waiting for next table poll interval (com.github.dariobalinzo.elastic.ElasticIndexMonitorThread:92)?

SarindraTherese commented 2 years ago

Yes!

(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2022-08-19 15:18:47,623] ERROR [elastic-source|worker] error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository:193)
[2022-08-19 15:18:47,624] ERROR [elastic-source|worker] Error while trying to get updated topics list, ignoring and waiting for next table poll interval (com.github.dariobalinzo.elastic.ElasticIndexMonitorThread:92)
java.lang.RuntimeException: org.apache.http.ConnectionClosedException: Connection is closed
    at com.github.dariobalinzo.elastic.ElasticRepository.catIndices(ElasticRepository.java:194)
    at com.github.dariobalinzo.elastic.ElasticIndexMonitorThread.updateIndexes(ElasticIndexMonitorThread.java:89)
    at com.github.dariobalinzo.elastic.ElasticIndexMonitorThread.run(ElasticIndexMonitorThread.java:45)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:886)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:288)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
    at com.github.dariobalinzo.elastic.ElasticRepository.catIndices(ElasticRepository.java:191)
    ... 2 more
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.base/java.lang.Thread.run(Thread.java:829)

SarindraTherese commented 2 years ago

And this appears above it in the log:

org.apache.kafka.connect.errors.ConnectException: Cannot find any elasticsearch index
    at com.github.dariobalinzo.elastic.ElasticIndexMonitorThread.indexes(ElasticIndexMonitorThread.java:77)
    at com.github.dariobalinzo.ElasticSourceConnector.findTaskFromIndexPrefix(ElasticSourceConnector.java:131)
    at com.github.dariobalinzo.ElasticSourceConnector.taskConfigs(ElasticSourceConnector.java:121)
    at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:383)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1574)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1521)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$30(DistributedHerder.java:1534)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:408)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:326)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-08-19 17:59:08,696] INFO SourceConnectorConfig values: 
    config.action.reload = restart
    connector.class = com.github.dariobalinzo.ElasticSourceConnector
    errors.log.enable = true
    errors.log.include.messages = true
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    name = elastic-source
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transforms = []
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:376)
[2022-08-19 17:59:08,696] INFO EnrichedConnectorConfig values: 
    config.action.reload = restart
    connector.class = com.github.dariobalinzo.ElasticSourceConnector
    errors.log.enable = true
    errors.log.include.messages = true
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    name = elastic-source
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transforms = []
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)

DarioBalinzo commented 2 years ago

Cannot find any elasticsearch index: maybe the index was created too late (i.e., after the connector was already running)?

Can you please try deleting and recreating the connector?

Also, ConnectionClosedException: Connection is closed looks like a network issue.

SarindraTherese commented 2 years ago

Maybe the problem is HTTPS on Elasticsearch: I already tried connecting to Elasticsearch from Python and got an SSL access error. I'm going to fix SSL access to Elasticsearch today, so if you have any helpful links, I'm interested. Thanks!

SarindraTherese commented 2 years ago

Indeed, it was because of the HTTPS URL. I set the following values to false in elasticsearch.yml, and the connector status changed to running:

xpack.security.enabled: false
xpack.security.enrollment.enabled: false
xpack.security.http.ssl:
  enabled: false
  keystore.path: certs/http.p12
xpack.security.transport.ssl:
  enabled: false

However, I don't know how to make this work while keeping the HTTPS address. Thanks!
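One common way to keep HTTPS enabled is to leave xpack.security on and make the client trust the CA that signed the cluster's HTTP certificate. On Elasticsearch 8 with security auto-configuration (the certs/http.p12 keystore path above matches that layout), the CA certificate is typically written to config/certs/http_ca.crt; verify the path on your install. The sketch below uses only JDK 11+ APIs, and TrustedEsClient and its methods are hypothetical names. It builds a java.net.http.HttpClient that trusts that CA, which is a quick way to check from the JVM that the HTTPS endpoint works with the elastic credentials. How the connector itself should be given a trust store, and whether it needs a scheme setting, should be checked against the connector's README; no connector option names are assumed here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.http.HttpClient;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Collection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper: an HttpClient that trusts the Elasticsearch HTTP CA instead of
// relying on the JVM default trust store (and instead of disabling TLS on the cluster).
class TrustedEsClient {

    // Read every X.509 certificate from a PEM or DER file (assumed: config/certs/http_ca.crt).
    static Collection<? extends Certificate> readCertificates(Path caFile)
            throws IOException, GeneralSecurityException {
        try (InputStream in = Files.newInputStream(caFile)) {
            return CertificateFactory.getInstance("X.509").generateCertificates(in);
        }
    }

    // Put the certificates into an empty in-memory trust store.
    static KeyStore trustStoreFrom(Collection<? extends Certificate> certs)
            throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        int n = 0;
        for (Certificate cert : certs) {
            ks.setCertificateEntry("es-http-ca-" + n++, cert);
        }
        return ks;
    }

    // TLS context whose trust managers accept only chains rooted in that trust store.
    static SSLContext sslContextTrusting(KeyStore trustStore) throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null);
        return ctx;
    }

    static HttpClient clientTrusting(Path caFile) throws IOException, GeneralSecurityException {
        return HttpClient.newBuilder()
                .sslContext(sslContextTrusting(trustStoreFrom(readCertificates(caFile))))
                .build();
    }
}
```

For example, TrustedEsClient.clientTrusting(Path.of("config/certs/http_ca.crt")) returns a client that can be used for an authenticated GET on https://localhost:9200/_cat/indices?v. Even with the CA trusted, hostname verification can still fail if the certificate's subject alternative names do not cover the host name used (localhost versus 127.0.0.1). If a trust store file on disk is needed instead, the same CA certificate can be imported with the JDK's keytool -importcert.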

DarioBalinzo commented 2 years ago

Closing, since the issue does not seem to be related to the connector.