memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Problem with small datafiles #229

Open night2201 opened 10 months ago

night2201 commented 10 months ago
ismailsimsek commented 10 months ago

@night2201 Could you enable debug logging for this class? Then you will see more detailed logs:

quarkus.log.category."io.debezium.server.iceberg.batchsizewait".level=DEBUG

Also note that it does not wait while the snapshot process is running (to keep consumption fast during the snapshot): https://github.com/memiiso/debezium-server-iceberg/blob/5819c1c840e8bf6f92d8545ec0091d2b09c5273d/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java#L52-L54
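For readers unfamiliar with the batch-size wait, here is a rough Python sketch of the idea behind the linked `MaxBatchSizeWait` class. This is not the project's Java code: the 90% threshold, the callable parameters, and the return value are illustrative assumptions; only the "skip waiting during snapshot, otherwise poll until the queue fills or the max wait elapses" behavior comes from the linked source.

```python
import time

def max_batch_size_wait(queue_size, snapshot_running, max_batch_size,
                        max_wait_ms=120_000, wait_interval_ms=10_000):
    """Illustrative sketch of MaxBatchSizeWait (not the actual Java code).

    Returns the total milliseconds waited. If a snapshot is running, it
    returns immediately so the snapshot can be consumed at full speed.
    Otherwise it polls every wait-interval-ms until either the source
    queue is (roughly) full or max-wait-ms has elapsed.
    """
    if snapshot_running():
        return 0  # no waiting during snapshot
    waited = 0
    # assumed threshold: wait until ~90% of max.batch.size is queued
    while waited < max_wait_ms and queue_size() < max_batch_size * 0.9:
        time.sleep(wait_interval_ms / 1000)
        waited += wait_interval_ms
    return waited
```

With the config shown later in this thread (`max-wait-ms=120000`, `wait-interval-ms=10000`), a nearly idle source would make each batch wait up to two minutes before flushing, which is exactly what produces fewer, larger data files.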

night2201 commented 10 months ago

2023-09-08 09:50:44,137 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2023-09-08 09:50:44,138 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2023-09-08 09:50:44,164 INFO [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.046s


- I find that the `test_user` table is ingested but the `student` table is not yet. The log shows: `2023-09-08 09:50:44,021 WARN [io.deb.ser.ice.IcebergUtil] (pool-6-thread-1) Table not found: default.dbz_mysql_ducdn_icebergg`.
- I think it is trying to create a table `default.dbz_mysql_ducdn_icebergg`, but this table doesn't exist in MySQL, so it produces this error log: `error = 'java.lang.RuntimeException: Complex nested array types are not supported, array[struct], field tableChanges': java.lang.RuntimeException: Complex nested array types are not supported, array[struct], field tableChanges`
- I still don't know how to solve this problem. Can you give me some suggestions?
- Besides, I have an additional question: does Debezium Server support Oracle and MySQL? I don't see any mention of it in the manual.
ismailsimsek commented 10 months ago

@night2201 Correct, it is failing while creating the `default.dbz_mysql_ducdn_icebergg` table, because this table has a complex-type field: `tableChanges` (a nested array type). This type is currently not supported by the consumer.

Could you share the DDL of this table/field? For future reference, in case someone wants to work on adding support for this type.

Yes, Oracle and MySQL are supported; pretty much all Debezium connectors are supported.

night2201 commented 10 months ago

@ismailsimsek, my two tables don't have complex types or nested arrays, so I don't understand why the above error appears. (screenshots attached)

ismailsimsek commented 10 months ago

@night2201 Is event flattening part of your config?

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
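To make the effect of this `unwrap` transform concrete, here is a rough Python sketch of what `ExtractNewRecordState` with `add.fields=op,table,source.ts_ms,db` does to a Debezium change event: the nested `after` state becomes the flat record, and the requested metadata fields are added with a `__` prefix. The function and the sample envelope are illustrative; the actual SMT is Java and handles many more cases (tombstones, delete rewriting, headers).

```python
def unwrap(envelope, add_fields=("op", "table", "source.ts_ms", "db")):
    """Illustrative flattening, mimicking ExtractNewRecordState.

    Takes a Debezium envelope (before/after/source/op) and returns a flat
    record built from 'after', plus requested metadata fields. Dotted
    names like 'source.ts_ms' are looked up in the named struct; bare
    names are taken from the envelope first, then from 'source'.
    """
    record = dict(envelope["after"] or {})
    for f in add_fields:
        if "." in f:  # e.g. source.ts_ms -> __source_ts_ms
            outer, inner = f.split(".", 1)
            record[f"__{outer}_{inner}"] = envelope[outer][inner]
        else:  # e.g. op -> __op, table -> __table (from source struct)
            value = envelope.get(f)
            if value is None:
                value = envelope["source"].get(f)
            record[f"__{f}"] = value
    return record
```

This is why the flattened events seen in Kafka carry columns like `__op` and `__source_ts_ms` alongside the row data.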
night2201 commented 10 months ago

Yes, I added the above configs. Besides, I tried pushing the data to Kafka and saw that the events are flattened there. This is my application.properties:

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=dbz_mysql_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg

# hive metastore catalog
debezium.sink.iceberg.type=hive
debezium.sink.iceberg.uri=thrift://xx.x.x.x:9083
debezium.sink.iceberg.clients=5
debezium.sink.iceberg.warehouse=s3a://datalake
debezium.sink.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.hive.metastore.table.owner=admin
debezium.sink.iceberg.hive.other.configs=admin

# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://datalake
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
debezium.sink.iceberg.fs.s3a.access.key=minioadmin
debezium.sink.iceberg.fs.s3a.secret.key=minioadmin
debezium.sink.iceberg.fs.s3a.endpoint=http://xx.xx.x.x:9003
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=/tmp/offset1
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=xx.x.x.xx
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=root
#debezium.source.database.dbname=mydb
debezium.source.database.server.name=mysql_cdc
debezium.source.database.include.list=mydb
debezium.source.topic.prefix=ducdn_icebergg
debezium.source.database.server.id=184054
debezium.source.schema.history.internal.kafka.bootstrap.servers=broker:29092
debezium.source.schema.history.internal.kafka.topic=ducdn_schema_changes.mydb
debezium.source.include.schema.changes=true

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET INTERVAL TIME ############
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
#debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
#debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.max.batch.size=10000
debezium.source.max.queue.size=100000
debezium.sink.batch.batch-size-wait.max-wait-ms=120000
debezium.sink.batch.batch-size-wait.wait-interval-ms=10000

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.category."io.debezium.server.iceberg.batchsizewait".level=DEBUG
ismailsimsek commented 10 months ago

Found the issue: `debezium.source.include.schema.changes=true` is causing the error. Could you please try it with `false`?

Also, the latest release supports complex types, so if you use the latest version you should not run into this issue.

night2201 commented 10 months ago

@ismailsimsek I set `debezium.source.include.schema.changes=false` and it resolved my problem. But with that setting, when the schema of a table in MySQL changes, Debezium doesn't capture those schema changes; it only captures insert, update, and delete operations. I would also like to ask whether debezium-server-iceberg currently supports data partitioning? Thank you very much for your help, have a nice day!!

ismailsimsek commented 10 months ago

1) Schema changes are applied using the event schema; field additions are added automatically.
2) Data partitioning can be added to the destination Iceberg table manually; the debezium-server-iceberg consumer does not add any partitioning.

schema changes: https://github.com/memiiso/debezium-server-iceberg/blob/abcf2a8f93973e39db089cfbc78b55920868ff46/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java#L111-L123
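The idea behind the linked `IcebergTableOperator` code can be sketched as follows. This is a simplified Python illustration, not the project's Java implementation: schemas are modeled as plain name-to-type dicts, and only the "compare the incoming event schema against the current table schema and apply field additions" behavior comes from the source; drops and renames are intentionally not handled.

```python
def missing_fields(table_schema, event_schema):
    """Return fields present in the event schema but not in the table.

    Schemas are modeled as {field_name: type_name} dicts for illustration.
    """
    current = set(table_schema)
    return {name: typ for name, typ in event_schema.items()
            if name not in current}

def apply_schema_changes(table_schema, event_schema):
    """Sketch of automatic schema evolution: additions only.

    New fields from the event schema are added to the table schema;
    dropped or renamed source columns are left untouched.
    """
    evolved = dict(table_schema)
    evolved.update(missing_fields(table_schema, event_schema))
    return evolved
```

So when a new column appears in a MySQL table, the next change event carries it in its schema and the destination Iceberg table gains the column automatically, without `include.schema.changes` events.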

night2201 commented 9 months ago

@ismailsimsek Thanks a lot for your help!!