hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING #70

Closed bajaj-varun closed 5 years ago

bajaj-varun commented 5 years ago

Data from source system - abc@abc.com# kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TTDF.TCDCPOC_DATA_TYPES --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

"57508564"      {"data":{"SEQNO":{"int":57508564},"TEXT":{"string":"Lorem ipsum dolor sit amet,"},"BIGNUM":{"long":11122233344447},"BINOBJ":{"bytes":"#~¦`¬| DATA IS STORED AS BINARY|>"},"CHAROBJ":{"string":"<text>THIS DATA IS STORED AS CLOB</text>"},"FLOATNUM":{"double":6.62607015E-34},"CHARVAR":{"string":"consectetur adipiscing elit,sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."}},"headers":{"operation":"REFRESH","changeSequence":"","timestamp":"","streamPosition":"","transactionId":"","changeMask":null,"columnMask":null}}

^CProcessed a total of 6 messages

Schema registry -

{
  "subject": "TTDF.TCDCPOC_DATA_TYPES-value",
  "version": 3,
  "id": 12,
  "schema": "{"type":"record","name":"DataRecord","fields":[{"name":"data","type":{"type":"record","name":"Data","fields":[{"name":"SEQNO","type":["null","int"],"default":null},{"name":"TEXT","type":["null","string"],"default":null},{"name":"BIGNUM","type":["null","long"],"default":null},{"name":"BINOBJ","type":["null","bytes"],"default":null},{"name":"CHAROBJ","type":["null","string"],"default":null},{"name":"FLOATNUM","type":["null","double"],"default":null},{"name":"CHARVAR","type":["null","string"],"default":null}]}},{"name":"headers","type":{"type":"record","name":"Headers","fields":[{"name":"operation","type":{"type":"enum","name":"operation","symbols":["INSERT","UPDATE","DELETE","REFRESH"]}},{"name":"changeSequence","type":"string"},{"name":"timestamp","type":"string"},{"name":"streamPosition","type":"string"},{"name":"transactionId","type":"string"},{"name":"changeMask","type":["null","bytes"]},{"name":"columnMask","type":["null","bytes"]}]}}]}"
}

Errors -

[2019-02-12 12:28:48,364] ERROR WorkerSinkTask{id=mongo-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584)
org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING.
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:690)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:186)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:185)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:122)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:111)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:110)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:109)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-12 12:28:48,364] ERROR WorkerSinkTask{id=mongo-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)

Config file -

{
   "name": "mongo",
   "config": {

        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "internal.key.converter.schemas.enable":"false",
        "key.converter.schemas.enable": false,
        "key.ignore":"true",

        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "internal.value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": true,
        "internal.value.converter.schemas.enable":"true",

        "key.converter.schema.registry.url":"http://localhost:8081",
        "value.converter.schema.registry.url":"http://localhost:8081",

        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "topics":"TTDF.TCDCPOC_DATA_TYPES",
        "mongodb.connection.uri":"mongodb://xxxx:Password1@xxxx:27017/testdb?authSource=xxx",
        "mongodb.collection":"TCDCPOC_DATA_TYPES",

        "_comment":"transforms\":\"createKey",
        "_comment":"transforms.createKey.type:org.apache.kafka.connect.transforms.Flatten$Value",
        "_comment":"transforms.Flatten.delimiter:_",
        "_comment":"transforms.createKey.type:io.confluent.connect.transforms.Drop$Key",
        "_comment":"transforms.createKey.skip.missing.or.null\":\"true",
        "_comment":"transforms.createKey.type\":\"org.apache.kafka.connect.transforms.ValueToKey",
        "_comment":"transforms.createKey.fields\":\"data.SEQNO",
        "_comment":"transforms.createKey.static.key:test"
        }
}
rhauch commented 5 years ago

@bajaj-varun your configuration for the internal key and value converters is incorrect. In particular, the internal.value.converter uses the AvroConverter, but there is no corresponding internal.value.converter.schema.registry.url. Check your Connect worker logs for errors and warnings, because I'm surprised you got this far.

However, I strongly recommend you only use the JSON converter with schemas disabled for internal converters, configured as:

...
        "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
        "internal.key.converter.schemas.enable":"false",
        "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
        "internal.value.converter.schemas.enable":"false",
...

Better yet, if you're using Apache Kafka version 2.0 or later (or Confluent Platform 5.0 or later), there are defaults and you can remove these altogether from your worker configuration.

These are used by the Connect distributed worker to store the internal configs, status, and offsets inside Kafka topics. Your own data from the connectors is always read and written via the non-internal converters.

Note that you cannot change these once you've started your Connect worker cluster. Just be aware that you may well run into trouble in the future; hopefully this is a development cluster that you can recreate.

hpgrahsl commented 5 years ago

@rhauch Many THX for the answer you contributed here.

@bajaj-varun let me add the following, irrespective of what @rhauch suggests: I also see that your key seems to be a plain string. This would not work, since the connector always expects it to be parseable as valid JSON. Very similar questions have come up in the past; for example, read about potential solutions in #36 or #64.
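To illustrate the point, here is a minimal sketch mirroring what the converter does internally (the class name KeyParseDemo is just for this example): BsonDocument.parse accepts a JSON document but rejects a bare string key like the one shown above.

import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;

public class KeyParseDemo {
    public static void main(String[] args) {
        // A JSON document parses fine and could serve as the record key.
        BsonDocument ok = BsonDocument.parse("{\"SEQNO\": 57508564}");
        System.out.println("parsed key document: " + ok.toJson());

        try {
            // A bare string key like "57508564" is not a JSON document, so
            // readStartDocument() fails with the exception reported in this issue.
            BsonDocument.parse("\"57508564\"");
        } catch (BsonInvalidOperationException e) {
            System.out.println("plain string key rejected: " + e.getMessage());
        }
    }
}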

please let me know if this helps to fix your problem.

bajaj-varun commented 5 years ago

Hi @rhauch , @hpgrahsl

Good morning. Hope you are doing great, and thanks for your response. Out of curiosity, is there any way to skip reading the key? This is part of a simple POC where we just need to consume the messages and dump them downstream without any modification to the nested data.

bajaj-varun commented 5 years ago

Hi @hpgrahsl, the solutions you referenced helped. We also set the key to null to skip the keys, and that worked as well, since for the POC we only need the values. Thanks again for your help; I'm marking the issue as closed.
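For reference, a minimal sketch of producing records with a null key (the String value serializer and the simplified payload are placeholders; the real pipeline used Avro via the Schema Registry):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullKeyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName()); // placeholder; real setup used Avro

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With a null key there is no key document for the sink connector to parse,
            // which is how the key-parsing exception was avoided for this POC.
            String value = "{\"data\":{\"SEQNO\":57508564}}"; // simplified payload
            producer.send(new ProducerRecord<>("TTDF.TCDCPOC_DATA_TYPES", null, value));
        }
    }
}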

hpgrahsl commented 5 years ago

@bajaj-varun glad to hear you got it working for your use case! If you do more than the POC, let me know; always happy to learn about yet another real-world story :) Ah, and also thx for closing this.