memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Only DDL events are captured for MySQL, but no DML ones #378

Open DipalPrajapati opened 3 weeks ago

DipalPrajapati commented 3 weeks ago

application.properties

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg

# S3 config using JdbcCatalog catalog And S3FileIO
debezium.sink.iceberg.type=jdbc
debezium.sink.iceberg.catalog-name=iceberg
debezium.sink.iceberg.table-namespace=company
debezium.sink.iceberg.warehouse=s3://iceberg-cdc-table/warehouse
debezium.sink.iceberg.uri=jdbc:mysql://company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306/company
debezium.sink.iceberg.jdbc.user=root
debezium.sink.iceberg.jdbc.password=mysqlserver

# Use S3FileIO
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=https://iceberg-cdc-table.s3.amazonaws.com/warehouse
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=xxxxxxxxxx
debezium.sink.iceberg.s3.secret-access-key=xxxxxxxxxxx
debezium.sink.iceberg.s3.region=us-east-1
AWS_REGION=us-east-1

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=mysqlserver
debezium.source.database.dbname=company
debezium.source.database.server.name=mysql80
debezium.source.database.server.id=1234
debezium.source.database.include.list=company
debezium.source.schema.include.list=company.orders
debezium.source.table.include.list=company.orders
snapshot.include.collection.list=company.orders
debezium.source.topic.prefix=dbz_
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test
debezium.source.include.schema.changes=true

# saving debezium state data to destination, iceberg tables
# see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
#debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
#debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.max.batch.size=10000
debezium.source.max.queue.size=100000
debezium.sink.batch.batch-size-wait.max-wait-ms=120000
debezium.sink.batch.batch-size-wait.wait-interval-ms=10000

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

logs:

MacBook-Pro:debezium-server-iceberg milan$ ./run.sh

       __       __                 _                 
  ____/ /___   / /_   ___  ____   (_)__  __ ____ ___ 
 / __  // _ \ / __ \ / _ \/_  /  / // / / // __ `__ \
/ /_/ //  __// /_/ //  __/ / /_ / // /_/ // / / / / /
\__,_/ \___//_.___/ \___/ /___//_/ \__,_//_/ /_/ /_/ 

                            Powered by Quarkus 3.10.0
2024-07-09 13:17:55,255 INFO  [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda/0x000000012c370000@777d191f' stream name mapper
2024-07-09 13:17:55,368 INFO  [org.apa.ice.CatalogUtil] (main) Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
2024-07-09T07:47:55.485675Z main ERROR Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
2024-07-09 13:17:59,842 WARN  [org.apa.ice.jdb.JdbcCatalog] (main) JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
2024-07-09 13:17:59,848 INFO  [io.deb.ser.ice.IcebergUtil] (main) Using io.quarkus.arc.impl.InstanceImpl
2024-07-09 13:17:59,872 INFO  [io.deb.ser.jso.JsonSerdeConfig] (main) JsonSerdeConfig values: 
    from.field = null
    unknown.properties.ignored = false

2024-07-09 13:17:59,873 INFO  [io.deb.ser.jso.JsonSerdeConfig] (main) JsonSerdeConfig values: 
    from.field = null
    unknown.properties.ignored = false

2024-07-09 13:17:59,873 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.iceberg.IcebergChangeConsumer' instantiated
2024-07-09 13:17:59,928 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = header
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = true

2024-07-09 13:17:59,930 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = true

2024-07-09 13:17:59,931 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = true

2024-07-09 13:17:59,960 INFO  [io.deb.emb.EmbeddedWorkerConfig] (main) EmbeddedWorkerConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    admin.listeners = null
    auto.include.jmx.reporter = true
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 0
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = 
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic = 
    plugin.discovery = hybrid_warn
    plugin.path = null
    response.http.headers.config = 
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

2024-07-09 13:17:59,981 WARN  [io.deb.tra.AbstractExtractNewRecordState] (main) The deleted record handling configs "drop.tombstones" and "delete.handling.mode" have been deprecated, please use "delete.tombstone.handling.mode" instead.
2024-07-09 13:17:59,999 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = false

2024-07-09 13:17:59,999 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = false

2024-07-09 13:17:59,999 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2024-07-09 13:18:00,000 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Engine state has changed from 'CREATING' to 'INITIALIZING'
2024-07-09 13:18:00,051 INFO  [io.deb.con.CommonConnectorConfig] (pool-7-thread-1) Loading the custom source info struct maker plugin: io.debezium.connector.mysql.MySqlSourceInfoStructMaker
2024-07-09 13:18:00,125 INFO  [io.quarkus] (main) debezium-server-iceberg-dist 0.4.0-SNAPSHOT on JVM (powered by Quarkus 3.10.0) started in 12.208s. Listening on: http://0.0.0.0:8080
2024-07-09 13:18:00,127 INFO  [io.quarkus] (main) Profile prod activated. 
2024-07-09 13:18:00,127 INFO  [io.quarkus] (main) Installed features: [cdi, kubernetes-client, micrometer, resteasy-jackson, smallrye-context-propagation, smallrye-health, vertx]
2024-07-09 13:18:02,342 INFO  [io.deb.con.bin.BinlogConnector] (pool-7-thread-1) Successfully tested connection for jdbc:mysql://company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'root'
2024-07-09 13:18:02,348 INFO  [io.deb.jdb.JdbcConnection] (pool-10-thread-1) Connection gracefully closed
2024-07-09 13:18:02,397 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore$IcebergOffsetBackingStoreConfig] (pool-7-thread-1) IcebergOffsetBackingStoreConfig values: 

2024-07-09 13:18:02,418 INFO  [org.apa.ice.CatalogUtil] (pool-7-thread-1) Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
2024-07-09 13:18:05,968 WARN  [org.apa.ice.jdb.JdbcCatalog] (pool-7-thread-1) JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
2024-07-09 13:18:05,972 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore] (pool-7-thread-1) Starting IcebergOffsetBackingStore table:company.debezium_offset_storage_custom_table
2024-07-09 13:18:06,240 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-7-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_offset_storage_custom_table/metadata/00008-0c922030-6ee2-47aa-b161-7e1af2770342.metadata.json
2024-07-09 13:18:07,903 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-7-thread-1) Table loaded by catalog: iceberg.company.debezium_offset_storage_custom_table
2024-07-09 13:18:08,147 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-7-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_offset_storage_custom_table/metadata/00008-0c922030-6ee2-47aa-b161-7e1af2770342.metadata.json
2024-07-09 13:18:08,421 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-7-thread-1) Table loaded by catalog: iceberg.company.debezium_offset_storage_custom_table
2024-07-09 13:18:08,460 INFO  [org.apa.ice.SnapshotScan] (pool-7-thread-1) Scanning table iceberg.company.debezium_offset_storage_custom_table snapshot 3722366104591936950 created at 2024-07-09T05:25:00.137+00:00 with filter true
2024-07-09 13:18:12,523 WARN  [org.apa.had.uti.NativeCodeLoader] (pool-7-thread-1) Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-07-09 13:18:13,556 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-7-thread-1) Received metrics report: ScanReport{tableName=iceberg.company.debezium_offset_storage_custom_table, snapshotId=3722366104591936950, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, offset_data, record_insert_ts], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT5.065747303S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=1}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=1}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=1}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=1910}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.5.2 (commit cbb853073e681b4075d7c8707610dceecbee3a82)}}
2024-07-09 13:18:13,583 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Engine state has changed from 'INITIALIZING' to 'CREATING_TASKS'
2024-07-09 13:18:13,614 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Engine state has changed from 'CREATING_TASKS' to 'STARTING_TASKS'
2024-07-09 13:18:13,617 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Waiting max. for 180000 ms for individual source tasks to start.
2024-07-09 13:18:13,619 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) Starting MySqlConnectorTask with configuration:
2024-07-09 13:18:13,624 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    connector.class = io.debezium.connector.mysql.MySqlConnector
2024-07-09 13:18:13,624 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.table-prefix = debeziumcdc_
2024-07-09 13:18:13,625 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.type = jdbc
2024-07-09 13:18:13,625 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.type = jdbc
2024-07-09 13:18:13,625 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    transforms = unwrap
2024-07-09 13:18:13,625 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.include.list = company.orders
2024-07-09 13:18:13,625 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.table-namespace = company
2024-07-09 13:18:13,625 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.write.format.default = parquet
2024-07-09 13:18:13,626 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.upsert = true
2024-07-09 13:18:13,626 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.catalog-name = iceberg
2024-07-09 13:18:13,626 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.s3.secret-access-key = xxxxxx
2024-07-09 13:18:13,626 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    include.schema.changes = true
2024-07-09 13:18:13,626 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.table-namespace = company
2024-07-09 13:18:13,627 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.s3.secret-access-key = xxxxxx
2024-07-09 13:18:13,627 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    transforms.unwrap.drop.tombstones = true
2024-07-09 13:18:13,627 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.warehouse = s3://iceberg-cdc-table/warehouse
2024-07-09 13:18:13,627 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.write.format.default = parquet
2024-07-09 13:18:13,627 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.s3.access-key-id = xxxx
2024-07-09 13:18:13,627 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.io-impl = org.apache.iceberg.aws.s3.S3FileIO
2024-07-09 13:18:13,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    record.processing.threads = 
2024-07-09 13:18:13,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
2024-07-09 13:18:13,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    errors.retry.delay.initial.ms = 300
2024-07-09 13:18:13,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter = org.apache.kafka.connect.json.JsonConverter
2024-07-09 13:18:13,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.io-impl = org.apache.iceberg.aws.s3.S3FileIO
2024-07-09 13:18:13,628 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.history.iceberg.table-name = debezium_database_history_storage_test
2024-07-09 13:18:13,629 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.table-name = debezium_offset_storage_custom_table
2024-07-09 13:18:13,629 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.dbname = mysql
2024-07-09 13:18:13,629 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.user = root
2024-07-09 13:18:13,629 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    header.converter.value.schemas.enable = true
2024-07-09 13:18:13,629 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage = io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
2024-07-09 13:18:13,629 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    header.converter.key = json
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    header.converter.key.schemas.enable = true
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.jdbc.password = ********
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.s3.path-style-access = true
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.s3.region = us-east-1
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.s3.endpoint = https://iceberg-cdc-table.s3.amazonaws.com/warehouse
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter.schemas.enable = true
2024-07-09 13:18:13,630 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    internal.task.management.timeout.ms = 180000
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    errors.max.retries = -1
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter.key.schemas.enable = true
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.password = ********
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter.schemas.enable = true
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    name = iceberg
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.uri = jdbc:mysql://company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306/company
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    max.batch.size = 10000
2024-07-09 13:18:13,631 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.upsert-keep-deletes = true
2024-07-09 13:18:13,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.history = io.debezium.server.iceberg.history.IcebergSchemaHistory
2024-07-09 13:18:13,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    max.queue.size = 100000
2024-07-09 13:18:13,632 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    transforms.unwrap.delete.handling.mode = rewrite
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.uri = jdbc:mysql://company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306/company
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    record.processing.shutdown.timeout.ms = 1000
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.jdbc.user = root
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.s3.region = us-east-1
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    record.processing.order = ORDERED
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    topic.prefix = dbz_
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.file.filename = 
2024-07-09 13:18:13,633 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.s3.access-key-id = AKIAW3CYNTFD6VMPDUPT
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.upsert = true
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter = org.apache.kafka.connect.json.JsonConverter
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter.value.schemas.enable = true
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    header.converter.value = json
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.server.id = 1234
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.storage.iceberg.jdbc.user = root
2024-07-09 13:18:13,634 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter.value.schemas.enable = true
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter.value = json
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter.key.schemas.enable = true
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.catalog-name = iceberg
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.server.name = mysql80
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.flush.timeout.ms = 5000
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    errors.retry.delay.max.ms = 10000
2024-07-09 13:18:13,635 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.s3.endpoint = https://iceberg-cdc-table.s3.amazonaws.com/warehouse
2024-07-09 13:18:13,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.jdbc.password = ********
2024-07-09 13:18:13,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter.value = json
2024-07-09 13:18:13,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.port = 3306
2024-07-09 13:18:13,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    offset.flush.interval.ms = 0
2024-07-09 13:18:13,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal = io.debezium.server.iceberg.history.IcebergSchemaHistory
2024-07-09 13:18:13,636 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    record.processing.with.serial.consumer = false
2024-07-09 13:18:13,637 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.hostname = company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com
2024-07-09 13:18:13,637 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.table-name = debezium_database_history_storage_test
2024-07-09 13:18:13,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.table-prefix = debeziumcdc_
2024-07-09 13:18:13,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.upsert-keep-deletes = true
2024-07-09 13:18:13,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    transforms.unwrap.add.fields = op,table,source.ts_ms,db
2024-07-09 13:18:13,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    table.include.list = company.orders
2024-07-09 13:18:13,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    value.converter.key = json
2024-07-09 13:18:13,638 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.warehouse = s3://iceberg-cdc-table/warehouse
2024-07-09 13:18:13,639 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    schema.history.internal.iceberg.s3.path-style-access = true
2024-07-09 13:18:13,639 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    database.include.list = company
2024-07-09 13:18:13,639 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1)    key.converter.key = json
2024-07-09 13:18:13,639 INFO  [io.deb.con.CommonConnectorConfig] (pool-8-thread-1) Loading the custom source info struct maker plugin: io.debezium.connector.mysql.MySqlSourceInfoStructMaker
2024-07-09 13:18:13,642 INFO  [io.deb.con.CommonConnectorConfig] (pool-8-thread-1) Loading the custom topic naming strategy plugin: io.debezium.schema.DefaultTopicNamingStrategy
2024-07-09 13:18:13,692 INFO  [io.deb.con.com.BaseSourceTask] (pool-8-thread-1) Found previous partition offset BinlogPartition{serverName='dbz_'} io.debezium.connector.mysql.MySqlPartition@2ef442: {file=mysql-bin-changelog.000002, pos=2870}
2024-07-09 13:18:16,590 INFO  [org.apa.ice.CatalogUtil] (pool-8-thread-1) Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
2024-07-09 13:18:19,735 WARN  [org.apa.ice.jdb.JdbcCatalog] (pool-8-thread-1) JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
2024-07-09 13:18:19,741 INFO  [io.deb.ser.ice.his.IcebergSchemaHistory] (pool-8-thread-1) Starting IcebergSchemaHistory storage table:company.debezium_database_history_storage_test
2024-07-09 13:18:19,971 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-8-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_database_history_storage_test/metadata/00013-cd819b27-b73d-4ad6-8151-486b9a2b4c9d.metadata.json
2024-07-09 13:18:21,089 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-8-thread-1) Table loaded by catalog: iceberg.company.debezium_database_history_storage_test
2024-07-09 13:18:21,320 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-8-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_database_history_storage_test/metadata/00013-cd819b27-b73d-4ad6-8151-486b9a2b4c9d.metadata.json
2024-07-09 13:18:21,592 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-8-thread-1) Table loaded by catalog: iceberg.company.debezium_database_history_storage_test
2024-07-09 13:18:22,311 INFO  [io.deb.con.mys.MySqlConnectorTask] (pool-8-thread-1) Closing connection before starting schema recovery
2024-07-09 13:18:22,313 INFO  [io.deb.jdb.JdbcConnection] (pool-17-thread-1) Connection gracefully closed
2024-07-09 13:18:22,549 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-8-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_database_history_storage_test/metadata/00013-cd819b27-b73d-4ad6-8151-486b9a2b4c9d.metadata.json
2024-07-09 13:18:22,829 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-8-thread-1) Table loaded by catalog: iceberg.company.debezium_database_history_storage_test
2024-07-09 13:18:22,830 INFO  [org.apa.ice.SnapshotScan] (pool-8-thread-1) Scanning table iceberg.company.debezium_database_history_storage_test snapshot 7228691340918850567 created at 2024-07-09T05:24:33.950+00:00 with filter true
2024-07-09 13:18:27,093 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-8-thread-1) Received metrics report: ScanReport{tableName=iceberg.company.debezium_database_history_storage_test, snapshotId=7228691340918850567, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, history_data, record_insert_ts], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT4.262041345S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=1}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=1}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=1}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=8945}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.5.2 (commit cbb853073e681b4075d7c8707610dceecbee3a82)}}
2024-07-09 13:18:27,347 INFO  [io.deb.con.bin.jdb.BinlogConnectorConnection] (pool-8-thread-1) Get all known binlogs
2024-07-09 13:18:29,935 INFO  [io.deb.con.bin.jdb.BinlogConnectorConnection] (pool-8-thread-1) Server has the binlog file 'mysql-bin-changelog.000002' required by the connector
2024-07-09 13:18:30,178 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-8-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_database_history_storage_test/metadata/00013-cd819b27-b73d-4ad6-8151-486b9a2b4c9d.metadata.json
2024-07-09 13:18:31,266 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-8-thread-1) Table loaded by catalog: iceberg.company.debezium_database_history_storage_test
2024-07-09 13:18:31,267 INFO  [org.apa.ice.SnapshotScan] (pool-8-thread-1) Scanning table iceberg.company.debezium_database_history_storage_test snapshot 7228691340918850567 created at 2024-07-09T05:24:33.950+00:00 with filter true
2024-07-09 13:18:33,595 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-8-thread-1) Received metrics report: ScanReport{tableName=iceberg.company.debezium_database_history_storage_test, snapshotId=7228691340918850567, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, history_data, record_insert_ts], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT2.326973929S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=1}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=1}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=1}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=8945}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.5.2 (commit cbb853073e681b4075d7c8707610dceecbee3a82)}}
2024-07-09 13:18:33,812 INFO  [io.deb.rel.his.SchemaHistoryMetrics] (pool-8-thread-1) Started database schema history recovery
2024-07-09 13:18:34,049 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-8-thread-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_database_history_storage_test/metadata/00013-cd819b27-b73d-4ad6-8151-486b9a2b4c9d.metadata.json
2024-07-09 13:18:35,217 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-8-thread-1) Table loaded by catalog: iceberg.company.debezium_database_history_storage_test
2024-07-09 13:18:35,217 INFO  [org.apa.ice.SnapshotScan] (pool-8-thread-1) Scanning table iceberg.company.debezium_database_history_storage_test snapshot 7228691340918850567 created at 2024-07-09T05:24:33.950+00:00 with filter true
2024-07-09 13:18:37,470 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-8-thread-1) Received metrics report: ScanReport{tableName=iceberg.company.debezium_database_history_storage_test, snapshotId=7228691340918850567, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, history_data, record_insert_ts], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT2.252375431S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=1}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=1}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=1}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=8945}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.5.2 (commit cbb853073e681b4075d7c8707610dceecbee3a82)}}
2024-07-09 13:18:37,775 INFO  [org.apa.ice.SnapshotScan] (pool-8-thread-1) Scanning table iceberg.company.debezium_database_history_storage_test snapshot 7228691340918850567 created at 2024-07-09T05:24:33.950+00:00 with filter true
2024-07-09 13:18:40,823 INFO  [io.deb.rel.his.SchemaHistoryMetrics] (pool-8-thread-1) Database schema history recovery in progress, recovered 1 records
2024-07-09 13:18:40,838 INFO  [io.deb.rel.his.SchemaHistoryMetrics] (pool-8-thread-1) Already applied 1 database changes
2024-07-09 13:18:40,839 INFO  [org.apa.ice.met.LoggingMetricsReporter] (pool-8-thread-1) Received metrics report: ScanReport{tableName=iceberg.company.debezium_database_history_storage_test, snapshotId=7228691340918850567, filter=true, schemaId=0, projectedFieldIds=[1, 2, 3], projectedFieldNames=[id, history_data, record_insert_ts], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT3.063030488S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=1}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=1}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=1}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=8945}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=0}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.5.2 (commit cbb853073e681b4075d7c8707610dceecbee3a82)}}
2024-07-09 13:18:40,839 INFO  [io.deb.rel.his.SchemaHistoryMetrics] (pool-8-thread-1) Finished database schema history recovery of 1 change(s) in 7027 ms
2024-07-09 13:18:40,840 INFO  [io.deb.con.mys.MySqlConnectorTask] (pool-8-thread-1) Reconnecting after finishing schema recovery
2024-07-09 13:18:41,129 INFO  [io.deb.con.mys.MySqlConnectorTask] (pool-8-thread-1) Found previous offset BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=BinlogSourceInfo{currentGtid='null', currentBinlogFilename='mysql-bin-changelog.000002', currentBinlogPosition=2870, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery='null', tableIds=[], databaseName='null'}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet='null', currentGtidSet='null', restartBinlogFilename='mysql-bin-changelog.000002', restartBinlogPosition=2870, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId='null', incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]}
2024-07-09 13:18:41,160 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component MySqlConnector, id = dbz_ named = SignalProcessor
2024-07-09 13:18:41,183 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component MySqlConnector, id = dbz_ named = change-event-source-coordinator
2024-07-09 13:18:41,184 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Requested thread factory for component MySqlConnector, id = dbz_ named = blocking-snapshot
2024-07-09 13:18:41,186 INFO  [io.deb.uti.Threads] (pool-8-thread-1) Creating thread debezium-mysqlconnector-dbz_-change-event-source-coordinator
2024-07-09 13:18:41,187 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) All tasks have stated successfully.
2024-07-09 13:18:41,187 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Engine state has changed from 'STARTING_TASKS' to 'POLLING_TASKS'
2024-07-09 13:18:41,188 INFO  [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Using io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor processor
2024-07-09 13:18:41,191 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Metrics registered
2024-07-09 13:18:41,192 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Context created
2024-07-09 13:18:41,198 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) A previous offset indicating a completed snapshot has been found.
2024-07-09 13:18:41,201 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=SKIPPED, offset=BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=BinlogSourceInfo{currentGtid='null', currentBinlogFilename='mysql-bin-changelog.000002', currentBinlogPosition=2870, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery='null', tableIds=[], databaseName='null'}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet='null', currentGtidSet='null', restartBinlogFilename='mysql-bin-changelog.000002', restartBinlogPosition=2870, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId='null', incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]}]
2024-07-09 13:18:41,205 INFO  [io.deb.uti.Threads] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Requested thread factory for component MySqlConnector, id = dbz_ named = binlog-client
2024-07-09 13:18:41,479 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Enable ssl PREFERRED mode for connector dbz_
2024-07-09 13:18:41,755 INFO  [io.deb.pip.sig.SignalProcessor] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) SignalProcessor started. Scheduling it every 5000ms
2024-07-09 13:18:41,755 INFO  [io.deb.uti.Threads] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Creating thread debezium-mysqlconnector-dbz_-SignalProcessor
2024-07-09 13:18:41,756 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Starting streaming
2024-07-09 13:18:41,756 WARN  [io.deb.rel.RelationalDatabaseSchema] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) After applying the include/exclude list filters, no changes will be captured. Please check your configuration!
2024-07-09 13:18:41,760 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Skip 0 events on streaming start
2024-07-09 13:18:41,760 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Skip 0 rows on streaming start
2024-07-09 13:18:41,760 INFO  [io.deb.uti.Threads] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Creating thread debezium-mysqlconnector-dbz_-binlog-client
2024-07-09 13:18:41,986 INFO  [io.deb.uti.Threads] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Creating thread debezium-mysqlconnector-dbz_-binlog-client
2024-07-09 13:18:42,918 INFO  [com.git.shy.mys.bin.BinaryLogClient] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) SSL enabled
2024-07-09 13:18:44,650 INFO  [com.git.shy.mys.bin.BinaryLogClient] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Connected to company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306 at mysql-bin-changelog.000002/2870 (sid:1234, cid:1060)
2024-07-09 13:18:44,651 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Connected to binlog at company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=BinlogSourceInfo{currentGtid='null', currentBinlogFilename='mysql-bin-changelog.000002', currentBinlogPosition=2870, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery='null', tableIds=[], databaseName='null'}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet='null', currentGtidSet='null', restartBinlogFilename='mysql-bin-changelog.000002', restartBinlogPosition=2870, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId='null', incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]}
2024-07-09 13:18:44,651 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Waiting for keepalive thread to start
2024-07-09 13:18:44,652 INFO  [io.deb.uti.Threads] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Creating thread debezium-mysqlconnector-dbz_-binlog-client
2024-07-09 13:18:44,753 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Keepalive thread is running

CREATE DATABASE company;

CREATE TABLE orders (
    order_id INT AUTO_INCREMENT PRIMARY KEY,
    customer_id INT NOT NULL,
    order_date DATE NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL
);

INSERT INTO orders (customer_id, order_date, total_amount) VALUES (1, '2023-01-15', 150.50);
INSERT INTO orders (customer_id, order_date, total_amount) VALUES (2, '2023-02-20', 200.00);
INSERT INTO orders (customer_id, order_date, total_amount) VALUES (3, '2023-03-25', 320.75);
INSERT INTO orders (customer_id, order_date, total_amount) VALUES (4, '2023-04-10', 450.00);
INSERT INTO orders (customer_id, order_date, total_amount) VALUES (5, '2023-05-05', 500.25);

I don't see these events streaming to my S3 bucket. Can anyone help identify the error in the config?

(Screenshots attached: 2024-07-09 11:05:28 AM and 12:29:57 PM)
DipalPrajapati commented 3 weeks ago
2024-07-09 13:46:26,318 INFO  [org.apa.ice.BaseMetastoreTableOperations] (IcebergOffsetBackingStore-1) Successfully committed to table company.debezium_offset_storage_custom_table in 926 ms
2024-07-09 13:46:26,530 INFO  [org.apa.ice.BaseMetastoreTableOperations] (IcebergOffsetBackingStore-1) Refreshing table metadata from new version: s3://iceberg-cdc-table/warehouse/company/debezium_offset_storage_custom_table/metadata/00007-67efc028-b29a-435d-8bf1-6e7507d63491.metadata.json
2024-07-09 13:46:27,044 INFO  [com.git.shy.mys.bin.BinaryLogClient] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) SSL enabled
2024-07-09 13:46:29,157 INFO  [com.git.shy.mys.bin.BinaryLogClient] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Connected to company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306 at mysql-bin-changelog.000002/39428 (sid:1234, cid:1221)
2024-07-09 13:46:29,158 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Connected to binlog at company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=BinlogSourceInfo{currentGtid='null', currentBinlogFilename='mysql-bin-changelog.000002', currentBinlogPosition=39428, currentRowNumber=0, serverId=0, sourceTime=2024-07-09T08:16:15.501Z, threadId=-1, currentQuery='null', tableIds=[company.iceberg_namespace_properties], databaseName='company'}, snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet='null', currentGtidSet='null', restartBinlogFilename='mysql-bin-changelog.000002', restartBinlogPosition=39428, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId='null', incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]}
2024-07-09 13:46:29,158 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Waiting for keepalive thread to start
2024-07-09 13:46:29,159 INFO  [io.deb.uti.Threads] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Creating thread debezium-mysqlconnector-dbz_-binlog-client
2024-07-09 13:46:29,260 INFO  [io.deb.con.bin.BinlogStreamingChangeEventSource] (debezium-mysqlconnector-dbz_-change-event-source-coordinator) Keepalive thread is running
] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Received query command: Event{header=EventHeaderV4{timestamp=1720517320000, eventType=QUERY, serverId=28029116, headerLength=19, dataLength=59, nextPosition=123565, flags=8}, data=QueryEventData{threadId=1184, executionTime=0, errorCode=0, database='company', sql='BEGIN'}}

2024-07-09 15:04:53,242 DEBUG [io.deb.con.bas.ChangeEventQueue] (pool-8-thread-1) no records available or batch size not reached yet, sleeping a bit...
2024-07-09 15:04:53,746 DEBUG [io.deb.con.bas.ChangeEventQueue] (pool-8-thread-1) checking for more records...
2024-07-09 15:04:53,746 DEBUG [io.deb.con.bas.ChangeEventQueue] (pool-8-thread-1) polling records...
2024-07-09 15:04:53,746 DEBUG [io.deb.con.bas.ChangeEventQueue] (pool-8-thread-1) no records available or batch size not reached yet, sleeping a bit...
2024-07-09 15:04:54,085 DEBUG [io.deb.con.bin.BinlogStreamingChangeEventSource] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Received query command: Event{header=EventHeaderV4{timestamp=1720517694000, eventType=QUERY, serverId=28029116, headerLength=19, dataLength=89, nextPosition=126073, flags=8}, data=QueryEventData{threadId=1184, executionTime=0, errorCode=0, database='test', sql='create database test'}}
2024-07-09 15:04:54,086 DEBUG [io.deb.con.bin.BinlogDatabaseSchema] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Processing streaming DDL 'create database test' for database 'test'

For DML I only see this:
] (blc-company-db.cluster-cpw2c08moqyr.us-east-1.rds.amazonaws.com:3306) Received query command: Event{header=EventHeaderV4{timestamp=1720517320000, eventType=QUERY, serverId=28029116, headerLength=19, dataLength=59, nextPosition=123565, flags=8}, data=QueryEventData{threadId=1184, executionTime=0, errorCode=0, database='company', sql='BEGIN'}}

This is the last thing I see. It seems only DDL-level events are captured.

ismailsimsek commented 3 weeks ago

@DipalPrajapati please review this section. MySQL doesn't have a schema.include.list option; I recommend testing with a .* regexp first to find a working configuration.

debezium.source.database.include.list=company
debezium.source.schema.include.list=company.orders
debezium.source.table.include.list=company.orders
snapshot.include.collection.list=company.orders

https://debezium.io/documentation/reference/nightly/connectors/mysql.html#mysql-property-table-include-list
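A minimal sketch of what the include-list section could look like after that change (illustrative only, assuming the goal is to capture company.orders; include lists are regular expressions, so starting broad helps confirm events flow before narrowing the filter):

# mysql source - include lists (sketch; schema.include.list removed, MySQL has no schema level)
debezium.source.database.include.list=company
# start broad: match every table in the company database
debezium.source.table.include.list=company.*
# once DML events arrive, narrow to the single table:
# debezium.source.table.include.list=company.orders
# if snapshot filtering is needed, it would likewise need the debezium.source. prefix:
# debezium.source.snapshot.include.collection.list=company.orders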

DipalPrajapati commented 3 weeks ago

As you can see in the log, it is already capturing DDL change events but not DML ones.

(Screenshot attached: 2024-07-09 4:50:36 PM)

But for any other insert or update query it only gives query=BEGIN.

(Screenshot attached: 2024-07-09 4:44:08 PM)