hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

Issue with MongoDB sink connector, Records inserted to MongoDB are in the form of CDC structure instead of a mongo document structure #128

Closed vinaymurari closed 3 years ago

vinaymurari commented 3 years ago

Hi,

I am using Confluent Kafka Connect to capture CDC events from MS SQL Server (source) and write them to MongoDB (sink). The source connector is debezium-debezium-connector-sqlserver-1.5.0. The source connector config is below.

    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": false,
      "value.converter.schemas.enable": false,
      "database.server.name": "test",
      "database.dbname": "testdb",
      "database.hostname": "",
      "database.port": "1433",
      "database.user": "",
      "database.password": "****",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "schema_db"
    }

With this we are able to consume the data. However, when we sink this data to a MongoDB collection using 'hpgrahsl-kafka-connect-mongodb-1.4.0', the data reaches MongoDB, but in the format below.

Mongo doc structure as follows:

/**

CDC is not handled: for each change we get a new document instead of an update to the existing document.
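To make the expected behaviour concrete, here is a minimal Python sketch of what "handling CDC" would mean for a Debezium-style change event: the `op` field decides whether the `after` state replaces the document for that key or the document is removed. This is not the connector's implementation. The function names, the in-memory dict standing in for a collection, and the sample events (a table with primary key `id` and a `qty` column) are all hypothetical, and the sketch assumes schemaless JSON events with the usual `before` / `after` / `op` fields.

```python
from typing import Any, Dict, Optional

# Illustrative only: an in-memory stand-in for a MongoDB collection,
# keyed by a hashable form of the record's primary key.
Collection = Dict[tuple, Dict[str, Any]]


def key_to_id(key: Dict[str, Any]) -> tuple:
    """Turn the Kafka record key (primary key columns) into a stable identifier."""
    return tuple(sorted(key.items()))


def apply_change_event(collection: Collection,
                       key: Dict[str, Any],
                       value: Optional[Dict[str, Any]]) -> None:
    """Apply one Debezium-style change event as an upsert or a delete."""
    if value is None:
        # Tombstone record (null value): nothing to apply in this sketch.
        return
    op = value.get("op")
    doc_id = key_to_id(key)
    if op in ("c", "r", "u"):
        # create, snapshot read, update: store the "after" state under the key,
        # replacing any earlier version instead of adding a second document.
        collection[doc_id] = dict(value["after"])
    elif op == "d":
        # delete: drop the document for this key if it exists.
        collection.pop(doc_id, None)
    else:
        raise ValueError(f"unsupported op: {op!r}")


# Hypothetical events (assumption: primary key column "id").
events = [
    ({"id": 1}, {"op": "c", "before": None, "after": {"id": 1, "qty": 5}}),
    ({"id": 1}, {"op": "u", "before": {"id": 1, "qty": 5}, "after": {"id": 1, "qty": 7}}),
    ({"id": 2}, {"op": "c", "before": None, "after": {"id": 2, "qty": 1}}),
    ({"id": 2}, {"op": "d", "before": {"id": 2, "qty": 1}, "after": None}),
]

inventory: Collection = {}
for k, v in events:
    apply_change_event(inventory, k, v)
```

After these four events the stand-in collection holds a single document for `id` 1 with the updated `qty`, rather than four raw change-event envelopes, which is the difference described above.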

Sink config as follows:

    "config": {
      "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
      "topics": "test.dbo.Inventory1",
      "mongodb.connection.uri": "mongodb://localhost:27017/kafkaconnect?w=1&journal=true",
      "mongodb.collection": "mssql-moh",
      "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": false,
      "value.converter.schemas.enable": false
    }

Is there a way to handle this?

vinaymurari commented 3 years ago

Looks like a config issue; we will come back with more info.

itu-reinhard commented 4 months ago

> Looks like a config issue; we will come back with more info.

Have you solved it? I have the same issue @vinaymurari