hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

hpgrahsl kafka-connect-mongodb and geojson data #107

Closed opistara closed 4 years ago

opistara commented 4 years ago

Hello Hans-Peter, I'm using your kafka-connect-mongodb v1.3.1 with Kafka 0.10.1 and the Confluent Schema Registry 3.1.

I also use NiFi 1.5.0 to ingest data and publish it to a Kafka topic with the PublishKafkaRecord processor.

If I ingest a flat CSV file associated with a simple Avro schema, everything is OK. In fact, I ingest the CSV data, process each record, publish it to a Kafka topic, and then the MongoDB sink consumes from the topic and saves to a MongoDB collection. Wonderful!!!

But if I ingest a GeoJSON file, my pipeline breaks at your MongoDB sink.

Here is more information about my GeoJSON data processing.

1) To simplify, I ingest a GeoJSON file with a single record:

    [
      {
        "type": "Feature",
        "geometry": {
          "type": "Polygon",
          "coordinates": [
            [
              [ 18.183730244636536, 40.35109032638699 ],
              [ 18.183746337890625, 40.351016739174426 ],
              [ 18.183730244636536, 40.35109032638699 ]
            ]
          ]
        },
        "properties": {
          "name": "Z-04",
          "description": "parking 1,30 €/hour"
        }
      }
    ]

2) This is the Avro schema that I have registered in the Schema Registry:

    {
      "type": "record",
      "name": "piano_sosta_tariffata_lecce",
      "namespace": "opendata.piano_sosta-le",
      "fields": [
        {"name": "type", "type": "string"},
        {"name": "geometry", "type": {
          "type": "record",
          "name": "_geometry",
          "fields": [
            {"name": "type", "type": "string"},
            {"name": "coordinates", "type": {
              "type": "array",
              "items": {
                "type": "array",
                "items": {"type": "array", "items": "double"}
              }
            }}
          ]
        }},
        {"name": "properties", "type": {
          "type": "record",
          "name": "_properties",
          "fields": [
            {"name": "name", "type": "string"},
            {"name": "description", "type": "string"}
          ]
        }},
        {"name": "ingestion_date", "type": {"type": "int", "logicalType": "date"}},
        {"name": "ingestion_timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
      ]
    }

I have also used an online Avro validator (https://json-schema-validator.herokuapp.com/avro.jsp) to check that the Avro schema is correct.

3) I ingest the file and publish it to the Kafka topic. Using the "kafka-avro-console-consumer" tool with the following command, I can consume the topic and see the record:

    sudo bin/kafka-avro-console-consumer --zookeeper hdpmaster1.it:2181 --topic raw_OD_LE_piano_sosta_tariffata --from-beginning

    {"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[18.183730244636536,40.35109032638699],[18.183746337890625,40.351016739174426],[18.183730244636536,40.35109032638699]]]},"properties":{"name":"","description":""},"ingestion_date":0,"ingestion_timestamp":1570201210514}

4) At this point the MongoDB WorkerSinkTask also tries to consume the same record from the Kafka topic, but I think it has a problem during deserialization. This is the error that I found in the confluent-kafka-connect log:

    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,515] ERROR Task mongodb_sink_OD_LE_piano_sosta_tariffata-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerS
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: org.apache.kafka.connect.errors.DataException: error while processing field geometry
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:131)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.lambda$toBsonDoc$0(AvroJsonSchemafulRecordConverter.java:92)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:92)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.convert(AvroJsonSchemafulRecordConverter.java:78)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:50)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:213)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:212)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:143)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.HashMap.forEach(HashMap.java:1289)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.lang.Thread.run(Thread.java:748)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: Caused by: org.apache.kafka.connect.errors.DataException: error while processing field coordinates
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:131)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.lambda$toBsonDoc$0(AvroJsonSchemafulRecordConverter.java:92)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.ArrayList.forEach(ArrayList.java:1257)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:92)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.handleStructField(AvroJsonSchemafulRecordConverter.java:178)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:119)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: ... 26 more
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: Caused by: org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:177)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:92)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.handleArrayField(AvroJsonSchemafulRecordConverter.java:168)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at at.grahsl.kafka.connect.mongodb.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:122)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: ... 32 more
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,516] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,516] INFO WorkerSinkTask{id=mongodb_sink_OD_LE_piano_sosta_tariffata-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,519] ERROR Task mongodb_sink_OD_LE_piano_sosta_tariffata-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerT
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: at java.lang.Thread.run(Thread.java:748)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,519] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
    Oct 09 13:37:09 hdpmaster1 connect-distributed.sh[27228]: [2019-10-09 13:37:09,519] INFO stopping MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:258)

I suspect that the "array of arrays" in my GeoJSON data causes a deserialization problem in the MongoDB WorkerSinkTask. Is there something I am missing? Can you help me understand what the problem is and which workaround I can adopt to overcome it?
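For anyone wanting to check quickly whether a schema contains such "array of arrays" fields, here is a minimal sketch in Python (standard library only). It walks the Avro schema posted above and reports every record field whose type is an array nested more than one level deep. The helper names `strip_arrays` and `nested_array_fields` are made up for this illustration, and the walk deliberately ignores unions and maps, so it is not a general Avro tool.

```python
import json

# The Avro schema from this issue, copied as posted.
SCHEMA_JSON = """
{"type": "record", "name": "piano_sosta_tariffata_lecce",
 "namespace": "opendata.piano_sosta-le", "fields": [
  {"name": "type", "type": "string"},
  {"name": "geometry", "type": {"type": "record", "name": "_geometry", "fields": [
    {"name": "type", "type": "string"},
    {"name": "coordinates", "type": {"type": "array", "items":
      {"type": "array", "items": {"type": "array", "items": "double"}}}}]}},
  {"name": "properties", "type": {"type": "record", "name": "_properties", "fields": [
    {"name": "name", "type": "string"},
    {"name": "description", "type": "string"}]}},
  {"name": "ingestion_date", "type": {"type": "int", "logicalType": "date"}},
  {"name": "ingestion_timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}]}
"""


def strip_arrays(schema):
    """Peel off array wrappers; return (nesting depth, innermost item schema)."""
    depth = 0
    while isinstance(schema, dict) and schema.get("type") == "array":
        depth += 1
        schema = schema["items"]
    return depth, schema


def nested_array_fields(schema, prefix=""):
    """List (dotted field path, depth) for fields with array depth > 1.

    Hypothetical helper: handles records and arrays only (no unions, no maps).
    """
    found = []
    if isinstance(schema, dict) and schema.get("type") == "record":
        for field in schema["fields"]:
            path = prefix + field["name"]
            depth, inner = strip_arrays(field["type"])
            if depth > 1:
                found.append((path, depth))
            # Descend into nested records, including records inside arrays.
            found.extend(nested_array_fields(inner, path + "."))
    return found


if __name__ == "__main__":
    print(nested_array_fields(json.loads(SCHEMA_JSON)))
    # prints [('geometry.coordinates', 3)]
```

For this schema the only such field is `geometry.coordinates`, which is also the field named in the innermost "Caused by" sections of the log.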

Cheers and thx.

Orazio

hpgrahsl commented 4 years ago

@opistara thx for reaching out. glad to hear you like this project and find it useful!

Based on your detailed issue description including logs, it looks very much like the actual problem is indeed related to multi-level nesting of arrays. I will investigate further to be sure and report back. I cannot promise when there will be a fix for this, but it shouldn't take too long.
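To illustrate what "multi-level nesting of arrays" can mean for a converter, here is a small, language-neutral sketch in Python. It is not the connector's code, which is Java. It assumes a simplified, hypothetical schema model and shows one plausible reading of the trace: a conversion routine that treats every non-primitive array element as a struct fails on an array of arrays with the same message as in the log, while a routine that recurses on the element schema handles any depth.

```python
# Hypothetical schema model for this sketch (NOT the Kafka Connect API):
#   "double", "string", ...                    -> primitive
#   {"type": "array", "items": <schema>}        -> array
#   {"type": "struct", "fields": {name: schema}} -> struct
PRIMITIVES = {"double", "string", "int", "long", "boolean"}


class DataException(Exception):
    """Stand-in for the exception type seen in the log."""


def is_primitive(schema):
    return isinstance(schema, str) and schema in PRIMITIVES


def struct_fields(schema):
    # Mirrors the message in the log when fields are requested on a non-struct.
    if not (isinstance(schema, dict) and schema.get("type") == "struct"):
        raise DataException("Cannot list fields on non-struct type")
    return schema["fields"]


def convert_struct(schema, value, convert):
    return {name: convert(fs, value[name]) for name, fs in struct_fields(schema).items()}


def convert_naive(schema, value):
    """Assumes array elements are either primitives or structs."""
    if is_primitive(schema):
        return value
    if schema["type"] == "array":
        items = schema["items"]
        if is_primitive(items):
            return list(value)
        # A nested array lands here and is wrongly handled as a struct.
        return [convert_struct(items, v, convert_naive) for v in value]
    return convert_struct(schema, value, convert_naive)


def convert_recursive(schema, value):
    """Recurses on the element schema, so arrays may nest to any depth."""
    if is_primitive(schema):
        return value
    if schema["type"] == "array":
        return [convert_recursive(schema["items"], v) for v in value]
    return convert_struct(schema, value, convert_recursive)


# The geometry part of the record from this issue, in the toy schema model.
GEOMETRY_SCHEMA = {"type": "struct", "fields": {
    "type": "string",
    "coordinates": {"type": "array", "items": {
        "type": "array", "items": {"type": "array", "items": "double"}}},
}}
GEOMETRY = {"type": "Polygon", "coordinates": [[
    [18.183730244636536, 40.35109032638699],
    [18.183746337890625, 40.351016739174426],
    [18.183730244636536, 40.35109032638699],
]]}

if __name__ == "__main__":
    try:
        convert_naive(GEOMETRY_SCHEMA, GEOMETRY)
    except DataException as exc:
        print("naive converter failed:", exc)
    print(convert_recursive(GEOMETRY_SCHEMA, GEOMETRY) == GEOMETRY)
```

The point of the recursive variant is that the array branch dispatches on the element schema instead of assuming what it contains, so a GeoJSON Polygon with three array levels needs no special casing.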

hpgrahsl commented 4 years ago

@opistara you may try the connector based on this branch / snapshot

hpgrahsl commented 4 years ago

@opistara it's merged now. so just build the latest from master branch and give it a try.

opistara commented 4 years ago

@hpgrahsl thanks a lot.

hpgrahsl commented 4 years ago

@opistara does it work for your GeoJSON use case now with the provided fix? would be nice to know :)