hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

Error with shard mongoDB #131

Closed pedefalco closed 1 year ago

pedefalco commented 2 years ago

Hi, we use your connector to send data to MongoDB. Initially MongoDB was installed as a replica set on a 3-node cluster; it has now been changed to a sharded cluster. After this change we receive the following error:

Failed to target upsert by query :: could not extract exact shard key

This is an example of our connector configuration:

{
        "name": <name>,
        "config": {
                "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
                "key.converter.schemas.enable": "false",
                "topics": <nametopic>,
                "mongodb.connection.uri": <mongoconnectionurl>,
                "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy",
                "value.converter.schemas.enable": "false",
                "transforms": "WrapKey",
                "max.num.retries":15,
                "retries.defer.timeout":60000,
                "tasks.max":2,
                "name": <...>,
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "mongodb.collection": <name collection>,
                "errors.tolerance": "all",
                "errors.log.include.messages": "true",
                "errors.deadletterqueue.topic.name": <...>,
                "errors.deadletterqueue.context.headers.enable": "true",
                "transforms.WrapKey.field": "_id",
                "transforms.WrapKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter"
        }
}

The shard key is composed of 3 different fields. How can we configure this connector so that it can write to the sharded MongoDB?

UPDATE

We tried another strategy:

{
    "name": "mongodb-connector-coll-event",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "topics": "....",
        "mongodb.connection.uri": "....",
        "max.num.retries":15,
        "retries.defer.timeout":60000,
        "tasks.max":2,
        "name": "...",
        "mongodb.collection": "event",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
        "mongodb.key.projection.list": "Field1,Field2",
        "mongodb.key.projection.type": "whitelist",
        "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
        "errors.tolerance": "all",
        "errors.log.include.messages": "true",
        "errors.deadletterqueue.topic.name": "...",
        "errors.deadletterqueue.context.headers.enable": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

With this configuration the connector doesn't report any error, but it doesn't write any documents to MongoDB either.

We use this strategy because we need to set the "_id" field in MongoDB, and according to the README (https://github.com/hpgrahsl/kafka-connect-mongodb#use-case-1-employing-business-keys) this procedure allows it.
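To make the shard key constraint behind the "could not extract exact shard key" error concrete, here is a rough Python sketch. It is not the connector's code and it does not talk to MongoDB; it only models the rule that a sharded cluster can target an upsert only when the filter contains every shard key field. The shard key field names (`Field1`, `Field2`, `Field3`), the sample record, and both helper functions are hypothetical, and the assumption that the first configuration filters on `_id` alone while the second filters on the projected fields is a simplification.

```python
from typing import Any, Dict, List, Sequence


def business_key_filter(value: Dict[str, Any], projection: Sequence[str]) -> Dict[str, Any]:
    """Simplified model: build an equality filter from the whitelisted value fields."""
    return {field: value[field] for field in projection if field in value}


def missing_shard_key_fields(filter_doc: Dict[str, Any], shard_key: Sequence[str]) -> List[str]:
    """Return the shard key fields that the filter does not constrain."""
    return [field for field in shard_key if field not in filter_doc]


# Hypothetical names: the actual shard key fields are not stated here.
SHARD_KEY = ("Field1", "Field2", "Field3")
record = {"Field1": "a", "Field2": "b", "Field3": "c", "payload": 42}

# Assumed filter shape for an _id-based upsert (first configuration).
id_only = {"_id": "some-key"}
# Assumed filter shape for a business key built from two projected fields.
two_fields = business_key_filter(record, ["Field1", "Field2"])
# Same, but with a projection list that covers the whole (hypothetical) shard key.
three_fields = business_key_filter(record, ["Field1", "Field2", "Field3"])

print(missing_shard_key_fields(id_only, SHARD_KEY))       # ['Field1', 'Field2', 'Field3']
print(missing_shard_key_fields(two_fields, SHARD_KEY))    # ['Field3']
print(missing_shard_key_fields(three_fields, SHARD_KEY))  # []
```

Under these assumptions, only a filter that names all three shard key fields leaves nothing missing, which suggests checking whether the projection list can cover the complete shard key.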

Thanks