Closed bovy89 closed 1 year ago
Resolved.
MongoDB >= 6.0 required
Debezium >= 2.4.0 required (not released yet):
MongoDB collections must be created with changeStreamPreAndPostImages: true
:
db.getSiblingDB('inventory').runCommand ( { collMod: "orders", changeStreamPreAndPostImages: { enabled: true } } );
db.getSiblingDB('inventory').runCommand ( { collMod: "products", changeStreamPreAndPostImages: { enabled: true } } );
db.getSiblingDB('inventory').runCommand ( { collMod: "customers", changeStreamPreAndPostImages: { enabled: true } } );
Source connectors must specify capture.mode: change_streams_with_pre_image
or capture.mode: change_streams_update_full_with_pre_image
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name": "mongodb-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": 1,
"mongodb.connection.mode": "replica_set",
"mongodb.connection.string": "mongodb://db-mongo:27017/?replicaSet=rs0",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
"topic.prefix" : "mongodb",
"collection.include.list": "inventory.products,inventory.orders,inventory.customers",
"capture.mode": "change_streams_with_pre_image"
}
}'
Sink connectors must include id rename as follow:
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name" :"iceberg-sink-mongodb",
"config": {
"connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
"topics.regex": "mongodb.*",
"iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
"iceberg.warehouse": "s3a://warehouse/iceberg",
"iceberg.uri": "thrift://hive-metastore:9083",
"iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.s3.endpoint": "http://minio:9000",
"iceberg.s3.access-key-id": "admin",
"iceberg.s3.secret-access-key": "password",
"iceberg.s3.path-style-access": true,
"table.auto-create": true,
"allow-field-addition": false,
"upsert.keep-deletes": false,
"transforms": "unwrap,renamekeyfield",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.renamekeyfield.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.renamekeyfield.renames": "id:_id"
}
}'
source connector:
sink connector:
example data:
this configuration do not work (error on insert)
changing sink as follow:
works:
but if I try to delete it:
the following error will be triggered:
any idea?