confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

Not able to sink data from Kafka to ElasticSearch using kafka-connect-elasticsearch #153

naveengauba closed this issue 6 years ago

naveengauba commented 6 years ago

I have installed Kafka 0.10.2.1 and Kafka Connect 3.2.2 on my Mac. A process writes JSON data as bytes to a Kafka topic. I have configured Kafka Connect to read the data from that topic and write it to an Elasticsearch index.

However, Kafka Connect fails to parse the data and throws the following error:

[2017-11-27 14:15:21,503] DEBUG WorkerSinkTask{id=elasticsearch-sink-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask:342)
[2017-11-27 14:15:21,503] DEBUG Finished WorkerSinkTask{id=elasticsearch-sink-0} offset commit successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:207)
[2017-11-27 14:15:21,503] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): Expected space separating root-level values
 at [Source: [B@51864299; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('d' (code 100)): Expected space separating root-level values
 at [Source: [B@51864299; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:447)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:463)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1605)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1346)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:799)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-11-27 14:15:21,505] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
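
For context, this error comes from the JsonConverter's Jackson parser and occurs whenever the converter is handed bytes that are not valid JSON. It can be reproduced in isolation; in the sketch below the input string is purely illustrative (chosen to trigger the same "Unexpected character" at column 4), not the actual record contents:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonParseRepro {
    public static void main(String[] args) throws Exception {
        // "150d" parses as the number 150 followed by an unexpected 'd' at
        // column 4, producing the same JsonParseException as in the log above.
        byte[] notJson = "150d".getBytes("UTF-8");
        new ObjectMapper().readTree(notJson);
    }
}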

This is what the data looks like when I read it using kafka-console-consumer:

{"id":"65ce9fa3-3efd-4f16-b82d-b6c59a35e44b","name":"vne-win-10.1.195.216","tag":"","description":"","creationTime":1491344104000,"modificationTime":1492115799000,"deviceHost":"10.1.195.216","streamingHost":"10.1.195.216","multicastHost":"10.1.195.216","currentEdgeVersion":"4.0.0","targetEdgeVersion":"4.0.0","httpEnabled":true,"hlsEnabled":true,"rtspEnabled":false,"platform":"Windows (x64)","osVersion":"win7","lastOnlineTime":1494972839000,"alertCount":361,"online":false}

Please advise.

naveengauba commented 6 years ago

Here are the contents of my connect-standalone.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=192.168.99.100:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

6isoID commented 6 years ago

Same issue on Kafka 0.11.0.1 and Kafka Connect 3.3.0. Any updates here?

rmoff commented 6 years ago

Is your message key really JSON? That is what you're specifying with key.converter=org.apache.kafka.connect.json.JsonConverter.

Use print.key with kafka-console-consumer to check what your message keys are:

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --from-beginning --topic foo

or use kafkacat:

kafkacat -C -c1 -K: -b localhost:9092 -f 'Key:    %k\nValue:  %s\n' -t foo

If your key is not JSON, try key.converter=org.apache.kafka.connect.storage.StringConverter instead.
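
In other words, for string keys with schemaless JSON values, the relevant worker settings would look like this (a minimal sketch combining the two converter choices above):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false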

naveengauba commented 6 years ago

@rmoff Changing the key converter to key.converter=org.apache.kafka.connect.storage.StringConverter as suggested fixed the issue. The issue can be closed now. Thanks for the help.

p4mp commented 6 years ago

I tried the above steps, but it didn't work for me; I still get the following error:

(io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig:223)
[2018-08-27 23:45:37,632] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NoClassDefFoundError: io/searchbox/client/JestClientFactory
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:112)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:53)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:267)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:163)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: io.searchbox.client.JestClientFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 11 more
[2018-08-27 23:45:37,634] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-08-27 23:45:37,634] INFO Stopping ElasticsearchSinkTask. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:175)
^C[2018-08-27 23:45:50,488] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2018-08-27 23:45:50,490] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2018-08-27 23:45:50,494] INFO Stopped ServerConnector@7f62bec6{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2018-08-27 23:45:50,507] INFO Stopped o.e.j.s.ServletContextHandler@5db8e6b8{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2018-08-27 23:45:50,510] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
[2018-08-27 23:45:50,510] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:76)
[2018-08-27 23:45:50,510] INFO Stopping task elasticsearch-sink-0 (org.apache.kafka.connect.runtime.Worker:464)
[2018-08-27 23:45:50,511] INFO Stopping connector elasticsearch-sink (org.apache.kafka.connect.runtime.Worker:304)
[2018-08-27 23:45:50,511] INFO Stopped connector elasticsearch-sink (org.apache.kafka.connect.runtime.Worker:320)
[2018-08-27 23:45:50,511] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:154)
[2018-08-27 23:45:50,512] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:67)
[2018-08-27 23:45:50,512] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:175)
[2018-08-27 23:45:50,513] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:86)
[2018-08-27 23:45:50,513] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:70)
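
This NoClassDefFoundError is a different problem from the converter issue above: the worker cannot find the Jest client jars that the connector depends on, which usually means the connector was not installed together with its dependencies or the worker is not pointed at them. A minimal sketch of the worker setting that typically addresses this, assuming a Confluent-style layout (the path is illustrative; adjust it to where the connector and its jars actually live):

# Directory whose subdirectories each contain a plugin's jars and dependencies,
# e.g. /usr/share/java/kafka-connect-elasticsearch including the Jest client jars
plugin.path=/usr/share/java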

faysalmazeddiu commented 5 years ago

I am new to Kafka. I want to load data from a Kafka topic into Elasticsearch, but it does not work. Please help me. Below is my connector.

Writing data to the Kafka topic:

KStream<String, String> outputStream = statusStream
        .leftJoin(stationInfoTable, (num_bikes, info) -> {
            return new BikeStats(Integer.parseInt(num_bikes), info.capacity,
                    info.latitude, info.longitude);
        })
        .filter((k, stats) -> stats.availabilityRatio < 0.1)
        .map((k, stats) -> {
            try {
                AfterStreamProcess result = new AfterStreamProcess();
                result.stationId = k;
                result.longitude = stats.longitude;
                result.latitude = stats.latitude;
                result.numBikesAvailable = stats.numBikesAvailable;
                result.capacity = stats.stationCapacity;
                result.ratio = String.format("%.2f", stats.availabilityRatio * 100) + "%";
                String value = mapper.writeValueAsString(result);
                return new KeyValue<>(k, value);
            } catch (Exception e) {
                // Chain the cause so the original stack trace is preserved
                throw new RuntimeException("Output error", e);
            }
        });
// output to the Kafka topic
outputStream
        .to("low-bike-availability_2", Produced.with(Serdes.String(), Serdes.String()));

**In the topic, my data looks like this, with key and value:**

150 {"stationId":"150","numBikesAvailable":1,"latitude":40.720875,"longitude":-73.98086,"capacity":29,"ratio":"3.45%"}
164 {"stationId":"164","numBikesAvailable":2,"latitude":40.75323,"longitude":-73.97032,"capacity":47,"ratio":"4.26%"}
167 {"stationId":"167","numBikesAvailable":1,"latitude":40.7489,"longitude":-73.97605,"capacity":45,"ratio":"2.22%"}
174 

Elasticsearch sink connector.json:

{
  "name": "es-sink-mysql-foobar-02",
  "config": {
    "_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",

    "_comment": "--- Elasticsearch-specific config ---",
    "_comment": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",

    "_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist  ",
    "type.name": "type.name=kafka-connect",

    "_comment": "Which topic to stream data from into Elasticsearch",
    "topics": "low-bike-availability_2",

    "_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source)  you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.",
    "key.ignore": "true"
  }
}
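
A connector configuration in this form is normally submitted to the Connect worker's REST API, for example (assuming the worker's REST port is the default 8083 and the JSON above is saved as connector.json):

curl -X POST -H "Content-Type: application/json" \
     --data @connector.json \
     http://localhost:8083/connectors
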
rmoff commented 5 years ago

The best way to get support for a new issue is: