confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

Error: Cannot infer mapping without schema #728

Closed jliunyu closed 1 year ago

jliunyu commented 1 year ago

I'm working on a sink connector to Elasticsearch, with Kafka and ES both configured on my local machine. The connector runs and creates an index named after the Kafka topic, but when it tries to write data to ES I keep hitting this error:

 [2023-11-16 19:07:56,566] INFO [simple-elasticsearch-connector|task-1] Creating index simple.elasticsearch.data-new11. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:227)
[2023-11-16 19:07:56,612] ERROR [simple-elasticsearch-connector|task-1] WorkerSinkTask{id=simple-elasticsearch-connector-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot infer mapping without schema. (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
    at io.confluent.connect.elasticsearch.Mapping.buildMapping(Mapping.java:82)
    at io.confluent.connect.elasticsearch.Mapping.buildMapping(Mapping.java:69)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.createMapping(ElasticsearchClient.java:274)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.checkMapping(ElasticsearchSinkTask.java:145)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.tryWriteRecord(ElasticsearchSinkTask.java:255)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:112)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
[2023-11-16 19:07:56,614] ERROR [simple-elasticsearch-connector|task-1] WorkerSinkTask{id=simple-elasticsearch-connector-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
    at io.confluent.connect.elasticsearch.Mapping.buildMapping(Mapping.java:82)
    at io.confluent.connect.elasticsearch.Mapping.buildMapping(Mapping.java:69)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.createMapping(ElasticsearchClient.java:274)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.checkMapping(ElasticsearchSinkTask.java:145)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.tryWriteRecord(ElasticsearchSinkTask.java:255)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:112)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more

This is my evn_properties.yaml

worker:
  dev:
    bootstrap.servers: host.docker.internal:9092
    config.storage.replication.factor: 1
    num.partitions: 1
    config.storage.topic: kcaas-es-sink-dev-compact-config-new11
    offset.storage.topic: kcaas-es-sink-dev-compact-offset-new11
    status.storage.topic: kcaas-es-sink-dev-compact-status-new11
    group.id: streaming-kafka-to-es-sink-connector-sample-dev-new11

simple-elasticsearch-connector:
  dev:
    connection.url: host.docker.internal:9200
    tasks.max: 2
    type.name: _doc,
    topics: simple.elasticsearch.data-new11
    project: wmt-49ef6ef21a2c7199935f24250e

This is my kc_config.yaml

worker:
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
connectors:
  - name: simple-elasticsearch-connector
    config:
      connector.class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
      schema.ignore: true

Could I get some suggestions on the configuration, and on why I'm hitting the above error?

jliunyu commented 1 year ago

Closing this one as it's already resolved.
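
For anyone landing here with the same error: the thread does not record what the fix was. The stack trace does show where the exception comes from: checkMapping calls createMapping, which calls Mapping.buildMapping, and that fails because the record carries no schema (expected with value.converter.schemas.enable: false). The sketch below is a simplified Python model of that path, not the connector's actual code. The function names and the dict-based schema are hypothetical, and it assumes, per the connector's documented schema.ignore option, that mapping inference is skipped when schema.ignore is true. If that assumption holds, the error suggests schema.ignore: true may not have reached the running task's configuration.

```python
from typing import Optional


class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


def build_mapping(schema: Optional[dict]) -> dict:
    # Models the failing frame in the trace: without a schema there is
    # nothing to derive an Elasticsearch mapping from.
    if schema is None:
        raise DataException("Cannot infer mapping without schema.")
    return {"properties": {name: {"type": t} for name, t in schema.items()}}


def check_mapping(schema_ignore: bool, schema: Optional[dict]) -> Optional[dict]:
    # Assumption: with schema.ignore=true the connector does not build a
    # mapping and leaves it to Elasticsearch's dynamic mapping.
    if schema_ignore:
        return None
    return build_mapping(schema)


if __name__ == "__main__":
    # Schemaless JSON records are modelled here as schema=None.
    print(check_mapping(schema_ignore=True, schema=None))
    try:
        check_mapping(schema_ignore=False, schema=None)
    except DataException as err:
        print(f"DataException: {err}")
```

One way to check this on a real deployment is to read back the effective connector configuration from the Connect REST API (GET /connectors/simple-elasticsearch-connector/config) and confirm that schema.ignore appears there as true.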