DarioBalinzo / kafka-connect-elasticsearch-source

Kafka Connect Elasticsearch Source
Apache License 2.0

ERROR [elastic-source|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217) #77

Closed: SarindraTherese closed this issue 2 years ago

SarindraTherese commented 2 years ago

Hey Dario! I finally managed to run the Elasticsearch source connector with Apache Kafka successfully:

{
  "name": "elastic-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}
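The status above is what the Kafka Connect REST API returns for `GET /connectors/elastic-source/status` (the `worker_id` shows the default REST port 8083). A RUNNING state only means the task is alive, not that it is producing records: the errors later in this thread were logged while the task was running. A minimal Python sketch that checks a parsed status document; `connector_is_healthy` is a hypothetical helper, not part of Kafka or the connector:

```python
import json

def connector_is_healthy(status: dict) -> bool:
    """Return True if the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with zero tasks does no work, so treat it as unhealthy.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)

# Same content as the status JSON above.
status_json = """
{"name": "elastic-source",
 "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
 "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
 "type": "source"}
"""
print(connector_is_healthy(json.loads(status_json)))  # True
```

Because a healthy status can coexist with failing polls, the worker log is still the place to confirm that searches succeed.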

However, when I run elasticsearch-source.properties in distributed mode:

name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
topic=elastic-events
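Going by the tutorial naming described later in this comment (topic name = `topic.prefix` + index name), this configuration should produce a topic called `es_products`. A minimal sketch that reads the properties above and derives that name; the parser only handles simple `key=value` lines, and `expected_topic` is a hypothetical helper whose naming rule is an assumption, not the connector's code:

```python
def parse_properties(text: str) -> dict:
    """Minimal .properties parser: key=value lines only; '#' and '!' comments skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

config = parse_properties("""
name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
topic=elastic-events
""")

def expected_topic(props: dict, index_name: str) -> str:
    # Assumed naming rule: topic.prefix followed by the source index name.
    return props["topic.prefix"] + index_name

print(expected_topic(config, "products"))  # es_products
```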

I get errors like this:

[2022-09-20 17:45:38,127] INFO [elastic-source|task-0] fetching from products (com.github.dariobalinzo.task.ElasticSourceTask:201)
[2022-09-20 17:45:38,128] INFO [elastic-source|task-0] found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask:203)
[2022-09-20 17:45:38,129] WARN [elastic-source|task-0] request [POST http://localhost:1750/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512] returned 1 warnings: [299 Elasticsearch-7.15.0-79d65f6e357953a5b3cbcc5e2c7c21073d89aa29 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.15/security-minimal-setup.html to enable security."] (org.elasticsearch.client.RestClient:72)
[2022-09-20 17:45:38,129] ERROR [elastic-source|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
    at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
    at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
    at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
    at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
    at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:305)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://localhost:1750], URI [/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]

and this:

{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"can_match","grouped":true,"failed_shards":[{"shard":0,"index":"products","node":"asmTRFlgThS7kBU6yyCJzA","reason":{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}}]},"status":400}
        at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
        at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
        ... 15 more
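The outer message `all shards failed` is generic; the actionable part is the `root_cause` list in the 400 response body. A small Python sketch (hypothetical helper `root_cause_reasons`) that pulls those reasons out of an Elasticsearch error body, using a trimmed copy of the response above:

```python
import json

def root_cause_reasons(error_body: str) -> list:
    """Collect the 'reason' of every root cause in an Elasticsearch error response."""
    error = json.loads(error_body).get("error", {})
    return [cause.get("reason", "") for cause in error.get("root_cause", [])]

body = (
    '{"error":{"root_cause":[{"type":"query_shard_exception",'
    '"reason":"No mapping found for [@timestamp] in order to sort on","index":"products"}],'
    '"type":"search_phase_execution_exception","reason":"all shards failed"},"status":400}'
)
print(root_cause_reasons(body))
# ['No mapping found for [@timestamp] in order to sort on']
```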

The full list of topics is:

__consumer_offsets
connect-configs
connect-offset
connect-offsets
connect-status
elastic-events
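With `topic.prefix=es_`, a topic such as `es_products` would be expected, yet none of the listed topics starts with `es_`. One plausible reading is that the connector never produced a record, since every search failed. A quick check over the list above (`topics_with_prefix` is a hypothetical helper):

```python
topics = [
    "__consumer_offsets",
    "connect-configs",
    "connect-offset",
    "connect-offsets",
    "connect-status",
    "elastic-events",
]

def topics_with_prefix(all_topics, prefix):
    """Return the topics whose names start with the given prefix, sorted."""
    return sorted(t for t in all_topics if t.startswith(prefix))

print(topics_with_prefix(topics, "es_"))  # []
```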

In your tutorial, all the indices matching products* are sent to Kafka, using the es_ string as a topic prefix. So my questions are: am I doing things right here? What's wrong? How can I know whether all the data I put into Elasticsearch is read by Kafka? Thanks

consulthys commented 2 years ago

Duplicate of https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues/70

SarindraTherese commented 2 years ago

I was able to solve the problem. Step by step:

1- Create the index

PUT /test-index
{
  "settings": {
    "number_of_replicas": 2,
    "number_of_shards": 2
  }
}

2- Add a timestamp field with an ingest pipeline

PUT _ingest/pipeline/add-current-time
{
  "description" : "automatically add the current time to the documents",
  "processors" : [
    {
      "set" : {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
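To see what the pipeline adds without a cluster, here is a local Python stand-in for the `set` processor above. It is only a simulation: Elasticsearch fills `{{_ingest.timestamp}}` itself (with nanosecond digits, as the Kibana output below shows), whereas Python's `datetime` stops at microseconds. `add_current_time` is a hypothetical name borrowed from the pipeline id:

```python
from datetime import datetime, timezone

def add_current_time(doc: dict, field: str = "timestamp") -> dict:
    """Local stand-in for the 'set' processor: copy the doc and stamp the current UTC time."""
    stamped = dict(doc)  # leave the caller's document untouched
    stamped[field] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return stamped

print(add_current_time({"my_field": "test numero 11", "girls group": "spice girl"}))
```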

3- Index a document through the pipeline

PUT test-index/_doc/11?pipeline=add-current-time
{
   "my_field": "test numero 11",
   "girls group": "spice girl"
}
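The same indexing call can be built from code. A sketch using only Python's standard library, assuming the cluster is reachable at `http://localhost:1750` (the address in the connector log above); `index_request` is a hypothetical helper, and it builds the request without sending it:

```python
import json
import urllib.request
from urllib.parse import urlencode

def index_request(base_url, index, doc_id, doc, pipeline):
    """Build (but do not send) PUT <index>/_doc/<id>?pipeline=<name>."""
    url = f"{base_url}/{index}/_doc/{doc_id}?{urlencode({'pipeline': pipeline})}"
    return urllib.request.Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

req = index_request(
    "http://localhost:1750", "test-index", 11,
    {"my_field": "test numero 11", "girls group": "spice girl"},
    "add-current-time",
)
print(req.get_method(), req.full_url)
# PUT http://localhost:1750/test-index/_doc/11?pipeline=add-current-time
# urllib.request.urlopen(req) would actually send it.
```

Alternatively, setting `index.default_pipeline` on the index applies the pipeline to every document without the `pipeline` query parameter.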

4- Output in Kibana

"hits" : [
      {
        "_index" : "test-index",
        "_type" : "_doc",
        "_id" : "9",
        "_score" : 1.0,
        "_source" : {
          "my_field" : "test numero 9",
          "timestamp" : "2022-09-21T14:34:05.785318623Z"
        }
      }]
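One detail worth checking: the pipeline writes `timestamp`, while the original error was about sorting on `@timestamp`. Unless the connector is configured to use `timestamp` (assumed here to be its `incrementing.field.name` setting; check the connector README), it may keep looking for `@timestamp`. A quick check over the hit shown above, with a hypothetical helper:

```python
def ids_missing_field(hits, field):
    """Return the _id of every hit whose _source lacks the given field."""
    return [h["_id"] for h in hits if field not in h.get("_source", {})]

hits = [
    {
        "_index": "test-index",
        "_id": "9",
        "_source": {"my_field": "test numero 9", "timestamp": "2022-09-21T14:34:05.785318623Z"},
    }
]
print(ids_missing_field(hits, "timestamp"))   # []
print(ids_missing_field(hits, "@timestamp"))  # ['9']
```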

DarioBalinzo commented 2 years ago

Thanks for the explanation!

I think the index mapping must be created before the connector starts. Otherwise some fields do not exist yet, and Elasticsearch cannot sort on an unknown field.
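Following that advice, the sort field can be mapped explicitly when the index is created, so a sort on it works even before the first document arrives. A sketch that builds the body for `PUT /test-index`, reusing the settings from step 1 and assuming the field is `timestamp` of type `date` (`create_index_body` is a hypothetical helper):

```python
import json

def create_index_body(sort_field="timestamp", shards=2, replicas=2):
    """Body for PUT /<index> that maps the sort field as a date up front."""
    return {
        "settings": {"number_of_shards": shards, "number_of_replicas": replicas},
        "mappings": {"properties": {sort_field: {"type": "date"}}},
    }

print(json.dumps(create_index_body(), indent=2))
```

With the mapping in place, sorting an empty index on `timestamp` returns zero hits instead of `No mapping found ... in order to sort on`.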