hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

converting string dates or epochs into ISODate #105

Closed syalinbas closed 5 years ago

syalinbas commented 5 years ago

Hi,

How do I have the following fields converted into ISODate when inserted into mongo?

{ "timestamp" : "2018-10-01T17:17:04.117+0000" }

{ "timestamp" : 1537375035222 }

We do not use AVRO format but plain JSON.

I think one solution is to use $setOnInsert defined in mongo. But I would like to solve this using only kafka-connect if possible.

Thanks!

lazaromedina commented 5 years ago

Hi, maybe I can help. If the data is plain JSON, you will get a MongoDB document with the timestamp key as a String. Try using JSON with an explicit schema in Kafka, something like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "key1",
        "type": "string"
      },

      ...other fields,

      {
        "field": "timestamp",
        "type": "int64",
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1
      }
    ]
  },
  "payload": {
    "key1": "value_for_key1",

    ...other payloads for fields,

    "timestamp": 1537375035222
  }
}

See also the blog post "Kafka Connect Deep Dive – Converters and Serialization Explained".
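For anyone who does control the producer, the envelope above can be generated rather than written by hand. The following is a minimal sketch, not part of the connector: `wrap_with_schema` is a hypothetical helper, it only handles string and integer values, and it assumes the timestamp field already holds epoch milliseconds. It also assumes the sink's JSON converter is configured with `schemas.enable=true`.

```python
import json

# Logical type name that Kafka Connect uses for millisecond timestamps.
TIMESTAMP_LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp"


def wrap_with_schema(record, timestamp_field="timestamp"):
    """Wrap a flat dict in a schema/payload envelope (hypothetical helper).

    Only str and int values are handled; the timestamp field must already
    be an int holding epoch milliseconds.
    """
    fields = []
    for name, value in record.items():
        if name == timestamp_field:
            # int64 plus the logical name marks the field as a timestamp.
            fields.append({
                "field": name,
                "type": "int64",
                "name": TIMESTAMP_LOGICAL_NAME,
                "version": 1,
            })
        elif isinstance(value, int):
            fields.append({"field": name, "type": "int64"})
        else:
            fields.append({"field": name, "type": "string"})
    return {"schema": {"type": "struct", "fields": fields}, "payload": record}


envelope = wrap_with_schema({"key1": "value_for_key1", "timestamp": 1537375035222})
print(json.dumps(envelope, indent=2))
```

Note that the payload carries the timestamp as a JSON number, matching the `int64` type declared in the schema.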

syalinbas commented 5 years ago

@lazaromedina Thanks for the reply. Unfortunately, I do not control the source; I only duplicate the incoming messages to a local broker. I think your proposed solution requires a schema in each incoming message.

@hpgrahsl any suggestions?

syalinbas commented 5 years ago

I was wrong: $setOnInsert does not help with this. It is just an operator for a single operation.

syalinbas commented 5 years ago

The following did the trick for timestamps given in the string format above, but not for epoch values.

`
"transforms": "timestamp",
"transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"transforms.timestamp.target.type": "Timestamp",
"transforms.timestamp.field": "eventTimestamp",
`
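To see which instant the format pattern picks out of the string, here is a rough Python sketch. It is only an approximation: the transform itself takes a Java `SimpleDateFormat` pattern, and the `strptime` format below is an assumed equivalent for this one input shape. `parse_event_timestamp` is a hypothetical name.

```python
from datetime import datetime, timezone

# Assumed Python counterpart of the Java pattern yyyy-MM-dd'T'HH:mm:ss.SSSZ
PY_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def parse_event_timestamp(text):
    """Parse the string form into a timezone-aware datetime in UTC."""
    return datetime.strptime(text, PY_FORMAT).astimezone(timezone.utc)


parsed = parse_event_timestamp("2018-10-01T17:17:04.117+0000")
print(parsed.isoformat())  # 2018-10-01T17:17:04.117000+00:00
```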

syalinbas commented 5 years ago

OK, I finally also found out how to do it if the incoming data is in epoch format: you simply leave out "transforms.timestamp.format".
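The two variants differ only in whether the format key is present, which the sketch below makes explicit. `timestamp_smt_config` and `epoch_millis_to_datetime` are hypothetical helpers for illustration, not connector APIs; the property names are the ones used in this thread, and the epoch value is assumed to be in milliseconds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_datetime(millis):
    """Interpret an integer as milliseconds since the Unix epoch (UTC)."""
    return EPOCH + timedelta(milliseconds=millis)


def timestamp_smt_config(field, fmt=None):
    """Build the transform properties; fmt is only set for string input."""
    config = {
        "transforms": "timestamp",
        "transforms.timestamp.type":
            "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp.field": field,
        "transforms.timestamp.target.type": "Timestamp",
    }
    if fmt is not None:
        config["transforms.timestamp.format"] = fmt
    return config


# String input: the pattern is needed to parse the text.
string_cfg = timestamp_smt_config("eventTimestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
# Epoch input: no pattern, the number is taken as epoch milliseconds.
epoch_cfg = timestamp_smt_config("eventTimestamp")

print(epoch_millis_to_datetime(1537375035222).isoformat())
# 2018-09-19T16:37:15.222000+00:00
```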