alibaba-archive / kafka-connect-mongo

Kafka mongo connector (deeply inspired by https://github.com/DataReply/kafka-connect-mongodb)
Apache License 2.0

Issue using JsonConverter for kafka-connect-mongo as Sink #2

Closed: excentrik closed this issue 7 years ago

excentrik commented 7 years ago

Hi

Using the kafka MongoDB connector as sink for a kafka topic, I'm getting the following error:

kafka-connect_1  | [2017-03-02 13:32:32,050] DEBUG Receive records 1 (org.apache.kafka.connect.mongo.MongoSinkTask)
kafka-connect_1  | [2017-03-02 13:32:32,051] TRACE Put record: SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='rh_stats', kafkaPartition=0, key=null, value={schema={optional=false, type=struct, fields=[{field=type, optional=false, type=string}, {field=id, optional=false, type=string}, {field=date, optional=false, type=string}]}, payload={date=2017-03-02T15:28:15+02:00, id=58b81def81b610c6168b4568, type=rh_agenda_agenda_created}}, timestamp=-1} (org.apache.kafka.connect.mongo.MongoSinkTask)
kafka-connect_1  | [2017-03-02 13:32:32,053] ERROR Task mongo-stats-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
kafka-connect_1  | java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
kafka-connect_1  |      at org.apache.kafka.connect.mongo.MongoSinkTask.put(MongoSinkTask.kt:48)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
kafka-connect_1  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
kafka-connect_1  |      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect_1  |      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1  |      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect_1  |      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect_1  |      at java.lang.Thread.run(Thread.java:745)
kafka-connect_1  | [2017-03-02 13:32:32,067] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
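
Judging from the TRACE line, the record value arrives as a plain java.util.HashMap (with schema and payload as ordinary map keys) rather than a Connect Struct, so the cast at MongoSinkTask.kt:48 blows up. A minimal Kotlin reduction of the failing pattern (hypothetical code, not the connector's actual source):

import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

// Hypothetical reduction: when the value converter runs with
// schemas.enable=false, record.value() is a java.util.Map, and this
// unconditional cast throws the ClassCastException shown above.
fun put(records: Collection<SinkRecord>) {
    for (record in records) {
        val struct = record.value() as Struct
        println(struct.schema().fields())
    }
}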

I tried several different payloads, from the simple:

"type":"rh_agenda_agenda_created","id":"58b81e9e81b610c8168b456b","date":"2017-03-02T15:31:11+02:00"}}

to the one seen in the error message above with a schema specified, but to no avail; the same error happens every time. I was wondering if this is a known limitation or if you have any ideas on how to fix it.

The connector is created with:

curl -X POST -H "Content-Type: application/json" \
    --data '{"name": "mongo-stats-connector", "config": {"connector.class":"org.apache.kafka.connect.mongo.MongoSinkConnector", "tasks.max":"1", "mongo.uri":"mongodb://mongo:27017", "topics": "rh_stats", "databases": "test.stats"}}' http://kafka-connect:8083/connectors

Environment variables for the kafka-connect container:

      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_REST_ADVERTISED_HOST_NAME: 192.168.1.139
      CONNECT_REST_ADVERTISED_PORT: 8083
      CONNECT_REST_PORT: 8083
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG_LEVEL: TRACE
sailxjx commented 7 years ago

Hi @excentrik, it is a known limitation.

The mongo sink is the counterpart of the mongo source: it will only consume records that were serialized from a Struct. For example, the mongo source produces records like:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"ts"},{"type":"int32","optional":true,"field":"inc"},{"type":"string","optional":true,"field":"id"},{"type":"string","optional":true,"field":"database"},{"type":"string","optional":true,"field":"op"},{"type":"string","optional":true,"field":"object"}],"optional":false,"name":"mongo_21_schema_kafka_t"},"payload":{"ts":1488440425,"inc":11150,"id":"58b7cc69304bff419f0992a2","database":"kafka_t","op":"i","object":"{ \"_id\" : { \"$oid\" : \"58b7cc69304bff419f0992a2\" }, \"key\" : 0.6001637088183821, \"ts\" : { \"$timestamp\" : { \"t\" : 1488440425, \"i\" : 0 } } }"}}

Each record contains exactly the schema, id, and payload fields the sink expects.
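
For illustration, here is a minimal Kotlin sketch of this behavior, assuming only Kafka Connect's standard JsonConverter API (this is not code from this repository):

import org.apache.kafka.connect.json.JsonConverter

fun main() {
    val envelope = """{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"op"}],"optional":false,"name":"demo"},"payload":{"op":"i"}}"""

    // schemas.enable=true: the converter parses the schema/payload envelope
    // into an org.apache.kafka.connect.data.Struct, which is what the sink's cast expects.
    val withSchemas = JsonConverter()
    withSchemas.configure(mapOf("schemas.enable" to "true"), false)
    val structValue = withSchemas.toConnectData("rh_stats", envelope.toByteArray()).value()
    println(structValue?.javaClass)  // class org.apache.kafka.connect.data.Struct

    // schemas.enable=false: the whole document becomes a plain Map, with
    // "schema" and "payload" as ordinary keys, hence the ClassCastException above.
    val withoutSchemas = JsonConverter()
    withoutSchemas.configure(mapOf("schemas.enable" to "false"), false)
    val mapValue = withoutSchemas.toConnectData("rh_stats", envelope.toByteArray()).value()
    println(mapValue?.javaClass)  // class java.util.HashMap
}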

excentrik commented 7 years ago

Hi @sailxjx,

Thanks for the reply. For our use case it is essential to use the JsonConverter, but we'll see if we can work around the limitation. Thanks again.
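
One avenue we may try (an assumption on our side, based on standard JsonConverter behavior rather than anything specific to this connector): set CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE to "true" and have the producer wrap each message in the schema/payload envelope, along the lines of:

{"schema":{"type":"struct","optional":false,"fields":[{"field":"type","optional":false,"type":"string"},{"field":"id","optional":false,"type":"string"},{"field":"date","optional":false,"type":"string"}]},"payload":{"type":"rh_agenda_agenda_created","id":"58b81def81b610c6168b4568","date":"2017-03-02T15:28:15+02:00"}}

That should at least get past the ClassCastException, though the sink may still expect the specific fields the mongo source produces (ts, inc, id, database, op, object).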

sailxjx commented 7 years ago

@excentrik you're welcome, and PRs are welcome.