memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

can't see the table from PostgreSQL integrated #221

Open duc-dn opened 12 months ago

duc-dn commented 12 months ago

I followed the tutorial, using debezium-server-iceberg to read data from Postgres and save it in Iceberg format. However, in the log I don't see any ingestion taking place, and I don't see any error either. In MinIO, I don't see the data saved as an Iceberg table. This is the config file:

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg

debezium.sink.iceberg.type=hive
debezium.sink.iceberg.uri=thrift://hive-metastore:9083
debezium.sink.iceberg.clients=5
debezium.sink.iceberg.warehouse=s3a://datalake/warehouse
debezium.sink.iceberg.engine.hive.enabled=true
### if above one doesn't work please try following config
debezium.sink.iceberg.iceberg.engine.hive.enabled=true

# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://datalake
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
debezium.sink.iceberg.fs.s3a.access.key=minioadmin
debezium.sink.iceberg.fs.s3a.secret.key=minioadmin
debezium.sink.iceberg.fs.s3a.endpoint=http://minio:9000
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# postgres source
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=postgres_cdc
debezium.source.schema.include.list=public.student
debezium.source.topic.prefix=ducdn
debezium.source.plugin.name=pgoutput

# sql server source
#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
#debezium.source.offset.storage.file.filename=data/offsets.dat
#debezium.source.offset.flush.interval.ms=0
#debezium.source.database.hostname=localhost
#debezium.source.database.port=5432
#debezium.source.database.user=debezium
#debezium.source.database.password=debezium
#debezium.source.database.dbname=debezium
#debezium.source.database.server.name=tutorial
#debezium.source.schema.include.list=inventory
# mandatory for sql server source, avoids errors during snapshot and schema change
#debezium.source.include.schema.changes=false

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector

debezium.sink.batch.batch-size-wait.max-wait-ms=20000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000

Log:

2023-08-09 15:35:51,796 INFO  [io.deb.con.pos.PostgresSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) According to the connector configuration data will be snapshotted
2023-08-09 15:35:51,797 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 1 - Preparing
2023-08-09 15:35:51,804 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 2 - Determining captured tables
2023-08-09 15:35:51,812 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Adding table public.student to the list of capture schema tables
2023-08-09 15:35:51,820 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 3 - Locking captured tables []
2023-08-09 15:35:51,821 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 4 - Determining snapshot offset
2023-08-09 15:35:51,822 INFO  [io.deb.con.pos.PostgresSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Creating initial offset context
2023-08-09 15:35:51,826 INFO  [io.deb.con.pos.PostgresSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Read xlogStart at 'LSN{0/1566BC8}' from transaction '758'
2023-08-09 15:35:51,839 INFO  [io.deb.con.pos.PostgresSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Read xlogStart at 'LSN{0/1566BC8}' from transaction '758'
2023-08-09 15:35:51,840 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 5 - Reading structure of captured tables
2023-08-09 15:35:51,844 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 5.a - Creating connection pool
2023-08-09 15:35:51,844 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Created connection pool with 1 threads
2023-08-09 15:35:51,845 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 6 - Persisting schema history
2023-08-09 15:35:51,846 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot step 7 - Snapshotting data
2023-08-09 15:35:51,847 INFO  [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Creating snapshot worker pool with 1 worker thread(s)
2023-08-09 15:35:51,849 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot - Final stage
2023-08-09 15:35:51,853 INFO  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot completed
2023-08-09 15:35:51,928 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='ducdn'db='postgres', lsn=LSN{0/1566BC8}, txId=758, timestamp=2023-08-09T08:35:51.839Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]
2023-08-09 15:35:51,928 WARN  [io.deb.rel.RelationalDatabaseSchema] (debezium-postgresconnector-ducdn-change-event-source-coordinator) After applying the include/exclude list filters, no changes will be captured. Please check your configuration!
2023-08-09 15:35:51,941 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Connected metrics set to 'true'
2023-08-09 15:35:51,947 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Starting streaming
2023-08-09 15:35:51,948 INFO  [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Retrieved latest position from stored offset 'LSN{0/1566BC8}'
2023-08-09 15:35:51,950 INFO  [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/1566BC8}'
2023-08-09 15:35:51,951 INFO  [io.deb.con.pos.con.PostgresReplicationConnection] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Initializing PgOutput logical decoder publication
2023-08-09 15:35:51,993 INFO  [io.deb.con.pos.con.PostgresConnection] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/15666E0}, catalogXmin=755]
2023-08-09 15:35:51,997 INFO  [io.deb.jdb.JdbcConnection] (pool-13-thread-1) Connection gracefully closed
2023-08-09 15:35:51,998 INFO  [io.deb.con.pos.con.PostgresReplicationConnection] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Seeking to LSN{0/1566BC8} on the replication slot with command SELECT pg_replication_slot_advance('debezium', '0/1566BC8')
2023-08-09 15:35:52,023 INFO  [io.deb.uti.Threads] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Requested thread factory for connector PostgresConnector, id = ducdn named = keep-alive
2023-08-09 15:35:52,024 INFO  [io.deb.uti.Threads] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Creating thread debezium-postgresconnector-ducdn-keep-alive
2023-08-09 15:35:52,031 INFO  [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Searching for WAL resume position
2023-08-09 15:36:04,709 INFO  [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-ducdn-change-event-source-coordinator) First LSN 'LSN{0/1566C28}' received
2023-08-09 15:36:04,710 INFO  [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) WAL resume position 'LSN{0/1566C28}' discovered
2023-08-09 15:36:04,713 INFO  [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
2023-08-09 15:36:04,716 INFO  [io.deb.jdb.JdbcConnection] (pool-15-thread-1) Connection gracefully closed
2023-08-09 15:36:04,758 INFO  [io.deb.con.pos.con.PostgresReplicationConnection] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Initializing PgOutput logical decoder publication
2023-08-09 15:36:04,761 INFO  [io.deb.con.pos.con.PostgresReplicationConnection] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Seeking to LSN{0/1566BC8} on the replication slot with command SELECT pg_replication_slot_advance('debezium', '0/1566BC8')
2023-08-09 15:36:04,775 INFO  [io.deb.uti.Threads] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Requested thread factory for connector PostgresConnector, id = ducdn named = keep-alive
2023-08-09 15:36:04,776 INFO  [io.deb.uti.Threads] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Creating thread debezium-postgresconnector-ducdn-keep-alive
2023-08-09 15:36:04,777 INFO  [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Processing messages
2023-08-09 15:36:04,779 INFO  [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-ducdn-change-event-source-coordinator) Message with LSN 'LSN{0/1566C28}' arrived, switching off the filtering
ismailsimsek commented 12 months ago

Seems like it's not capturing any table. Could you try the following setting? Is your database named postgres?

debezium.source.database.dbname=postgres
debezium.source.schema.include.list=public
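A note on the suggested change: in Debezium, `schema.include.list` matches schema names only, so a `schema.table` value like `public.student` matches nothing and triggers the "no changes will be captured" warning seen in the log. Table-level filtering uses `table.include.list` instead. A minimal sketch of the two options:

```properties
# option 1: capture every table in the schema
debezium.source.schema.include.list=public

# option 2: capture a single table, using <schema>.<table> syntax
debezium.source.table.include.list=public.student
```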
duc-dn commented 12 months ago

@ismailsimsek, thanks. I tried your config above with the Hadoop catalog, saving the data locally, and it works. However, when I used the Hive metastore catalog, I got an error:

2023-08-10 09:26:54,123 ERROR [io.deb.ser.ConnectorLifecycle] (pool-6-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Failed to create file: file:/user/hive/warehouse/test_ducdn_public_student/metadata/00000-d043d821-4ad7-4e28-9400-466953e631dd.metadata.json', error = 'org.apache.iceberg.exceptions.RuntimeIOException: Failed to create file: file:/user/hive/warehouse/test_ducdn_public_student/metadata/00000-d043d821-4ad7-4e28-9400-466953e631dd.metadata.json': org.apache.iceberg.exceptions.RuntimeIOException: Failed to create file: file:/user/hive/warehouse/test_ducdn_public_student/metadata/00000-d043d821-4ad7-4e28-9400-466953e631dd.metadata.json
        at org.apache.iceberg.hadoop.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:87)
        at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:124)
        at org.apache.iceberg.TableMetadataParser.overwrite(TableMetadataParser.java:114)
        at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:170)
        at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadataIfRequired(BaseMetastoreTableOperations.java:160)
        at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:185)
        at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
        at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:199)
        at io.debezium.server.iceberg.IcebergUtil.createIcebergTable(IcebergUtil.java:109)
        at io.debezium.server.iceberg.IcebergChangeConsumer.lambda$loadIcebergTable$1(IcebergChangeConsumer.java:192)
        at java.base/java.util.Optional.orElseGet(Optional.java:369)
        at io.debezium.server.iceberg.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:188)
        at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:166)
        at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:101)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:912)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:229)
        at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Mkdirs failed to create file:/user/hive/warehouse/test_ducdn_public_student/metadata (exists=false, cwd=file:/home/ducdn/Desktop/workspace/debezium-server-iceberg-dist-0.3.0-SNAPSHOT/debezium-server-iceberg)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
        at org.apache.iceberg.hadoop.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:85)
        ... 19 more
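A reading of the trace: the metadata path `file:/user/hive/warehouse/...` shows Iceberg falling back to the local filesystem and the metastore's default warehouse location, which suggests the S3 warehouse settings are not reaching the Hive catalog. One thing to try (a sketch, assuming the sink forwards `debezium.sink.iceberg.*` keys to the Iceberg catalog as its properties; the endpoint and credentials are the MinIO values from this thread) is configuring Iceberg's S3FileIO at the catalog level instead of relying on Hadoop `fs.s3a.*` settings:

```properties
# sketch: point the Hive catalog at MinIO via Iceberg's native S3FileIO
debezium.sink.iceberg.warehouse=s3a://datalake/warehouse
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=http://minio:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=minioadmin
debezium.sink.iceberg.s3.secret-access-key=minioadmin
```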
ismailsimsek commented 12 months ago

It might not be connecting to MinIO and the Hive server.

You could check the hostnames. If you are using Docker containers, the hostnames might be localhost with a mapped port rather than http://minio:9000 and thrift://hive-metastore:9083.
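To illustrate the hostname point: in Docker Compose, service names act as DNS hostnames on the shared network, so `http://minio:9000` and `thrift://hive-metastore:9083` resolve only from containers on that network; a process running on the host must use `localhost` with the published port instead. A minimal sketch (image names and versions are assumptions, not taken from this thread):

```yaml
services:
  minio:
    image: minio/minio            # assumed image
    command: server /data
    ports:
      - "9000:9000"               # host reaches it as localhost:9000
  hive-metastore:
    image: apache/hive:4.0.0      # assumed image; any metastore works
    ports:
      - "9083:9083"
  debezium-server-iceberg:
    image: debezium-server-iceberg   # hypothetical local image name
    depends_on: [minio, hive-metastore]
    # inside this container, minio:9000 and hive-metastore:9083 resolve
```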

duc-dn commented 11 months ago

@ismailsimsek, I am running the containers with Docker Compose, and I tried using localhost, minio, and hive-metastore as hosts. In addition, I added the host entries to /etc/hosts (screenshot attached). However, I still got the same error as above.

ismailsimsek commented 11 months ago

Could you share your docker-compose file? Another thing to look into is the Hive and MinIO integration, i.e. the metastore-site.xml settings. Leaving one example here.