hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

How to sink JSON objects in a JSON array as individual records in a collection #102

Closed syalinbas closed 5 years ago

syalinbas commented 5 years ago

I would like to sink elements of a given JSON Array into Mongo as individual records. For example given the following array:

[ { "name" : "john", "age" : 20 }, { "name" : "jane", "age" : 22 } ]

each object should be one record in the given collection. Instead, I am getting the error below. One option is to read from one topic and write into another one, breaking the array into individual objects, but I wonder if it is possible to do this out of the box with kafka-connect and/or with this connector.

kafka-connect_1 | org.apache.kafka.connect.errors.DataException: error: no converter present due to unexpected object type java.util.ArrayList
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.getRecordConverter(SinkConverter.java:78)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:49)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:213)
kafka-connect_1 | at java.util.ArrayList.forEach(ArrayList.java:1257)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:212)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:143)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
kafka-connect_1 | at java.util.ArrayList.forEach(ArrayList.java:1257)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
kafka-connect_1 | at java.util.HashMap.forEach(HashMap.java:1289)
kafka-connect_1 | at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
kafka-connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
kafka-connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
kafka-connect_1 | at java.lang.Thread.run(Thread.java:748)
kafk

hpgrahsl commented 5 years ago

Hi @zbrsy, great to hear you are trying the sink connector. Two important aspects w.r.t. the issue you posted:

1) For several reasons, the sink connector has been designed to work with JSON objects for keys and values. This means the value of a Kafka record is expected to be a JSON object rather than a JSON array, and a top-level array is what causes the error you posted. What is possible, of course, is to work with a value that uses a single field holding the array in question, e.g.

{ "myArray": 
  [ 
   { "name" : "john", "age" : 20 },
   { "name" : "jane", "age" : 22 }
  ]
}

So all you need to do is change your value structure to reflect that; then it should work.

2) Storing individual documents for the array entries is not possible at the moment and will not be implemented. The idea of Kafka Connect is basically to provide a 1:1 mapping between records (sources -> Kafka or Kafka -> sinks). There are reasons why it's not the best idea to perform this "fan out", i.e. to split a single Kafka record into multiple ones towards the sink. Also, the single message transform (SMT) mechanism of Kafka Connect itself operates on a single message only. The solution is to employ a stream processor beforehand (KStreams and/or KSQL are your friends here) which flatMaps the array into separate Kafka records and writes them into a separate topic, which can then be used with the sink connector.
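
To make the two options above concrete, here is a minimal sketch of the per-record transformations only. It is written in plain Python with the standard library and is not connector or Kafka Streams code; the function names `wrap_array` and `fan_out` and the default field name are hypothetical, and reading from the source topic and writing to the target topic are left out entirely. In a Kafka Streams application, the logic of `fan_out` is roughly what you would put into a `flatMapValues` step.

```python
import json
from typing import List


def wrap_array(value: str, field: str = "myArray") -> str:
    """Option 1: wrap a top-level JSON array in an object with one field.

    The field name is an illustrative default, not something the
    connector requires.
    """
    parsed = json.loads(value)
    if not isinstance(parsed, list):
        # Already an object (or a scalar): leave the record untouched.
        return value
    return json.dumps({field: parsed})


def fan_out(value: str) -> List[str]:
    """Option 2: split one array-valued record into one value per element.

    Each returned string is meant to become its own Kafka record in a
    separate topic, which the sink connector then consumes. Elements
    that are not JSON objects are passed through as-is, so filter them
    beforehand if the sink should only see objects.
    """
    parsed = json.loads(value)
    if not isinstance(parsed, list):
        # Not an array: keep the 1:1 mapping for this record.
        return [value]
    return [json.dumps(element) for element in parsed]


if __name__ == "__main__":
    record = '[ { "name" : "john", "age" : 20 }, { "name" : "jane", "age" : 22 } ]'
    print(wrap_array(record))
    for document in fan_out(record):
        print(document)
```

Both functions leave non-array values unchanged, so they can be applied to a topic that mixes array and object values without breaking the records that are already fine.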

Let me know if you need any further info/help.

syalinbas commented 5 years ago

@hpgrahsl Thanks for the reply. It all makes sense. The second option is the path I was planning to take, but first I wanted to check whether I had missed anything. Thanks again for the clarifications and the reply.

hpgrahsl commented 5 years ago

Glad to hear that my reply was helpful. Feel free to reach out anytime you need something. It might also be wise to check out the official MongoDB connector, which became available several weeks ago. Find the repo here: https://github.com/mongodb/mongo-kafka. The migration is pretty smooth, since the sink connector is fully based on this code-base :)

linxar109 commented 4 years ago

I have the same issue as zbrsy.

Are you sure Kafka will not implement JSON arrays in the Mongo sink connector?

How can I implement the stream processor you mentioned?

Thanks a lot.