DarioBalinzo / kafka-connect-elasticsearch-source

Kafka Connect Elasticsearch Source
Apache License 2.0

No mapping found for [timestamp] in order to sort on #70

Open dahal4 opened 2 years ago

dahal4 commented 2 years ago

Hi there, I am facing a "no mapping found" error while using this connector. I checked in Kibana by sorting hits on the timestamp field, and that works fine there, so I don't know where my mistake is. Any help or pointers are welcome.

Passed config:

{
    "name": "elastic_source2",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io",
        "es.scheme": "https",
        "es.port": "9243",
        "es.user": "elastic",
        "es.password": "uqBE5g4ZIJA9Tp****2leGPq",
        "index.name": "kibana_sample_data_flights",
        "topic.prefix": "es_",
        "incrementing.field.name": "timestamp"
    }
}
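(Editor's note, as a hedged troubleshooting sketch: the failing request in the log below targets `.ds-logs-enterprise_search.api-default-2022.05.09-000001`, not the configured `kibana_sample_data_flights`, and that index apparently has no `timestamp` field. A quick way to check whether the sort field actually exists in an index's mapping is the `_mapping` API; the host below is taken from the config above, and the password placeholder stays redacted:)

```shell
# Inspect the mapping of the configured index and look for the
# incrementing field ("timestamp"). If it is absent, sorting on it
# fails with "No mapping found for [timestamp] in order to sort on".
curl -s -u 'elastic:<password>' \
  'https://03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io:9243/kibana_sample_data_flights/_mapping' \
  | grep -o '"timestamp"'
```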

Error logs:


[2022-05-10 10:25:45,189] INFO [elastic_source2|task-0] Stopping task elastic_source2-0 (org.apache.kafka.connect.runtime.Worker:829)
[2022-05-10 10:25:45,723] ERROR [elastic_source2|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
    at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
    at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
    at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
    at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
    at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [https://03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io:9243], URI [/.ds-logs-enterprise_search.api-default-2022.05.09-000001/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [timestamp] in order to sort on","index_uuid":"Wx0FJ-RFQFOTqp_wJykMfA","index":".ds-logs-enterprise_search.api-default-2022.05.09-000001"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".ds-logs-enterprise_search.api-default-2022.05.09-000001","node":"-Vr1NH-xQ-miv45oOjwkXw","reason":{"type":"query_shard_exception","reason":"No mapping found for [timestamp] in order to sort on","index_uuid":"Wx0FJ-RFQFOTqp_wJykMfA","index":".ds-logs-enterprise_search.api-default-2022.05.09-000001"}}]},"status":400}
        at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
        at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
        ... 15 more
[2022-05-10 10:25:45,725] INFO [elastic_source2|task-0] WorkerSourceTask{id=elastic_source2-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-05-10 10:25:45,728] INFO [elastic_source2|task-0] [Producer clientId=connector-producer-elastic_source2-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
jag959 commented 2 years ago

I am running into a similar issue (not sure if it is user error on my side).

Config:

{
    "name": "ElasticSource_RPA",
    "config": {
        "name": "ElasticSource_RPA",
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "es.host": "REDACTED",
        "es.port": "9200",
        "index.names": "finance-invoices",
        "incrementing.field.name": "datetime",
        "poll.interval.ms": "60000",
        "topic.prefix": "rpa"
    }
}

Error:

[2022-09-14 19:20:10,793] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
    at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
    at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
    at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
    at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
    at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:307)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://REDACTED:9200], URI [/finance-invoices/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"6Z3jeKV4Q5WUJ6p3zEgdXw","index":"finance-invoices"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"finance-invoices","node":"Wyczj20LRY6-6O8Vgi4qww","reason":{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"6Z3jeKV4Q5WUJ6p3zEgdXw","index":"finance-invoices"}}]},"status":400}
        at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
        at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
        ... 15 more

dahal4 commented 2 years ago

Hi, I don't know what you are using this connector for, but in my case I used it to migrate from an old ES cluster to a new one. I was unaware of Elasticsearch's reindex feature, which makes migrating from one ES to another easy. Can you tell me why you need this connector?

jag959 commented 2 years ago

I am trying to read data from ES as part of an integration that lets non-ES users ingest data from ES through Kafka. I actually got it working now on my side, so this is no longer an open issue for me.

dahal4 commented 2 years ago

Can you share how you managed to solve this issue?

DarioBalinzo commented 2 years ago

I think you probably need an incrementing field that supports sorting, so you need to configure one if that is not already the case (e.g. the field must be indexed).
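(Editor's note: a minimal sketch of what this means, with illustrative host, index, and field names that are not from the thread — create the index with an explicit mapping for the incrementing field before starting the connector, so Elasticsearch can sort on it:)

```shell
# Create the index with the incrementing field explicitly mapped as a
# date, so it is indexed and sortable. Index and host are hypothetical.
curl -X PUT 'http://localhost:9200/my-index' \
  -H 'Content-Type: application/json' \
  -d '{
    "mappings": {
      "properties": {
        "timestamp": { "type": "date" }
      }
    }
  }'
```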

jag959 commented 2 years ago

Yes, I added a timestamp field to the index that is automatically updated for each newly indexed document using an ingest pipeline in ES. My working config for this connector appears below.

{
    "name": "ElasticSource_RPA",
    "config": {
        "name": "ElasticSource_RPA",
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "es.host": "REDACTED_IPADDRESS",
        "es.scheme": "http",
        "es.port": "9200",
        "index.names": "finance-invoices",
        "incrementing.field.name": "timestamp",
        "poll.interval.ms": "60000",
        "topic.prefix": "rpa"
    }
}
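(Editor's note: the thread does not show the pipeline itself, but the approach described above can be sketched with an ES ingest pipeline that stamps each document at index time via the built-in `_ingest.timestamp` metadata field; pipeline name and host below are illustrative:)

```shell
# Create an ingest pipeline that sets a "timestamp" field on every
# document as it is indexed.
curl -X PUT 'http://localhost:9200/_ingest/pipeline/add-timestamp' \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Add index-time timestamp",
    "processors": [
      { "set": { "field": "timestamp", "value": "{{_ingest.timestamp}}" } }
    ]
  }'

# Make the index apply the pipeline by default.
curl -X PUT 'http://localhost:9200/finance-invoices/_settings' \
  -H 'Content-Type: application/json' \
  -d '{ "index.default_pipeline": "add-timestamp" }'
```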

DarioBalinzo commented 2 years ago

As discussed in #77, first create the index with the schema mapping and some data, and only then start the connector. In the future I will try to implement better error handling that silently ignores a missing index mapping until the data is ready. For now, this workaround should fix the issue.

dahal4 commented 2 years ago

I think this connector only works with timestamp-like data. What if my data has no timestamp field, which is common in older datasets? Is there any way to handle that type of data?