Ronniexie opened this issue 8 years ago
Do I need to start the Schema Registry before running kafka-connect-mongodb?
Hi Ronniexie,
Do you know how to use the jar? If so, please explain it as clearly as possible. I am new to Kafka.
Best regards, Tony
@Ronniexie yes, the Schema Registry has to be started beforehand. You need to start ZooKeeper, Kafka and then the Schema Registry. When everything is running, the easiest way to start the connector is to copy the jar with dependencies into the folder confluent/share/java/confluent-common and then run a distributed connector. Once the distributed connector is running, you can simply push a configuration to start the Mongo connector.
Unfortunately the connector works only with Confluent 2, since I didn't have time to update it to work with the new version; as soon as I have time I'll update it.
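For reference, "pushing a configuration" to a distributed worker happens over Kafka Connect's REST API. A minimal sketch, assuming the worker's REST interface is on its default port 8083 and reusing the connector settings that appear later in this thread:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mongodb.MongodbSinkConnector",
    "tasks.max": "3",
    "host": "127.0.0.1",
    "port": "27017",
    "bulk.size": "100",
    "mongodb.database": "fafa",
    "mongodb.collections": "fafa123",
    "topics": "test2"
  }
}'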
@patelliandrea, thanks for your reply, but when I follow the above steps, I get the following errors:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:319)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x10c7661(above 10ffff) at char #1, byte #7)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x10c7661(above 10ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-mongodb-sink-connector-0" org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
(same stack trace as above)
My steps are as follows:
1. Use kafka-avro-console-producer to register the schema and produce a test record:
./bin/kafka-avro-console-producer \
--broker-list localhost:9094 --topic test2 \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "value1"}
2. The configuration of connect-file-sink.properties is as follows:
name=mongodb-sink-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max=3
host=127.0.0.1
port=27017
bulk.size=100
mongodb.database=fafa
mongodb.collections=fafa123
topics=test2
3. The configuration of connect-standalone.properties is as follows:
bootstrap.servers=localhost:9094
# If I set the following values to true, it always reports "JsonDeserializer with schemas.enable requires
# \"schema\" and \"payload\" fields and may not contain additional fields", even when the message in the
# topic is: '{"schema" : {"type" : "struct","optional" : false,"fields" : [{"type" : "string","optional" : false,"field" : "company"}]},"payload" : {"company": "debezium"}}', so I set them to false.
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
4. Then I use the following command to test the connector:
./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka/connect-file-sink.properties
Do I need to do more configuration?
I resolved it; the problem was that I used the incorrect worker properties. I should use the following command:
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka/connect-file-sink.properties
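For context, the relevant difference is that connect-avro-standalone.properties points the converters at the Schema Registry. The key lines look roughly like this (paraphrasing the defaults Confluent ships, and assuming the registry runs on localhost:8081):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

With the plain connect-standalone.properties above, the JsonConverter tried to parse Avro-encoded bytes as JSON, which is what produced the CharConversionException.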
But I have another question: can I use this with plain Kafka, without Confluent?
Yes, you can. You have to use
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
instead of the AvroConverter.
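Put together, a minimal sketch of a standalone worker file for plain Apache Kafka without Confluent, assuming a broker on localhost:9092 and schemaless JSON messages:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# false because plain JSON messages carry no schema/payload envelope
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000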
Hi patelliandrea, does it support Confluent 3 now?
Hi patelliandrea! When I run it using your and Tony's method, I found this error:

[2017-08-11 17:30:21,473] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.NullPointerException
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:257)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:157)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)

Please help! P.S.: My Confluent version is 3.3.0.
I am getting the same error:

java.lang.NullPointerException
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:257)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:157)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)

Can someone help with this please? I am using Confluent version 3.3.0.
Whenever I run the MongoDB source connector it shows this error:

[2017-11-16 06:11:54,968] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2017-11-16 06:11:54,968] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
[2017-11-16 06:11:54,975] ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task failed initialization and will not be started. (org.apache.kafka.connect.runtime.WorkerSourceTask:126)
java.lang.ClassCastException: Non-string value found in original settings for key converter.class: null
at org.apache.kafka.common.config.AbstractConfig.originalsStrings(AbstractConfig.java:165)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initialize(WorkerSourceTask.java:124)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:405)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:278)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:302)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:183)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:102)
[2017-11-16 06:11:54,979] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)
[20
Hello,
I have Kafka and MongoDB installed and running. Can anyone give me a small Java class example to send messages from MyTopic to Mongo? (I do not have Confluent, simply Kafka.)
Thank you.
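This question went unanswered in the thread, so here is a minimal sketch that skips Kafka Connect entirely: a plain KafkaConsumer polling MyTopic and inserting each message into MongoDB with the MongoDB Java driver (mongodb-driver-sync). The broker address, group id, topic, and database/collection names are illustrative assumptions, as is the premise that message values are valid JSON strings; it needs Kafka clients 2.0+ for poll(Duration).

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.bson.Document;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicToMongo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("group.id", "topic-to-mongo");            // assumption: any unused group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             MongoClient mongo = MongoClients.create("mongodb://127.0.0.1:27017")) {
            // database/collection names borrowed from the config earlier in this thread
            MongoCollection<Document> collection =
                    mongo.getDatabase("fafa").getCollection("fafa123");
            consumer.subscribe(Collections.singletonList("MyTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // assumes each message value is a JSON document
                    collection.insertOne(Document.parse(record.value()));
                }
            }
        }
    }
}

Kafka Connect adds offset tracking, retries and scaling on top of a loop like this; the class above trades all that away for having no Confluent dependency.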
Where do I see the logs for kafka-connect-mongodb? I put the jar in the lib directory of Kafka and restarted the Kafka instance, but new messages in Kafka are not getting sinked to Mongo. Since I only want to sink data from Kafka into Mongo, I have only used the sink files in the jar.
Should the demo be exported into a jar and put into Kafka's lib directory?