memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0
171 stars 35 forks

SQL Server config & MinIO with in-memory catalog not working #236

Closed ghost closed 9 months ago

ghost commented 9 months ago

We are testing a setup in Docker with SQL Server and MinIO:

docker-compose.yml

version: '3.8'

services:
  sqlserver:
    container_name: sqlserver
    image: mcr.microsoft.com/mssql/server:2019-latest
    user: root
    ports:
      - "1433:1433"
    volumes:
      - ./data/sqlserver/data:/var/opt/mssql/data
      - ./data/sqlserver/log:/var/opt/mssql/log
      - ./data/sqlserver/secrets:/var/opt/mssql/secrets
    env_file:
      - .env
    environment:
      - "ACCEPT_EULA=Y"
      - "MSSQL_AGENT_ENABLED=True"
      - MSSQL_SA_PASSWORD=${MSSQL_SA_PASSWORD}
  debezium-iceberg:
    container_name: debezium
    image: ghcr.io/memiiso/debezium-server-iceberg:latest
    user: root
    volumes:
      - ./data/debezium/data:/app/data
      - ./data/debezium/conf:/app/conf
  minio:
    image: quay.io/minio/minio
    command: server /data --console-address ":9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    volumes:
      - ./data/minio:/data
    ports:
      - 9000:9000
      - 9001:9001

application.properties:

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog

# S3 config without Hadoop catalog, using InMemoryCatalog and S3FileIO
### using MinIO as S3
debezium.sink.iceberg.s3.endpoint=http://minio:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=rKONJQf6ILnq3CdtjK9W
debezium.sink.iceberg.s3.secret-access-key=UrQZwTNgEqDSqCGLrVvzWnk6IqgUU9fOv9jApSde
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.warehouse=s3a://testtest/iceberg_warehouse
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog

# sql server source
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=sqlserver
debezium.source.database.port=1433
debezium.source.database.user=sa
debezium.source.database.password=mu6M7Q18nXAhzPfZMGtGVemKb
debezium.source.database.names=POC_Debezium_CDC
debezium.source.database.server.name=sqlserver
debezium.source.topic.prefix=test
debezium.source.database.encrypt=true
debezium.source.database.trustServerCertificate=true
debezium.source.schema.include.list=dbo
debezium.source.schema.history.internal.kafka.topic=schemahistory
debezium.source.schema.history.internal.kafka.bootstrap.servers=empty:9093,
# mandatory for SQL Server source; avoids errors on snapshot and schema changes
debezium.source.include.schema.changes=false

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# saving debezium state data to destination, iceberg tables
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

The container crashes if we leave out debezium.source.schema.history.internal.kafka.topic=schemahistory and debezium.source.schema.history.internal.kafka.bootstrap.servers=empty, but those are clearly Kafka parameters.

ismailsimsek commented 9 months ago

This config must be set; for some reason it is mandatory. I'm not sure it has any effect (link to documentation).

With the settings above, schema history should be stored in the destination Iceberg table:

debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test

Also, include.schema.changes is set to false: debezium.source.include.schema.changes=false

https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-include-schema-changes

ghost commented 9 months ago

I don't get it. The setting is mandatory, but we don't have any Kafka servers in this setup, right? Here is the log with debezium.source.schema.history.internal.kafka.topic=schemahistory included and debezium.source.schema.history.internal.kafka.bootstrap.servers=localhost:9092 left out:

       __       __                 _                 
  ____/ /___   / /_   ___  ____   (_)__  __ ____ ___ 
 / __  // _ \ / __ \ / _ \/_  /  / // / / // __ `__ \
/ /_/ //  __// /_/ //  __/ / /_ / // /_/ // / / / / /
\__,_/ \___//_.___/ \___/ /___//_/ \__,_//_/ /_/ /_/ 

                             Powered by Quarkus 3.3.2
2023-09-28 06:43:29,220 INFO  [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$209/0x0000000840247840@765df79d' stream name mapper
2023-09-28 06:43:29,328 INFO  [io.deb.ser.ice.IcebergUtil] (main) Using io.quarkus.arc.impl.InstanceImpl
2023-09-28 06:43:29,346 INFO  [io.deb.ser.jso.JsonSerdeConfig] (main) JsonSerdeConfig values: 
    from.field = null
    unknown.properties.ignored = false
2023-09-28 06:43:29,347 INFO  [io.deb.ser.jso.JsonSerdeConfig] (main) JsonSerdeConfig values: 
    from.field = null
    unknown.properties.ignored = false
2023-09-28 06:43:29,347 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.iceberg.IcebergChangeConsumer' instantiated
2023-09-28 06:43:29,430 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
2023-09-28 06:43:29,434 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
2023-09-28 06:43:29,515 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    admin.listeners = null
    auto.include.jmx.reporter = true
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 0
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = 
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic = 
    plugin.path = null
    response.http.headers.config = 
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter
2023-09-28 06:43:29,516 WARN  [org.apa.kaf.con.run.WorkerConfig] (main) Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2023-09-28 06:43:29,520 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2023-09-28 06:43:29,521 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2023-09-28 06:43:29,522 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = header
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2023-09-28 06:43:29,524 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2023-09-28 06:43:30,039 INFO  [io.quarkus] (main) debezium-server-iceberg-dist 0.3.0-SNAPSHOT on JVM (powered by Quarkus 3.3.2) started in 2.932s. Listening on: http://0.0.0.0:8080
2023-09-28 06:43:30,039 INFO  [io.quarkus] (main) Profile prod activated. 
2023-09-28 06:43:30,039 INFO  [io.quarkus] (main) Installed features: [cdi, micrometer, resteasy-jackson, smallrye-context-propagation, smallrye-health, vertx]
2023-09-28 06:43:34,079 INFO  [io.deb.con.sql.SqlServerConnector] (pool-6-thread-1) Checking if user has access to CDC table
2023-09-28 06:43:40,294 INFO  [io.deb.jdb.JdbcConnection] (pool-8-thread-1) Connection gracefully closed
2023-09-28 06:43:40,360 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore$IcebergOffsetBackingStoreConfig] (pool-6-thread-1) IcebergOffsetBackingStoreConfig values: 
2023-09-28 06:43:40,360 WARN  [org.apa.kaf.con.run.WorkerConfig] (pool-6-thread-1) Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2023-09-28 06:43:40,397 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore] (pool-6-thread-1) Starting IcebergOffsetBackingStore table:default.debezium_offset_storage_custom_table
2023-09-28 06:43:40,400 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Created namespace:'default'
2023-09-28 06:43:40,405 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties set at catalog level through catalog properties: {}
2023-09-28 06:43:40,406 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties enforced at catalog level through catalog properties: {}
2023-09-28 06:43:40,721 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table default.debezium_offset_storage_custom_table in 277 ms
2023-09-28 06:43:40,733 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debezium_offset_storage_custom_table/metadata/00000-1a93f2ed-3ee0-482a-b76c-0a3660dd7de1.metadata.json
2023-09-28 06:43:40,766 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table loaded by catalog: mycatalog.default.debezium_offset_storage_custom_table
2023-09-28 06:43:40,767 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debezium_offset_storage_custom_table/metadata/00000-1a93f2ed-3ee0-482a-b76c-0a3660dd7de1.metadata.json
2023-09-28 06:43:40,793 INFO  [org.apa.ice.SnapshotScan] (pool-6-thread-1) Scanning empty table mycatalog.default.debezium_offset_storage_custom_table
2023-09-28 06:43:40,865 INFO  [io.deb.jdb.JdbcConnection] (pool-11-thread-1) Connection gracefully closed
2023-09-28 06:43:40,878 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Starting SqlServerConnectorTask with configuration:
2023-09-28 06:43:40,880 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    connector.class = io.debezium.connector.sqlserver.SqlServerConnector
2023-09-28 06:43:40,880 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.table-prefix = debeziumcdc_
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms = unwrap
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.include.list = dbo
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.write.format.default = parquet
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.upsert = true
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.catalog-name = mycatalog
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.secret-access-key = UrQZwTNgEqDSqCGLrVvzWnk6IqgUU9fOv9jApSde
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    include.schema.changes = false
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.catalog-impl = org.apache.iceberg.inmemory.InMemoryCatalog
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.secret-access-key = UrQZwTNgEqDSqCGLrVvzWnk6IqgUU9fOv9jApSde
2023-09-28 06:43:40,881 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.drop.tombstones = true
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.warehouse = s3a://testtest/iceberg_warehouse
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.write.format.default = parquet
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.access-key-id = rKONJQf6ILnq3CdtjK9W
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.io-impl = org.apache.iceberg.aws.s3.S3FileIO
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    errors.retry.delay.initial.ms = 300
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter = org.apache.kafka.connect.json.JsonConverter
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.io-impl = org.apache.iceberg.aws.s3.S3FileIO
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.history.iceberg.table-name = debezium_database_history_storage_test
2023-09-28 06:43:40,882 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.table-name = debezium_offset_storage_custom_table
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.user = sa
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.names = POC_Debezium_CDC
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.value.schemas.enable = true
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage = io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.key = json
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.key.schemas.enable = true
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.path-style-access = true
2023-09-28 06:43:40,883 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.endpoint = http://minio:9000;
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.schemas.enable = true
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    errors.max.retries = -1
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.key.schemas.enable = true
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.password = ********
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.schemas.enable = true
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    name = iceberg
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.upsert-keep-deletes = true
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.history = io.debezium.server.iceberg.history.IcebergSchemaHistory
2023-09-28 06:43:40,884 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.catalog-impl = org.apache.iceberg.inmemory.InMemoryCatalog
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.delete.handling.mode = rewrite
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    topic.prefix = test
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.file.filename = 
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.kafka.topic = schemahistory
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.access-key-id = rKONJQf6ILnq3CdtjK9W
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.upsert = true
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter = org.apache.kafka.connect.json.JsonConverter
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.value.schemas.enable = true
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.encrypt = true
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.value = json
2023-09-28 06:43:40,885 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.value.schemas.enable = true
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.value = json
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.key.schemas.enable = true
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.catalog-name = mycatalog
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.flush.timeout.ms = 5000
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    errors.retry.delay.max.ms = 10000
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.endpoint = http://minio:9000;
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.value = json
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.port = 1433
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.flush.interval.ms = 0
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.hostname = sqlserver
2023-09-28 06:43:40,886 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.table-prefix = debeziumcdc_
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.upsert-keep-deletes = true
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.add.fields = op,table,source.ts_ms,db
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    task.id = 0
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.trustServerCertificate = true
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.key = json
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.warehouse = s3a://testtest/iceberg_warehouse
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.path-style-access = true
2023-09-28 06:43:40,887 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.key = json
2023-09-28 06:43:40,890 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Attempting to start task
2023-09-28 06:43:40,892 INFO  [io.deb.con.CommonConnectorConfig] (pool-6-thread-1) Loading the custom topic naming strategy plugin: io.debezium.schema.SchemaTopicNamingStrategy
2023-09-28 06:43:40,925 ERROR [io.deb.sto.kaf.his.KafkaSchemaHistory] (pool-6-thread-1) The 'schema.history.internal.kafka.bootstrap.servers' value is invalid: A value is required
2023-09-28 06:43:40,926 INFO  [io.deb.emb.EmbeddedEngine] (pool-6-thread-1) Stopping the task and engine
2023-09-28 06:43:40,926 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Stopping down connector
2023-09-28 06:43:40,927 ERROR [io.deb.ser.ConnectorLifecycle] (pool-6-thread-1) Connector completed: success = 'false', message = 'Error while trying to run connector class 'io.debezium.connector.sqlserver.SqlServerConnector'', error = 'org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaSchemaHistory; check the logs for details': org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaSchemaHistory; check the logs for details
    at io.debezium.storage.kafka.history.KafkaSchemaHistory.configure(KafkaSchemaHistory.java:208)
    at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.getSchemaHistory(HistorizedRelationalDatabaseConnectorConfig.java:137)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:49)
    at io.debezium.connector.sqlserver.SqlServerDatabaseSchema.<init>(SqlServerDatabaseSchema.java:35)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:80)
    at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:244)
    at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:153)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:852)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:229)
    at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
2023-09-28 06:43:40,950 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2023-09-28 06:43:40,950 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2023-09-28 06:43:40,971 INFO  [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.042s
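The task configuration logged above shows how Debezium Server hands source settings to the connector: the `debezium.source.` prefix is stripped, so `debezium.source.database.user=sa` arrives as `database.user = sa` and `debezium.source.database.history=...` arrives as `database.history = ...`. The stack trace then shows the SQL Server connector building a `KafkaSchemaHistory`, which needs `schema.history.internal.kafka.bootstrap.servers`. Below is a rough Python model of that lookup, not Debezium's code. It assumes, based on the stack trace, that the connector reads its history class from `schema.history.internal` and falls back to `io.debezium.storage.kafka.history.KafkaSchemaHistory` when that key is absent. All function names are hypothetical.

```python
# Hypothetical model of how Debezium Server passes source settings to the
# connector; inferred from the logs above, not taken from Debezium's code.
SOURCE_PREFIX = "debezium.source."
# Assumption: the default history class, as suggested by the stack trace.
DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory"
KAFKA_SERVERS_KEY = "schema.history.internal.kafka.bootstrap.servers"


def connector_config(server_props: dict[str, str]) -> dict[str, str]:
    """Keep only debezium.source.* settings and strip that prefix."""
    return {
        key[len(SOURCE_PREFIX):]: value
        for key, value in server_props.items()
        if key.startswith(SOURCE_PREFIX)
    }


def schema_history_class(config: dict[str, str]) -> str:
    """Return the configured history class, or the assumed default."""
    return config.get("schema.history.internal", DEFAULT_SCHEMA_HISTORY)


def schema_history_errors(config: dict[str, str]) -> list[str]:
    """Mimic the validation error logged by KafkaSchemaHistory."""
    uses_kafka = schema_history_class(config) == DEFAULT_SCHEMA_HISTORY
    if uses_kafka and not config.get(KAFKA_SERVERS_KEY):
        return [f"The '{KAFKA_SERVERS_KEY}' value is invalid: A value is required"]
    return []


if __name__ == "__main__":
    iceberg_history = "io.debezium.server.iceberg.history.IcebergSchemaHistory"
    old_style = {"debezium.source.database.history": iceberg_history}
    new_style = {"debezium.source.schema.history.internal": iceberg_history}
    for props in (old_style, new_style):
        cfg = connector_config(props)
        print(schema_history_class(cfg), schema_history_errors(cfg))
```

Under this model the old `database.history` key is simply never read, which would explain why a Kafka bootstrap server appears to be required in a setup without Kafka.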
ismailsimsek commented 9 months ago

@nelo-0 could you try the following config?

debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test

The config name changed in recent releases: https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class
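The rename can be applied mechanically to an existing application.properties. A minimal sketch, assuming the only change needed is replacing the `debezium.source.database.history` prefix with `debezium.source.schema.history.internal` (as in the two lines above) and that every property uses `key=value` syntax. The helper name is hypothetical, and other keys renamed in your Debezium version are not covered. It also reports keys that end up defined twice, for example when a file already sets both the old and the new name.

```python
OLD_PREFIX = "debezium.source.database.history"
NEW_PREFIX = "debezium.source.schema.history.internal"


def migrate_history_keys(lines: list[str]) -> tuple[list[str], list[str]]:
    """Rename deprecated schema history keys in application.properties lines.

    Returns the rewritten lines and any keys that are now defined more than once.
    Comments, blank lines and unrelated keys are passed through unchanged.
    """
    out: list[str] = []
    seen: set[str] = set()
    duplicates: list[str] = []
    for line in lines:
        stripped = line.strip()
        # Skip blanks, comments ('#' or '!') and lines without key=value syntax.
        if not stripped or stripped.startswith(("#", "!")) or "=" not in stripped:
            out.append(line)
            continue
        key, value = stripped.split("=", 1)
        key = key.strip()
        # Match the exact prefix or a dotted child key, never e.g. "...historyX".
        if key == OLD_PREFIX or key.startswith(OLD_PREFIX + "."):
            key = NEW_PREFIX + key[len(OLD_PREFIX):]
            line = f"{key}={value}"
        if key in seen:
            duplicates.append(key)
        seen.add(key)
        out.append(line)
    return out, duplicates


if __name__ == "__main__":
    old = [
        "debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory",
        "debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test",
    ]
    new_lines, dups = migrate_history_keys(old)
    print("\n".join(new_lines))
    print("duplicates:", dups)
```

For the two old-style lines this produces exactly the two lines suggested above.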

ghost commented 9 months ago

@ismailsimsek, sorry for the late reply; I was out of the office for a short holiday.

Thanks for the fix; the SQL Server config seems to work.

But I think the config with the in-memory catalog does not work. The connector log shows no errors at initial startup, yet I can't see any new files in my S3 bucket. That's strange, because the logs say new tables are being written. After I insert a new row into the SQL Server table, the connector crashes with:

ERROR [io.deb.ser.ConnectorLifecycle] (pool-6-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: No in-memory file found for location: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/data/20231009-1-9efb3b35-e505-462e-a460-3883a14b493e-00002.parquet', error = 'org.apache.iceberg.exceptions.NotFoundException: No in-memory file found for location: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/data/20231009-1-9efb3b35-e505-462e-a460-3883a14b493e-00002.parquet': org.apache.iceberg.exceptions.NotFoundException: No in-memory file found for location: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/data/20231009-1-9efb3b35-e505-462e-a460-3883a14b493e-00002.parquet
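The task configuration logged in the earlier run shows the sink's Iceberg settings being reused for the state stores: `debezium.sink.iceberg.catalog-impl` reappears as `offset.storage.iceberg.catalog-impl` and `schema.history.internal.iceberg.catalog-impl`, both set to `org.apache.iceberg.inmemory.InMemoryCatalog`. So the offset and schema history tables sit in the same in-memory catalog as the data tables, and the `No in-memory file found` message suggests files are being resolved through an in-memory file store rather than MinIO. A minimal Python model of that copy step, inferred from the logged keys rather than from the project's source; the function name is hypothetical, and the rule that explicitly set state-store keys win is an assumption.

```python
# Hypothetical model inferred from the logged task configuration; not project code.
SINK_PREFIX = "debezium.sink.iceberg."
STATE_STORE_PREFIXES = (
    "debezium.source.offset.storage.iceberg.",
    "debezium.source.schema.history.internal.iceberg.",
)


def propagate_sink_settings(props: dict[str, str]) -> dict[str, str]:
    """Copy every debezium.sink.iceberg.* setting under each state-store prefix.

    Assumption: a key already set explicitly under a state-store prefix is kept.
    """
    result = dict(props)
    for key, value in props.items():
        if not key.startswith(SINK_PREFIX):
            continue
        suffix = key[len(SINK_PREFIX):]
        for prefix in STATE_STORE_PREFIXES:
            result.setdefault(prefix + suffix, value)
    return result


if __name__ == "__main__":
    props = {
        "debezium.sink.iceberg.catalog-impl": "org.apache.iceberg.inmemory.InMemoryCatalog",
        "debezium.sink.iceberg.warehouse": "s3a://testtest/iceberg_warehouse",
        "debezium.source.offset.storage.iceberg.table-name": "debezium_offset_storage_custom_table",
    }
    for key, value in sorted(propagate_sink_settings(props).items()):
        print(key, "=", value)
```

Under this model, whatever catalog the sink uses is also where offsets and schema history end up, so an in-memory catalog keeps the connector's state in memory too.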

Config file:

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog

# S3 config without Hadoop catalog, using InMemoryCatalog and S3FileIO
### using MinIO as S3
debezium.sink.iceberg.s3.endpoint=http://minio:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=rKONJQf6ILnq3CdtjK9W
debezium.sink.iceberg.s3.secret-access-key=UrQZwTNgEqDSqCGLrVvzWnk6IqgUU9fOv9jApSde
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.warehouse=s3a://testtest/iceberg_warehouse
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog

# sql server source
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=sqlserver
debezium.source.database.port=1433
debezium.source.database.user=sa
debezium.source.database.password=mu6M7Q18nXAhzPfZMGtGVemKb
debezium.source.database.names=POC_Debezium_CDC
debezium.source.topic.prefix=test
debezium.source.database.encrypt=true
debezium.source.database.trustServerCertificate=true
debezium.source.schema.include.list=dbo
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test
# debezium.source.schema.history.internal.kafka.bootstrap.servers=localhost:9092
# mandatory for SQL Server source; avoids errors on snapshot and schema changes
debezium.source.include.schema.changes=false

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# saving debezium state data to destination, iceberg tables
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

Full log file:

       __       __                 _                 
  ____/ /___   / /_   ___  ____   (_)__  __ ____ ___ 
 / __  // _ \ / __ \ / _ \/_  /  / // / / // __ `__ \
/ /_/ //  __// /_/ //  __/ / /_ / // /_/ // / / / / /
\__,_/ \___//_.___/ \___/ /___//_/ \__,_//_/ /_/ /_/ 

                             Powered by Quarkus 3.3.2
2023-10-09 09:44:57,843 INFO  [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$209/0x0000000840247840@765df79d' stream name mapper
2023-10-09 09:44:58,005 INFO  [io.deb.ser.ice.IcebergUtil] (main) Using io.quarkus.arc.impl.InstanceImpl
2023-10-09 09:44:58,029 INFO  [io.deb.ser.jso.JsonSerdeConfig] (main) JsonSerdeConfig values: 
    from.field = null
    unknown.properties.ignored = false
2023-10-09 09:44:58,030 INFO  [io.deb.ser.jso.JsonSerdeConfig] (main) JsonSerdeConfig values: 
    from.field = null
    unknown.properties.ignored = false
2023-10-09 09:44:58,030 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.iceberg.IcebergChangeConsumer' instantiated
2023-10-09 09:44:58,122 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
2023-10-09 09:44:58,124 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
2023-10-09 09:44:58,192 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    admin.listeners = null
    auto.include.jmx.reporter = true
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 0
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = 
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic = 
    plugin.path = null
    response.http.headers.config = 
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter
2023-10-09 09:44:58,192 WARN  [org.apa.kaf.con.run.WorkerConfig] (main) Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2023-10-09 09:44:58,196 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2023-10-09 09:44:58,197 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2023-10-09 09:44:58,198 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = header
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2023-10-09 09:44:58,202 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2023-10-09 09:44:58,759 INFO  [io.quarkus] (main) debezium-server-iceberg-dist 0.3.0-SNAPSHOT on JVM (powered by Quarkus 3.3.2) started in 3.010s. Listening on: http://0.0.0.0:8080
2023-10-09 09:44:58,759 INFO  [io.quarkus] (main) Profile prod activated. 
2023-10-09 09:44:58,759 INFO  [io.quarkus] (main) Installed features: [cdi, micrometer, resteasy-jackson, smallrye-context-propagation, smallrye-health, vertx]
2023-10-09 09:45:02,375 INFO  [io.deb.con.sql.SqlServerConnector] (pool-6-thread-1) Checking if user has access to CDC table
2023-10-09 09:45:08,050 INFO  [io.deb.jdb.JdbcConnection] (pool-8-thread-1) Connection gracefully closed
2023-10-09 09:45:08,125 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore$IcebergOffsetBackingStoreConfig] (pool-6-thread-1) IcebergOffsetBackingStoreConfig values: 
2023-10-09 09:45:08,125 WARN  [org.apa.kaf.con.run.WorkerConfig] (pool-6-thread-1) Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2023-10-09 09:45:08,159 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore] (pool-6-thread-1) Starting IcebergOffsetBackingStore table:default.debezium_offset_storage_custom_table
2023-10-09 09:45:08,163 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Created namespace:'default'
2023-10-09 09:45:08,166 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties set at catalog level through catalog properties: {}
2023-10-09 09:45:08,167 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties enforced at catalog level through catalog properties: {}
2023-10-09 09:45:08,473 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table default.debezium_offset_storage_custom_table in 269 ms
2023-10-09 09:45:08,482 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debezium_offset_storage_custom_table/metadata/00000-107b3274-04ab-44c9-be1d-1c9d34da2835.metadata.json
2023-10-09 09:45:08,514 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table loaded by catalog: mycatalog.default.debezium_offset_storage_custom_table
2023-10-09 09:45:08,514 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debezium_offset_storage_custom_table/metadata/00000-107b3274-04ab-44c9-be1d-1c9d34da2835.metadata.json
2023-10-09 09:45:08,537 INFO  [org.apa.ice.SnapshotScan] (pool-6-thread-1) Scanning empty table mycatalog.default.debezium_offset_storage_custom_table
2023-10-09 09:45:08,614 INFO  [io.deb.jdb.JdbcConnection] (pool-11-thread-1) Connection gracefully closed
2023-10-09 09:45:08,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Starting SqlServerConnectorTask with configuration:
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    connector.class = io.debezium.connector.sqlserver.SqlServerConnector
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.table-prefix = debeziumcdc_
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms = unwrap
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.include.list = dbo
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.write.format.default = parquet
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.upsert = true
2023-10-09 09:45:08,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.catalog-name = mycatalog
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.secret-access-key = UrQZwTNgEqDSqCGLrVvzWnk6IqgUU9fOv9jApSde
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    include.schema.changes = false
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.catalog-impl = org.apache.iceberg.inmemory.InMemoryCatalog
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.secret-access-key = UrQZwTNgEqDSqCGLrVvzWnk6IqgUU9fOv9jApSde
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.drop.tombstones = true
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.warehouse = s3a://testtest/iceberg_warehouse
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.write.format.default = parquet
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.access-key-id = rKONJQf6ILnq3CdtjK9W
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.io-impl = org.apache.iceberg.aws.s3.S3FileIO
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
2023-10-09 09:45:08,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    errors.retry.delay.initial.ms = 300
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter = org.apache.kafka.connect.json.JsonConverter
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.io-impl = org.apache.iceberg.aws.s3.S3FileIO
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.history.iceberg.table-name = debezium_database_history_storage_test
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.table-name = debezium_offset_storage_custom_table
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.user = sa
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.names = POC_Debezium_CDC
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.value.schemas.enable = true
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage = io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.key = json
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.key.schemas.enable = true
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.path-style-access = true
2023-10-09 09:45:08,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.s3.endpoint = http://minio:9000;
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.schemas.enable = true
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    errors.max.retries = -1
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.key.schemas.enable = true
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.password = ********
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.schemas.enable = true
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    name = iceberg
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.upsert-keep-deletes = true
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.history = io.debezium.server.iceberg.history.IcebergSchemaHistory
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.iceberg.catalog-impl = org.apache.iceberg.inmemory.InMemoryCatalog
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.delete.handling.mode = rewrite
2023-10-09 09:45:08,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    topic.prefix = test
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.storage.file.filename = 
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.access-key-id = rKONJQf6ILnq3CdtjK9W
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.upsert = true
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter = org.apache.kafka.connect.json.JsonConverter
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.value.schemas.enable = true
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.encrypt = true
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    header.converter.value = json
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.value.schemas.enable = true
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.value = json
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.key.schemas.enable = true
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.catalog-name = mycatalog
2023-10-09 09:45:08,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.flush.timeout.ms = 5000
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    errors.retry.delay.max.ms = 10000
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.endpoint = http://minio:9000;
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.value = json
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.port = 1433
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    offset.flush.interval.ms = 0
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal = io.debezium.server.iceberg.history.IcebergSchemaHistory
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.hostname = sqlserver
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.table-name = debezium_database_history_storage_test
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.table-prefix = debeziumcdc_
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.upsert-keep-deletes = true
2023-10-09 09:45:08,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    transforms.unwrap.add.fields = op,table,source.ts_ms,db
2023-10-09 09:45:08,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    task.id = 0
2023-10-09 09:45:08,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    database.trustServerCertificate = true
2023-10-09 09:45:08,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    value.converter.key = json
2023-10-09 09:45:08,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.warehouse = s3a://testtest/iceberg_warehouse
2023-10-09 09:45:08,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    schema.history.internal.iceberg.s3.path-style-access = true
2023-10-09 09:45:08,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1)    key.converter.key = json
2023-10-09 09:45:08,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Attempting to start task
2023-10-09 09:45:08,640 INFO  [io.deb.con.CommonConnectorConfig] (pool-6-thread-1) Loading the custom topic naming strategy plugin: io.debezium.schema.SchemaTopicNamingStrategy
2023-10-09 09:45:08,712 INFO  [io.deb.ser.ice.his.IcebergSchemaHistory] (pool-6-thread-1) Starting IcebergSchemaHistory storage table:mycatalog.debezium_database_history_storage_test
2023-10-09 09:45:08,713 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Created namespace:'mycatalog'
2023-10-09 09:45:08,713 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties set at catalog level through catalog properties: {}
2023-10-09 09:45:08,713 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties enforced at catalog level through catalog properties: {}
2023-10-09 09:45:08,714 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table mycatalog.debezium_database_history_storage_test in 0 ms
2023-10-09 09:45:08,714 WARN  [io.deb.ser.ice.his.IcebergSchemaHistory] (pool-6-thread-1) Created database history storage table mycatalog.debezium_database_history_storage_test to store history
2023-10-09 09:45:08,714 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/mycatalog/debezium_database_history_storage_test/metadata/00000-cbb792a9-a298-45db-8b8e-4617b964037c.metadata.json
2023-10-09 09:45:08,715 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table loaded by catalog: mycatalog.mycatalog.debezium_database_history_storage_test
2023-10-09 09:45:08,715 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/mycatalog/debezium_database_history_storage_test/metadata/00000-cbb792a9-a298-45db-8b8e-4617b964037c.metadata.json
2023-10-09 09:45:08,716 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table loaded by catalog: mycatalog.mycatalog.debezium_database_history_storage_test
2023-10-09 09:45:08,744 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) No previous offsets found
2023-10-09 09:45:08,768 INFO  [io.deb.uti.Threads] (pool-6-thread-1) Requested thread factory for connector SqlServerConnector, id = test named = change-event-source-coordinator
2023-10-09 09:45:08,786 INFO  [io.deb.uti.Threads] (pool-6-thread-1) Creating thread debezium-sqlserverconnector-test-change-event-source-coordinator
2023-10-09 09:45:08,786 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Successfully started task
2023-10-09 09:45:08,793 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Metrics registered
2023-10-09 09:45:08,795 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Context created
2023-10-09 09:45:08,803 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) No previous offset has been found
2023-10-09 09:45:08,803 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) According to the connector configuration both schema and data will be snapshotted
2023-10-09 09:45:08,805 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 1 - Preparing
2023-10-09 09:45:08,820 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 2 - Determining captured tables
2023-10-09 09:45:08,890 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Adding table POC_Debezium_CDC.dbo.Names to the list of capture schema tables
2023-10-09 09:45:08,890 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Adding table POC_Debezium_CDC.dbo.Function_ to the list of capture schema tables
2023-10-09 09:45:08,892 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 3 - Locking captured tables [POC_Debezium_CDC.dbo.Function_, POC_Debezium_CDC.dbo.Names]
2023-10-09 09:45:08,892 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Setting locking timeout to 10 s
2023-10-09 09:45:08,898 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Executing schema locking
2023-10-09 09:45:08,899 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Locking table POC_Debezium_CDC.dbo.Function_
2023-10-09 09:45:08,900 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Locking table POC_Debezium_CDC.dbo.Names
2023-10-09 09:45:08,901 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 4 - Determining snapshot offset
2023-10-09 09:45:08,913 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 5 - Reading structure of captured tables
2023-10-09 09:45:08,920 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Reading structure of schema 'POC_Debezium_CDC'
2023-10-09 09:45:09,080 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 5.a - Creating connection pool
2023-10-09 09:45:09,081 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Created connection pool with 1 threads
2023-10-09 09:45:09,081 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 6 - Persisting schema history
2023-10-09 09:45:09,081 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Capturing structure of table POC_Debezium_CDC.dbo.Function_
2023-10-09 09:45:09,343 WARN  [org.apa.had.uti.NativeCodeLoader] (debezium-sqlserverconnector-test-change-event-source-coordinator) Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-10-09 09:45:10,168 INFO  [org.apa.ice.SnapshotProducer] (debezium-sqlserverconnector-test-change-event-source-coordinator) Committed snapshot 6852672812362054277 (StreamingDelete)
2023-10-09 09:45:10,227 INFO  [org.apa.ice.met.LoggingMetricsReporter] (debezium-sqlserverconnector-test-change-event-source-coordinator) Received metrics report: CommitReport{tableName=mycatalog.mycatalog.debezium_database_history_storage_test, snapshotId=6852672812362054277, sequenceNumber=0, operation=delete, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.125473566S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=null, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=0}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=null, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=0}, addedFilesSizeInBytes=null, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=0}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:10,309 INFO  [org.apa.ice.SnapshotProducer] (debezium-sqlserverconnector-test-change-event-source-coordinator) Committed snapshot 1430009288422402464 (MergeAppend)
2023-10-09 09:45:10,342 INFO  [org.apa.ice.met.LoggingMetricsReporter] (debezium-sqlserverconnector-test-change-event-source-coordinator) Received metrics report: CommitReport{tableName=mycatalog.mycatalog.debezium_database_history_storage_test, snapshotId=1430009288422402464, sequenceNumber=0, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.112625779S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=1}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=1}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=5416}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=5416}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:10,345 INFO  [org.apa.ice.BaseMetastoreTableOperations] (debezium-sqlserverconnector-test-change-event-source-coordinator) Successfully committed to table mycatalog.debezium_database_history_storage_test in 2 ms
2023-10-09 09:45:10,346 INFO  [org.apa.ice.BaseMetastoreTableOperations] (debezium-sqlserverconnector-test-change-event-source-coordinator) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/mycatalog/debezium_database_history_storage_test/metadata/00001-76f8d12e-b5a2-4561-8025-20ca447507ea.metadata.json
2023-10-09 09:45:10,355 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Capturing structure of table POC_Debezium_CDC.dbo.Names
2023-10-09 09:45:10,460 INFO  [org.apa.ice.SnapshotProducer] (debezium-sqlserverconnector-test-change-event-source-coordinator) Committed snapshot 1437594426051498050 (StreamingDelete)
2023-10-09 09:45:10,463 INFO  [org.apa.ice.met.LoggingMetricsReporter] (debezium-sqlserverconnector-test-change-event-source-coordinator) Received metrics report: CommitReport{tableName=mycatalog.mycatalog.debezium_database_history_storage_test, snapshotId=1437594426051498050, sequenceNumber=0, operation=delete, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.087638354S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=null, removedDataFiles=CounterResult{unit=COUNT, value=1}, totalDataFiles=CounterResult{unit=COUNT, value=0}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=null, removedRecords=CounterResult{unit=COUNT, value=1}, totalRecords=CounterResult{unit=COUNT, value=0}, addedFilesSizeInBytes=null, removedFilesSizeInBytes=CounterResult{unit=BYTES, value=5416}, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=0}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:10,500 INFO  [org.apa.ice.SnapshotProducer] (debezium-sqlserverconnector-test-change-event-source-coordinator) Committed snapshot 8718837269412885452 (MergeAppend)
2023-10-09 09:45:10,503 INFO  [org.apa.ice.met.LoggingMetricsReporter] (debezium-sqlserverconnector-test-change-event-source-coordinator) Received metrics report: CommitReport{tableName=mycatalog.mycatalog.debezium_database_history_storage_test, snapshotId=8718837269412885452, sequenceNumber=0, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.039889257S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=1}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=1}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=7295}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=7295}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:10,504 INFO  [org.apa.ice.BaseMetastoreTableOperations] (debezium-sqlserverconnector-test-change-event-source-coordinator) Successfully committed to table mycatalog.debezium_database_history_storage_test in 1 ms
2023-10-09 09:45:10,504 INFO  [org.apa.ice.BaseMetastoreTableOperations] (debezium-sqlserverconnector-test-change-event-source-coordinator) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/mycatalog/debezium_database_history_storage_test/metadata/00002-307145d5-a936-4931-af5f-d87f6b6455aa.metadata.json
2023-10-09 09:45:10,509 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Schema locks released.
2023-10-09 09:45:10,510 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot step 7 - Snapshotting data
2023-10-09 09:45:10,511 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Creating snapshot worker pool with 1 worker thread(s)
2023-10-09 09:45:10,514 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) For table 'POC_Debezium_CDC.dbo.Function_' using select statement: 'SELECT [ID], [Function] FROM [POC_Debezium_CDC].[dbo].[Function_]'
2023-10-09 09:45:10,514 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) For table 'POC_Debezium_CDC.dbo.Names' using select statement: 'SELECT [ID], [Surname], [Name], [AGE] FROM [POC_Debezium_CDC].[dbo].[Names]'
2023-10-09 09:45:10,516 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-12-thread-1) Exporting data from table 'POC_Debezium_CDC.dbo.Function_' (1 of 2 tables)
2023-10-09 09:45:10,536 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-12-thread-1)    Finished exporting 3 records for table 'POC_Debezium_CDC.dbo.Function_' (1 of 2 tables); total duration '00:00:00.02'
2023-10-09 09:45:10,537 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-12-thread-1) Exporting data from table 'POC_Debezium_CDC.dbo.Names' (2 of 2 tables)
2023-10-09 09:45:10,541 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (pool-12-thread-1)    Finished exporting 4 records for table 'POC_Debezium_CDC.dbo.Names' (2 of 2 tables); total duration '00:00:00.004'
2023-10-09 09:45:10,543 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot - Final stage
2023-10-09 09:45:10,543 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot completed
2023-10-09 09:45:10,544 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Removing locking timeout
2023-10-09 09:45:10,558 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=test, changeLsn=NULL, commitLsn=00000035:00002498:0004, eventSerialNo=null, snapshot=FALSE, sourceTime=2023-10-09T09:45:10.533Z], snapshotCompleted=true, eventSerialNo=1]]
2023-10-09 09:45:10,562 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Connected metrics set to 'true'
2023-10-09 09:45:10,564 INFO  [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Starting streaming
2023-10-09 09:45:10,566 INFO  [io.deb.con.sql.SqlServerStreamingChangeEventSource] (debezium-sqlserverconnector-test-change-event-source-coordinator) Last position recorded in offsets is 00000035:00002498:0004(NULL)[1]
2023-10-09 09:45:10,826 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Table not found: default.debeziumcdc_test_POC_Debezium_CDC_dbo_Names
2023-10-09 09:45:10,828 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Creating table:'default.debeziumcdc_test_POC_Debezium_CDC_dbo_Names'
schema:table {
  1: ID: required int (id)
  2: Surname: optional string
  3: Name: optional string
  4: AGE: optional int
  5: __op: optional string
  6: __table: optional string
  7: __source_ts_ms: optional timestamptz
  8: __db: optional string
  9: __deleted: optional string
}
rowIdentifier:[ID]
2023-10-09 09:45:10,828 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Created namespace:'default'
2023-10-09 09:45:10,828 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties set at catalog level through catalog properties: {}
2023-10-09 09:45:10,835 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties enforced at catalog level through catalog properties: {}
2023-10-09 09:45:10,838 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table default.debeziumcdc_test_POC_Debezium_CDC_dbo_Names in 1 ms
2023-10-09 09:45:10,838 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/metadata/00000-097b4ae9-6f71-4d42-a27f-59e6cc32aaf5.metadata.json
2023-10-09 09:45:10,956 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table default.debeziumcdc_test_POC_Debezium_CDC_dbo_Names in 0 ms
2023-10-09 09:45:10,956 INFO  [org.apa.ice.SnapshotProducer] (pool-6-thread-1) Committed snapshot 3245967521201954289 (BaseRowDelta)
2023-10-09 09:45:10,957 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/metadata/00001-4bdd325e-28a0-461c-8353-6b296048c734.metadata.json
2023-10-09 09:45:10,959 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-6-thread-1) Received metrics report: CommitReport{tableName=mycatalog.default.debeziumcdc_test_POC_Debezium_CDC_dbo_Names, snapshotId=3245967521201954289, sequenceNumber=1, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.047394997S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=1}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=1}, addedRecords=CounterResult{unit=COUNT, value=4}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=4}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=5812}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=5812}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=4}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=4}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:10,959 INFO  [io.deb.ser.ice.tab.IcebergTableOperator] (pool-6-thread-1) Committed 4 events to table! s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names
2023-10-09 09:45:10,960 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Table not found: default.debeziumcdc_test_POC_Debezium_CDC_dbo_Function_
2023-10-09 09:45:10,960 WARN  [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Creating table:'default.debeziumcdc_test_POC_Debezium_CDC_dbo_Function_'
schema:table {
  1: ID: required int (id)
  2: Function: optional string
  3: __op: optional string
  4: __table: optional string
  5: __source_ts_ms: optional timestamptz
  6: __db: optional string
  7: __deleted: optional string
}
rowIdentifier:[ID]
2023-10-09 09:45:10,960 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties set at catalog level through catalog properties: {}
2023-10-09 09:45:10,961 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table properties enforced at catalog level through catalog properties: {}
2023-10-09 09:45:10,961 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table default.debeziumcdc_test_POC_Debezium_CDC_dbo_Function_ in 0 ms
2023-10-09 09:45:10,961 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Function_/metadata/00000-183bd8ae-7044-43ec-a2bb-fde8217d7592.metadata.json
2023-10-09 09:45:11,029 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Successfully committed to table default.debeziumcdc_test_POC_Debezium_CDC_dbo_Function_ in 0 ms
2023-10-09 09:45:11,029 INFO  [org.apa.ice.SnapshotProducer] (pool-6-thread-1) Committed snapshot 1975199247859405134 (BaseRowDelta)
2023-10-09 09:45:11,029 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Function_/metadata/00001-71149a0f-1db5-42c4-a1d1-4ae5c4ddfa48.metadata.json
2023-10-09 09:45:11,032 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-6-thread-1) Received metrics report: CommitReport{tableName=mycatalog.default.debeziumcdc_test_POC_Debezium_CDC_dbo_Function_, snapshotId=1975199247859405134, sequenceNumber=1, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.04237405S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=1}, addedDeleteFiles=CounterResult{unit=COUNT, value=1}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=1}, addedRecords=CounterResult{unit=COUNT, value=3}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=3}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=4844}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=4844}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=CounterResult{unit=COUNT, value=3}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=3}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:11,032 INFO  [io.deb.ser.ice.tab.IcebergTableOperator] (pool-6-thread-1) Committed 3 events to table! s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Function_
2023-10-09 09:45:11,085 INFO  [org.apa.ice.SnapshotProducer] (IcebergOffsetBackingStore-1) Committed snapshot 6731691641140470754 (StreamingDelete)
2023-10-09 09:45:11,086 INFO  [org.apa.ice.met.LoggingMetricsReporter] (IcebergOffsetBackingStore-1) Received metrics report: CommitReport{tableName=mycatalog.default.debezium_offset_storage_custom_table, snapshotId=6731691641140470754, sequenceNumber=0, operation=delete, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.002644554S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=null, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=0}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=null, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=0}, addedFilesSizeInBytes=null, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=0}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:11,111 INFO  [org.apa.ice.SnapshotProducer] (IcebergOffsetBackingStore-1) Committed snapshot 5663587160489781067 (MergeAppend)
2023-10-09 09:45:11,113 INFO  [org.apa.ice.met.LoggingMetricsReporter] (IcebergOffsetBackingStore-1) Received metrics report: CommitReport{tableName=mycatalog.default.debezium_offset_storage_custom_table, snapshotId=5663587160489781067, sequenceNumber=0, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.026497289S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=1}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=1}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=2125}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=2125}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.3.1 (commit 62c34711c3f22e520db65c51255512f6cfe622c4)}}
2023-10-09 09:45:11,114 INFO  [org.apa.ice.BaseMetastoreTableOperations] (IcebergOffsetBackingStore-1) Successfully committed to table default.debezium_offset_storage_custom_table in 0 ms
2023-10-09 09:45:11,114 INFO  [org.apa.ice.BaseMetastoreTableOperations] (IcebergOffsetBackingStore-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debezium_offset_storage_custom_table/metadata/00001-4cb70f5a-6a92-4076-89c8-05d571e5abf3.metadata.json
2023-10-09 09:51:32,379 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) 8 records sent during previous 00:06:23.751, last recorded offset of {server=test, database=POC_Debezium_CDC} partition is {transaction_id=null, event_serial_no=1, commit_lsn=00000035:00002818:0004, change_lsn=00000035:00002818:0003}
2023-10-09 09:51:32,381 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-6-thread-1) Refreshing table metadata from new version: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/metadata/00001-4bdd325e-28a0-461c-8353-6b296048c734.metadata.json
2023-10-09 09:51:32,382 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-6-thread-1) Table loaded by catalog: mycatalog.default.debeziumcdc_test_POC_Debezium_CDC_dbo_Names
2023-10-09 09:51:32,404 INFO  [io.deb.emb.EmbeddedEngine] (pool-6-thread-1) Stopping the task and engine
2023-10-09 09:51:32,405 INFO  [io.deb.con.com.BaseSourceTask] (pool-6-thread-1) Stopping down connector
2023-10-09 09:51:32,566 INFO  [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Finished streaming
2023-10-09 09:51:32,567 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-test-change-event-source-coordinator) Connected metrics set to 'false'
2023-10-09 09:51:32,574 INFO  [io.deb.jdb.JdbcConnection] (pool-13-thread-1) Connection gracefully closed
2023-10-09 09:51:32,576 INFO  [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
2023-10-09 09:51:32,579 ERROR [io.deb.ser.ConnectorLifecycle] (pool-6-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: No in-memory file found for location: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/data/20231009-1-9efb3b35-e505-462e-a460-3883a14b493e-00002.parquet', error = 'org.apache.iceberg.exceptions.NotFoundException: No in-memory file found for location: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/data/20231009-1-9efb3b35-e505-462e-a460-3883a14b493e-00002.parquet': org.apache.iceberg.exceptions.NotFoundException: No in-memory file found for location: s3a://testtest/iceberg_warehouse/default/debeziumcdc_test_POC_Debezium_CDC_dbo_Names/data/20231009-1-9efb3b35-e505-462e-a460-3883a14b493e-00002.parquet
    at org.apache.iceberg.inmemory.InMemoryFileIO.deleteFile(InMemoryFileIO.java:63)
    at org.apache.iceberg.io.FileIO.deleteFile(FileIO.java:58)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.closeCurrent(BaseTaskWriter.java:318)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.close(BaseTaskWriter.java:341)
    at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.close(BaseTaskWriter.java:205)
    at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter$RowDataDeltaWriter.close(BaseDeltaTaskWriter.java:66)
    at io.debezium.server.iceberg.tableoperator.UnpartitionedDeltaWriter.close(UnpartitionedDeltaWriter.java:38)
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTablePerSchema(IcebergTableOperator.java:176)
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:157)
    at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
    at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:101)
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:912)
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:229)
    at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
2023-10-09 09:51:32,616 INFO  [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2023-10-09 09:51:32,617 INFO  [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2023-10-09 09:51:32,641 INFO  [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.059s
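The trace ends in `InMemoryFileIO.deleteFile`. This means the in-memory catalog was doing its file I/O through Iceberg's `InMemoryFileIO`, which keeps files in the server process's memory instead of writing them to MinIO, even though the table locations are `s3a://` paths. The writer's close path (`BaseRollingWriter.closeCurrent`) then asked it to delete a data file that was not in that store. The following is a simplified Python model of that failure mode. It assumes `InMemoryFileIO` behaves like a process-local map from path to bytes. `InMemoryFileStore` and `NotFoundError` are hypothetical names, not Iceberg classes.

```python
class NotFoundError(Exception):
    """Stands in for org.apache.iceberg.exceptions.NotFoundException."""


class InMemoryFileStore:
    """Hypothetical, simplified model of an in-memory FileIO.

    Files are only entries in a dict owned by this process; nothing is
    sent to S3/MinIO, even when the path starts with s3a://.
    """

    def __init__(self) -> None:
        self._files: dict[str, bytes] = {}

    def write(self, location: str, data: bytes) -> None:
        # The "upload" is just a dict assignment.
        self._files[location] = data

    def exists(self, location: str) -> bool:
        return location in self._files

    def delete(self, location: str) -> None:
        # Deleting a location that was never stored (or is already gone)
        # fails with the same message seen in the Debezium log.
        if self._files.pop(location, None) is None:
            raise NotFoundError(f"No in-memory file found for location: {location}")
```

Because the store lives inside the Debezium process, nothing written this way reaches the bucket, and the store is lost when the server stops.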
ismailsimsek commented 9 months ago

@nelo-0 could you try it with debezium.sink.iceberg.s3.endpoint=http://minio:9000, i.e. with the trailing ; character removed?
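A stray `;` after the port means the value is no longer a plain `scheme://host:port` URL. A quick pre-flight check is to parse the endpoint with Python's standard `urllib.parse`, where the trailing `;` makes the port unparseable. This is a hypothetical helper, not part of debezium-server-iceberg, and it makes no claim about how S3FileIO itself parses the value.

```python
from urllib.parse import urlsplit


def check_s3_endpoint(value: str) -> tuple[str, int]:
    """Return (host, port) for an S3 endpoint URL, or raise ValueError.

    Hypothetical helper for validating a value such as
    debezium.sink.iceberg.s3.endpoint before starting the server.
    """
    parts = urlsplit(value.strip())
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"unexpected scheme in {value!r}")
    if not parts.hostname:
        raise ValueError(f"missing host in {value!r}")
    # .port raises ValueError when the port is not a plain integer,
    # which is what happens with "9000;".
    port = parts.port
    if port is None:
        port = 443 if parts.scheme == "https" else 80
    return parts.hostname, port
```

With this helper, `check_s3_endpoint("http://minio:9000")` returns `("minio", 9000)`, while `"http://minio:9000;"` raises `ValueError`.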

ghost commented 9 months ago

@ismailsimsek That was a typo indeed. Unfortunately, after fixing the typo the error was still there.

Finally got it working with spark-iceberg and the Iceberg REST catalog, thanks to https://tabular.io/blog/docker-spark-and-iceberg/
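To confirm that Debezium and Spark see the same catalog, you can ask the REST catalog which namespaces it knows about. The Iceberg REST catalog specification defines `GET /v1/namespaces`, which returns a JSON object whose `namespaces` array lists each namespace as a list of levels. The sketch below uses only the Python standard library and assumes the server is reachable at `http://localhost:8181` (the `8181:8181` port mapping in the compose file) with no catalog path prefix. The helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def namespaces_url(base_uri: str) -> str:
    """Build the list-namespaces URL (assumes no catalog path prefix)."""
    return base_uri.rstrip("/") + "/v1/namespaces"


def parse_namespaces(body: str) -> list[str]:
    """Turn {"namespaces": [["a"], ["b", "c"]]} into ["a", "b.c"]."""
    payload = json.loads(body)
    return [".".join(levels) for levels in payload.get("namespaces", [])]


def list_namespaces(base_uri: str, timeout: float = 5.0) -> list[str]:
    """Query a running REST catalog, e.g. http://localhost:8181."""
    with urlopen(namespaces_url(base_uri), timeout=timeout) as resp:
        return parse_namespaces(resp.read().decode("utf-8"))
```

With the stack running, `list_namespaces("http://localhost:8181")` should include `debeziumdata` once Debezium has created its first table, given `debezium.sink.iceberg.table-namespace=debeziumdata`.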

Final setup:

sqlserver = docker-compose.yml

version: "3.8"

services:
  sqlserver:
    container_name: sqlserver
    image: mcr.microsoft.com/mssql/server:2019-latest
    user: root
    networks:
      - my-network
    ports:
      - "1433:1433"
    volumes:
      - ./data/sqlserver/data:/var/opt/mssql/data
      - ./data/sqlserver/log:/var/opt/mssql/log
      - ./data/sqlserver/secrets:/var/opt/mssql/secrets
    env_file:
      - .env
    environment:
      - "ACCEPT_EULA=Y"
      - "MSSQL_AGENT_ENABLED=True"
      - MSSQL_SA_PASSWORD=${MSSQL_SA_PASSWORD}
networks:
  my-network:
    name: poc_debezium_net
    driver: bridge

main-setup = docker-compose.yml

version: "3.8"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      - my-proxy-net
    depends_on:
      - rest
      - minio
      - mc
    volumes:
      - ./data/spark-iceberg/warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001
  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      - my-proxy-net
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      my-proxy-net:
        aliases:
          - warehouse.minio
    volumes:
      - ./data/minio:/data
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  debezium-iceberg:
    container_name: debezium
    networks:
      - my-proxy-net
    depends_on:
      - rest
      - minio
      - mc
    image: ghcr.io/memiiso/debezium-server-iceberg:latest
    user: root
    volumes:
      - ./conf/debezium-iceberg:/app/conf
    environment:
      - AWS_REGION=us-east-1
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      - my-proxy-net
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
networks:
  my-proxy-net:
    name: poc_debezium_net
    external: true
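This compose file addresses the same MinIO bucket in two ways. Debezium is configured with `s3.path-style-access=true`, so its requests go to `http://minio:9000/warehouse/...`. The REST catalog gets no such setting. With virtual-hosted style, the bucket becomes part of the host name (`warehouse.minio`), which is presumably why `MINIO_DOMAIN=minio` and the `warehouse.minio` network alias are set. The sketch below only illustrates how the two URL styles are formed. `object_url` is a hypothetical helper, not an AWS SDK call, and it ignores regions, signing and bucket names that are not valid host labels.

```python
from urllib.parse import quote, urlsplit


def object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    """Build an S3 object URL in path-style or virtual-hosted style."""
    parts = urlsplit(endpoint)
    host = parts.hostname
    port = f":{parts.port}" if parts.port else ""
    path = quote(key)  # keeps "/" separators, escapes other unsafe chars
    if path_style:
        # Bucket in the path: http://minio:9000/warehouse/<key>
        return f"{parts.scheme}://{host}{port}/{bucket}/{path}"
    # Bucket in the host name: http://warehouse.minio:9000/<key>
    return f"{parts.scheme}://{bucket}.{host}{port}/{path}"
```

In the virtual-hosted case, the container network must resolve `warehouse.minio` to the MinIO container, which the alias provides.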

application.properties

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.table-namespace=debeziumdata
debezium.sink.iceberg.catalog-name=iceberg

#debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog

# REST API CATALOG
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.rest.RESTCatalog
debezium.sink.iceberg.uri=http://rest:8181

debezium.sink.iceberg.warehouse=s3a://warehouse/
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO

debezium.sink.iceberg.s3.endpoint=http://minio:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=admin
debezium.sink.iceberg.s3.secret-access-key=password
#debezium.sink.iceberg.s3.region=us-east-1

# sql server source
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=sqlserver
debezium.source.database.port=1433
debezium.source.database.user=sa
debezium.source.database.password=mu6M7Q18nXAhzPfZMGtGVemKb
debezium.source.database.names=POC_Debezium_CDC
debezium.source.topic.prefix=test
debezium.source.database.encrypt=true
debezium.source.database.trustServerCertificate=true
debezium.source.schema.include.list=dbo
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test
# debezium.source.schema.history.internal.kafka.bootstrap.servers=localhost:9092
# mandatory for the SQL Server source, to avoid errors during snapshots and schema changes
debezium.source.include.schema.changes=false
# saving debezium state data to destination, iceberg tables
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test

# enable event schemas (mandatory)
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# flatten events: unwrap the change-event envelope
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
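The `unwrap` transform (`ExtractNewRecordState`) produces the flat rows with `__op`, `__table`, `__source_ts_ms`, `__db` and `__deleted` columns that appear in the table schemas logged earlier in this thread. The Python function below is a simplified, hypothetical model of that flattening for the settings used here (`add.fields=op,table,source.ts_ms,db`, `delete.handling.mode=rewrite`, `drop.tombstones=true`). It is not Debezium's implementation and ignores schemas and all other options.

```python
from typing import Any, Optional

Record = dict[str, Any]


def unwrap(envelope: Optional[Record]) -> Optional[Record]:
    """Flatten a Debezium change-event envelope (simplified model)."""
    if envelope is None:
        return None  # tombstone: dropped, as with drop.tombstones=true
    op = envelope["op"]
    source = envelope.get("source", {})
    # Deletes carry the old row in "before"; all other ops use "after".
    row = envelope["before"] if op == "d" else envelope["after"]
    flat = dict(row)
    # add.fields=op,table,source.ts_ms,db -> "__"-prefixed columns
    flat["__op"] = op
    flat["__table"] = source.get("table")
    flat["__source_ts_ms"] = source.get("ts_ms")
    flat["__db"] = source.get("db")
    # rewrite mode keeps deletes and marks them with a string flag
    flat["__deleted"] = "true" if op == "d" else "false"
    return flat
```

Keeping deletes as flagged rows, rather than dropping them, lets the sink see them as regular records. This is what the `__deleted` column in the logged schemas reflects.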

This is really promising. Thanks for all the help and hard work so far. Next, I will try setting this up with the Azure Databricks Hive metastore and writing the Iceberg files to ADLS.

ismailsimsek commented 9 months ago

Fantastic! Please feel free to share your experience. A documentation page explaining your setup on Azure would definitely be helpful for other users; recently some users were asking about Azure storage setup and Hive Metastore catalog setup.