hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector; integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

remove \u0000\u0000\u0000\u0000\u0001\u001 #101

Closed. sourjya123 closed this issue 4 years ago.

sourjya123 commented 5 years ago

Hi, I am publishing data through EMQ. From EMQ it goes to Kafka, and from Kafka to MongoDB. EMQ is subscribed to the topic +/+/DEVICE_PROFILE.

e.g., mosquitto_pub -h localhost -p 1883 -t mcd/11.22.33.44/DEVICE_PROFILE -q 2 -m "99999,2.10#"

Now the EMQ topic becomes the key of the Kafka record (mcd/11.22.33.44/DEVICE_PROFILE), and I can see the Kafka key become the _id in Mongo. But some extra characters are getting added.

The Mongo document now looks like this:

{ "_id" : { "topic" : "\u0000\u0000\u0000\u0000\u0001@mcd/11.22.33.44/DEVICE_PROFILE" }, "_insertedTS" : { "$date" : "2019-08-14T06:14:38.528+0000" }, "_modifiedTS" : { "$date" : "2019-08-14T09:52:25.308+0000" }, "deviceMessage" : "\u0000\u0000\u0000\u0000\u0002��\u000799999,2.10#" }

How can I remove \u0000\u0000\u0000\u0000\u0001\u001 and the extra symbols from _id and deviceMessage?

hpgrahsl commented 5 years ago

@sourjya123 obviously this doesn't look right...

Without knowing anything about your source of records, i.e. the EMQ publisher/producer, I would assume something odd is going on with respect to some kind of binary encoding that is involved here. Can you verify what you get from just using the kafka-console-consumer to read from the very same Kafka topic? And don't forget to include printing of the key, since by default it only shows the value of the Kafka records.

Please report back with these results so that we can figure out what causes this.

sourjya123 commented 5 years ago

Hi, I was able to remove the binary encoding by editing your code a little. Can you tell me one thing: which class is used to store the data and the id?

I have used mongodb.writemodel.strategy=at.sourjya.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy

Now I want to modify the data a little before sending it to Mongo. Please let me know the class name and the logic that stores the data.

sourjya123 commented 5 years ago

EMQ publish:

mosquitto_pub -h localhost -p 1883 -t mcd/11.22.33.44/DEVICE_PROFILE -q 2 -m "hello sourjya how are you"

Kafka console consumer with key printing:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DEVICE_PROFILE --property print.key=true --property key.separator="@@@" --from-beginning

Output:

<mcd/11.22.33.44/DEVICE_PROFILE@@@2hello sourjya how are you

So I can see '<' before the key and '2' before the data. Is this why all these extra characters are getting added? If so, how can I remove them on the EMQ side, as I am using the emq-kafka connector?

As of now I have cloned your code and edited it a little, so that all extra characters are discarded before the document is written to Mongo. If there is a solution that does not involve changing your code, please let me know. But please also tell me the class name where the data is stored, because I need to modify the data before sending it.

Also, please let me know if any further information is required from my side.

hpgrahsl commented 5 years ago

> I was able to remove the binary encoding by editing your code a little. Can you tell me one thing: which class is used to store the data and the id?

Something is wrong if you need to change the code for this. Nobody has ever needed to do that, provided that the serialization of the data is correct on the producer side and configured properly for the connector.

Now I want to update the data little bit before sending to mongo. Please let me know the class name and the logic to store the data.

In case you want to apply changes, you have two options: 1) rely on in-flight transformations using an SMT (Single Message Transform) configuration for the connector, as sketched right below, or 2) use one of the available post-processor implementations that the sink connector itself applies. If none of them does what you need, write your own custom post-processor; check any of the available ones to get a feeling for how this is done (a sketch follows below as well).
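To illustrate option 1): SMTs are declared in the connector configuration using Kafka Connect's standard transforms properties. A generic sketch using one of Kafka's built-in transforms; ReplaceField here is just an example that renames a field in the record value, not a fix for the encoding problem above:

```
transforms=renameField
transforms.renameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameField.renames=deviceMessage:message
```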

For both options, please READ the documentation so that you get a better understanding of what is a good fit for your use case.
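To give option 2) some shape, here is a minimal sketch of a custom post-processor. It follows the pattern described in the project's README (a PostProcessor subclass with a process(SinkDocument, SinkRecord) method that hands the document over to the next processor in the chain); the class name and the appended field are hypothetical, so double-check the exact API against the sources you build against:

```java
package com.example.sink; // hypothetical package

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import at.grahsl.kafka.connect.mongodb.processor.PostProcessor;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonString;

public class MyCleanupPostProcessor extends PostProcessor {

    public MyCleanupPostProcessor(MongoDbSinkConnectorConfig config) {
        super(config);
    }

    @Override
    public void process(SinkDocument doc, SinkRecord orig) {
        // modify the BSON value document before it is written to MongoDB
        // (the "note" field is only an illustration)
        doc.getValueDoc().ifPresent(valueDoc ->
                valueDoc.append("note", new BsonString("modified by custom post-processor")));
        // hand the document over to the next post-processor in the chain, if any
        getNext().ifPresent(pp -> pp.process(doc, orig));
    }
}
```

The custom class is then registered via the post-processor chain setting in the connector config (mongodb.post.processor.chain per this project's README).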

hpgrahsl commented 5 years ago

> So I can see '<' before the key and '2' before the data. Is this why all these extra characters are getting added? If so, how can I remove them on the EMQ side, as I am using the emq-kafka connector?

I'd suggest you try to find out where this "extra" data comes from. I cannot comment on the emq-kafka connector since I've never used it. Make sure that it uses the same serialization (string, JSON, Avro) that you expect based on the configuration you use for my sink connector. Again, it is not recommended at all to fiddle around in the code base and make adaptations in this regard.
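For reference, the converter settings of the connector must match what the producer writes; these are standard Kafka Connect properties, nothing specific to this sink. A minimal sketch, assuming the EMQ side produces plain strings:

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

If the producer actually emits Confluent-Avro-encoded records (the leading \u0000\u0000\u0000\u0000\u0001 bytes you posted would be consistent with Confluent's wire format: a zero magic byte followed by a 4-byte schema id), you would instead configure io.confluent.connect.avro.AvroConverter together with key.converter.schema.registry.url and value.converter.schema.registry.url.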

hpgrahsl commented 4 years ago

@sourjya123 Due to inactivity and no further reports, I think we can close this issue. Also, what you report doesn't seem to be directly related to the sink connector implementation. Feel free to comment further if needed.