memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Glue table is not able to access bucket objects #320

Open FenilJain2301 opened 1 month ago

FenilJain2301 commented 1 month ago

Hi @ismailsimsek, we are able to create the Iceberg table data and push it to the S3 bucket successfully, but the tables registered in AWS Glue are not accessible because the Glue table location does not match the actual object path inside the bucket. Suppose the bucket is named "test" and we set debezium.sink.iceberg.warehouse=s3://test/warehouse. We expect the data to be created in the "test" bucket under the "warehouse" prefix, with the AWS Glue table location set to "s3://test/warehouse". What actually happens is that an extra "test" prefix is created inside the bucket and the warehouse is created under it, so the data path inside the bucket becomes "test/test/warehouse" and the Glue table location would need to be "s3://test/test/warehouse".
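
A sketch of the configured, expected, and observed paths (reconstructed from the description above; the trailing "..." stands for the per-namespace/table directories):

# configured warehouse
debezium.sink.iceberg.warehouse=s3://test/warehouse
# expected data prefix inside bucket "test"
warehouse/...
# observed data prefix inside bucket "test" (an extra "test/" is added)
test/warehouse/...
# Glue table location (still points at the expected prefix, so object lookups fail)
s3://test/warehouse/...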

The problem is that the data sits at bucket path "test/test/warehouse" while the Glue table location is "s3://test/warehouse". So when we try to access the objects through the AWS Glue table, it shows an "object not found" error. Our application.properties parameters are shared below:

application.properties:

#postgres
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=root@123
debezium.source.database.dbname=dremio
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=public

debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg

# enable event schemas - mandatory

debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

# see https://debezium.io/documentation/reference/stable/development/engine.html#advanced-consuming
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
# see https://debezium.io/documentation/reference/stable/development/engine.html#database-history-properties
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test

#######
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.value=json

# complex nested data types are not supported, do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

##################
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.batch-size-wait.max-wait-ms=180000
debezium.sink.batch.batch-size-wait.wait-interval-ms=120000
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc

# increase max.batch.size to receive large number of events per batch
debezium.source.max.batch.size=15
debezium.source.max.queue.size=45

debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.endpoint=https://test.s3.amazonaws.com
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=.................
debezium.sink.iceberg.s3.secret-access-key=................
debezium.sink.iceberg.warehouse=s3://test/warehouse
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog

From the GitHub documentation, we understood that it should take the bucket name and create the objects directly under the warehouse prefix, but it is creating that prefix under an extra bucket-name prefix, and AWS Glue only considers the s3a:// path.

Bucket data path: test/test/warehouse. AWS Glue table path: s3a://test/warehouse. So Glue is not able to find the objects in the bucket.

[screenshots attached: image (8), image (9)]

Bucket URI: s3://dremio-virginia/test/default.db/debeziumcdc_tutorial_public_test
AWS Glue table URI for accessing the bucket data: s3://test/default.db/debeziumcdc_tutorial_public_test

We have attached the screenshots for better understanding. Please get back to us; we are waiting for your reply.

Thanks, Mr. Fenil Jain

ismailsimsek commented 1 month ago

@FenilJain2301 I don't see why the extra test/ is added to the path.

Looking at the examples in the Iceberg documentation, the following two settings should not be necessary. Could you remove them and try again? Especially the first one.

debezium.sink.iceberg.s3.endpoint=https://test.s3.amazonaws.com
debezium.sink.iceberg.s3.path-style-access=true
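
For reference, a minimal sketch of the remaining S3/Glue sink settings with those two properties removed (values copied from the configuration shared above, credentials elided; this is an illustration, not a verified fix):

debezium.sink.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.warehouse=s3://test/warehouse
debezium.sink.iceberg.s3.access-key-id=.................
debezium.sink.iceberg.s3.secret-access-key=................
# debezium.sink.iceberg.s3.endpoint and debezium.sink.iceberg.s3.path-style-access intentionally omitted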
ismailsimsek commented 1 month ago

And what happens when you provide the warehouse name like below? Does it return data when you manually fix the table location?

debezium.sink.iceberg.warehouse=s3://test/test/warehouse