hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

Topic<->Collection Mapping not working properly #94

Closed Seljuke closed 5 years ago

Seljuke commented 5 years ago

Hi @hpgrahsl, I was playing with your connector and needed to map 3 topics from Kafka to MongoDB. As described in your documentation, I used the config below:

"topics":"test-1,test-2,test-3",
"mongodb.collections":"test-1,test-2,test-3",
"mongodb.collection.test-1":"test-1",
"mongodb.collection.test-2":"test-2",
"mongodb.collection.test-3":"test-3",

After applying this config through the REST API, the "mongodb.collection" parameter comes back with the value "kafkatopic" every time (log below):

kafka-connect_1_a21ef7e13be6 | [2019-07-19 07:44:48,839] INFO MongoDbSinkConnectorConfig values:
kafka-connect_1_a21ef7e13be6 |  mongodb.change.data.capture.handler =
kafka-connect_1_a21ef7e13be6 |  mongodb.collection = kafkatopic
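
For reference, this is roughly how a config like the above is submitted to the Kafka Connect REST API (host, port, and file name here are placeholders, not from my actual setup):

curl -X POST -H "Content-Type: application/json" \
     --data @mongodb-sink.json \
     http://localhost:8083/connectors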

I used the Dockerfile below to build my own image and use your connector at version 1.3.1:

FROM confluentinc/cp-kafka-connect-base:5.2.2

RUN   confluent-hub install --no-prompt hpgrahsl/kafka-connect-mongodb:1.3.1
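
Building the image is then just the following (the tag is illustrative; running the worker additionally needs the usual CONNECT_* environment variables, e.g. via docker-compose):

docker build -t my-kafka-connect-mongodb .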

For now I have created 3 different connectors, one for each topic, but I would like to use the topic-to-collection mapping.

hpgrahsl commented 5 years ago

@Seljuke did you actually try to run it and see whether the collections get created based on the Kafka topics and your configuration? The reason you see the mongodb.collection = kafkatopic entry in the log is most likely that this property is configured as the fallback collection name, used whenever the connector sees a topic for which no mapping is defined. Please read carefully through the docs. It might also be helpful to post your full configuration.
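
To illustrate the fallback behaviour, consider a hypothetical config (topic and collection names are made up for this example):

"topics":"orders,events",
"mongodb.collections":"orders-coll",
"mongodb.collection.orders":"orders-coll",
"mongodb.collection":"fallback-coll",

Records from orders would go to orders-coll, while records from events, which has no explicit mapping, would land in fallback-coll.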

Seljuke commented 5 years ago

The collections didn't get created; instead a new "kafkatopic" collection was created. Full config below:

{
  "name":"MyMongoDbSinkConnector",
  "config":{
    "connector.class":"at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "tasks.max":"1",
    "topics":"test-1,test-2,test-3",
    "mongodb.connection.uri":"mongodb://user:pass@IP:PORT/test?authSource=admin",
    "mongodb.collections":"test-1,test-2,test-3",
    "mongodb.collection.test-1":"test-1",
    "mongodb.collection.test-2":"test-2",
    "mongodb.collection.test-3":"test-3",
    "mongodb.max.num.retries":"3",
    "mongodb.retries.defer.timeout":"5000",
    "mongodb.value.projection.type":"whitelist",
    "mongodb.value.projection.list":"*.*",
    "mongodb.document.id.strategy":"at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
    "mongodb.document.id.strategies":"",
    "mongodb.key.projection.type":"whitelist",
    "mongodb.key.projection.list":"content.key,content.key2",
    "mongodb.field.renamer.mapping":"[]",
    "mongodb.field.renamer.regexp":"[]",
    "mongodb.post.processor.chain":"at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder",
    "mongodb.change.data.capture.handler":"",
    "mongodb.delete.on.null.values":"false",
    "mongodb.writemodel.strategy":"at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
    "mongodb.max.batch.size":"0",
    "mongodb.rate.limiting.timeout":"0",
    "mongodb.rate.limiting.every.n":"0"
  }
}

hpgrahsl commented 5 years ago

@Seljuke it took me some time to get around to checking this myself, but I found the time just now :)

I tried to reproduce your issue but unfortunately wasn't able to. Using version 1.3.1 of the sink connector with the configuration you posted above worked just fine w.r.t. the topic name mappings. What's strange, though, is that you still get "kafkatopic" as the default collection name. This was used in older versions of the sink connector as a convention when nothing was explicitly configured, but it was removed a while ago.

When I use the configuration above I get the following log output, where you can see that the empty string is listed for the mongodb.collection property:

[2019-08-07 15:44:24,338] INFO MongoDbSinkConnectorConfig values: 
    mongodb.change.data.capture.handler = 
    mongodb.collection = 
    mongodb.collections = test-1,test-2,test-3
    mongodb.connection.uri = mongodb://localhost:27017/issue94
    mongodb.delete.on.null.values = false
    mongodb.document.id.strategies = 
    mongodb.document.id.strategy = at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy
    mongodb.field.renamer.mapping = []
    mongodb.field.renamer.regexp = []
    mongodb.key.projection.list = content.key,content.key2
    mongodb.key.projection.type = whitelist
    mongodb.max.batch.size = 0
    mongodb.max.num.retries = 3
    mongodb.post.processor.chain = at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
    mongodb.rate.limiting.every.n = 0
    mongodb.rate.limiting.timeout = 0
    mongodb.retries.defer.timeout = 5000
    mongodb.value.projection.list = *.*
    mongodb.value.projection.type = whitelist
    mongodb.writemodel.strategy = at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy
 (at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig:279)
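
As a cross-check on the MongoDB side, listing the collections of the target database (issue94 in the log above) should show the topic-named collections; a quick sketch with the mongo shell:

mongo issue94 --eval "db.getCollectionNames()"

This should print the three mapped collections, i.e. test-1, test-2 and test-3.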

So it might even be that your Kafka Connect distribution is not running version 1.3.1 of the connector. Can you please verify this? One way to check is sketched below. In the meantime I will close this issue, which seems to be invalid given your description above. We can reopen it later at any time if really needed.
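
A quick way to verify which connector version the worker actually loaded is to query the Connect REST API's plugin list (host/port are placeholders) and look for the sink connector's version field:

curl http://localhost:8083/connector-plugins

The entry for at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector should report "version": "1.3.1".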