memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Not able to get records from the table #341

Open mevadadhruv opened 5 months ago

mevadadhruv commented 5 months ago

hi @ismailsimsek, we are using Debezium as part of our architecture, with an Oracle DB as the source and Dremio as the destination. We are trying to capture both the initial and the incremental load of a table from the database through CDC events. The pipeline we are following is Oracle -> AWS Iceberg tables -> AWS S3 bucket (where the data and metadata are stored) -> Dremio (configured with both the Iceberg tables and the S3 bucket). The parameters we provided are in the attached application.properties file. The problem we are facing is that we are not getting any of the data from the table; we only see its metadata in Dremio, even though I have configured both sources there. application_properties.txt [screenshot attached]

ismailsimsek commented 5 months ago

Are you able to read the table using AWS Glue? Could you compare the Glue table path and the S3 path? There was recently an issue about that: #320

mevadadhruv commented 5 months ago

hi @ismailsimsek, thanks for the quick response. We are getting data as in the image above, but that is not what we want; we expect the data to look like the attachment below. Thank you, Dhruv [screenshot attached]

ismailsimsek commented 5 months ago

@mevadadhruv please check the following configs. Make sure the include/exclude list settings contain the schema you want to replicate.

debezium.source.table.include.list=schemaname.tablename

The table you are querying is just an additional metadata table created by the following config. Since you are able to query it, that means everything is working as expected:

debezium.source.include.schema.changes=true
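
For illustration only (the schema and table names below are placeholders, not taken from your environment), a filter that replicates two data tables would look like this, one comma-separated regular expression per table:

debezium.source.table.include.list=INVENTORY.CUSTOMERS,INVENTORY.ORDERS
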
mevadadhruv commented 5 months ago

hi @ismailsimsek, if you check our application.properties, you will find that we have already provided those parameters. Thank you, Dhruv Mevada

ismailsimsek commented 5 months ago

@mevadadhruv please check the value you are using for schemaname.tablename. I recommend trying with a regexp.
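
As a minimal sketch (assuming the schema is named MYSCHEMA; replace it with your own), a regular expression that matches every table in that schema would be:

debezium.source.table.include.list=MYSCHEMA\..*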

mevadadhruv commented 5 months ago

hi @ismailsimsek, yes, we actually tried this before, but that is not our problem. Our actual problem is that once we start the Debezium server we do not get any records of the table. We do get the data and metadata folders in the S3 bucket, but they contain only the schemas and logs, not the records of the table (please find the attachments for a better understanding).

[screenshot attached] Thank you, Dhruv Mevada

ismailsimsek commented 5 months ago

@mevadadhruv could you attach debezium logs too?

GOVINDARAMTEKKAR97 commented 5 months ago

Hi @ismailsimsek, I'm working with @mevadadhruv on this. The issue we are facing is that the Oracle database contains a large number of tables and we are trying to get data for one particular table. We are getting the schema for that table, but not its columns and rows; it is giving us the entire schema.

In postgres if we made anything update delete or insert we are getting data for that table https://github.com/memiiso/debezium-server-iceberg/issues/341#issuecomment-2157971128 like that but in oracle we are getting entire schema not the rows and column for that tables [ { "id": "e1aa0a60-7e65-4aa8-b647-c1eafd40efed", "history_data": "{\"source\":{\"server\":\"tutorial\"},\"position\":{\"snapshot_scn\":\"551361732\",\"snapshot\":true,\"scn\":\"551361732\",\"snapshot_completed\":false},\"ts_ms\":1718010053332,\"databaseName\":\"AMSGCPYPDB1\",\"schemaName\":\"AMOS_PRIV_PRD_1\",\"ddl\":\"\n CREATE TABLE \\"AMOS_PRIV_PRD_1\\".\\"PART\\" \n (\t\\"EXTENDED_STATENO_I\\" NUMBER(12,0), \n\t\\"PARTNO\\" VARCHAR2(32 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"PARTMATCH\\" VARCHAR2(32 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"PARTSEQNO_I\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"DESCRIPTION\\" VARCHAR2(36 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"REMARKS\\" VARCHAR2(36 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"SPECIFICATION\\" VARCHAR2(50 CHAR), \n\t\\"ATA_CHAPTER\\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"VENDOR\\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"WEIGHT\\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\\"STORETIME\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"ALERT_QTY\\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\\"MEASURE_UNIT\\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"WASTE_CODE\\" VARCHAR2(8 CHAR) DEFAULT ' ', \n\t\\"REORD_LEVEL\\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\\"SAFETY_STOCK\\" BINARY_DOUBLE, \n\t\\"MAX_PURCH\\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\\"AC_TYP\\" VARCHAR2(6 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"MAT_CLASS\\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"MAT_TYPE\\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"COUNTRY_ORIGIN\\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"REORDER\\" VARCHAR2(2 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"TOOL\\" VARCHAR2(1 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"REPAIRABLE\\" VARCHAR2(1 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"AVG_TA_TIME\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"DEFAULT_SUPPLIER\\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"DEFAULT_REPAIR\\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"SPECIAL_CONTRACT\\" VARCHAR2(2 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"FIXED_ASSET\\" VARCHAR2(1 CHAR) DEFAULT 'N' NOT NULL ENABLE, \n\t\\"REORDER_LAST_MUTATOR\\" VARCHAR2(8 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"REORDER_LAST_MUTATION\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"MAX_SHOP_VISIT\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"SHOP_VISIT_RESET_CONDITION\\" VARCHAR2(8 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"SPECIAL_MEASURE_UNIT\\" VARCHAR2(4 CHAR) DEFAULT ' ', \n\t\\"MANUFACTURER\\" NUMBER(12,0), \n\t\\"PMA\\" VARCHAR2(1 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"RESOURCE_TYPE_ID\\" VARCHAR2(20 CHAR), \n\t\\"COUNTER_TEMPLATE_GROUPNO_I\\" NUMBER(12,0), \n\t\\"MUTATION\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"MUTATOR\\" VARCHAR2(8 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\\"STATUS\\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\\"MUTATION_TIME\\" NUMBER(12,0), \n\t\\"CREATED_BY\\" VARCHAR2(8 CHAR), \n\t\\"CREATED_DATE\\" NUMBER(12,0), \n\t\\"MANUAL_SAFETY_STOCK\\" VARCHAR2(10 CHAR) DEFAULT NULL, \n\t\\"MIN_SAFETY_STOCK\\" BINARY_DOUBLE DEFAULT NULL, \n\t CONSTRAINT \\"U_16020\\" UNIQUE (\\"PARTSEQNO_I\\")\n USING INDEX ENABLE, \n\t CONSTRAINT \\"FK_16026\\" FOREIGN KEY 
(\\"AC_TYP\\")\n\t REFERENCES \\"AMOS_PRIV_PRD_1\\".\\"AC_TYP\\" (\\"AC_TYP\\") ENABLE NOVALIDATE\n ) ;\n CREATE INDEX \\"AMOS_PRIV_PRD_1\\".\\"I_102365\\" ON \\"AMOS_PRIV_PRD_1\\".\\"PART\\" (\\"PARTNO\\", \\"MUTATION\\", \\"MUTATION_TIME\\") \n ;\nALTER TABLE \\"AMOS_PRIV_PRD_1\\".\\"PART\\" ADD CONSTRAINT \\"PK_16019\\" PRIMARY KEY (\\"PARTNO\\")\n USING INDEX \\"AMOS_PRIV_PRD_1\\".\\"I_102365\\" ENABLE;\",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\"AMSGCPYPDB1\\".\\"AMOS_PRIV_PRD_1\\".\\"PART\\"\",\"table\":{\"defaultCharsetName\":null,\"primaryKeyColumnNames\":[\"PARTNO\"],\"columns\":[{\"name\":\"EXTENDED_STATENO_I\",\"jdbcType\":2,\"typeName\":\"NUMBER\",\"typeExpression\":\"NUMBER\",\"charsetName\":null,\"length\":12,\"scale\":0,\"position\":1,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"PARTNO\",\"jdbcType\":12,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":32,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"' ' \",\"enumValues\":[]},{\"name\":\"PARTMATCH\",\"jdbcType\":12,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":32,\"position\":3,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"' ' \",\"enumValues\":[]}, sample data we are getiing

ismailsimsek commented 5 months ago

@GOVINDARAMTEKKAR97 I recommend checking the debezium-server logs; then you can see what is actually happening. Having schema changes doesn't mean Debezium is replicating the table.
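
As a rough sketch, assuming the standard Quarkus logging properties that debezium-server reads (the same quarkus.log.* family already present in your file), raising the log level for the Debezium categories in application.properties would show more detail:

quarkus.log.level=INFO
quarkus.log.category."io.debezium".level=DEBUG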

GOVINDARAMTEKKAR97 commented 5 months ago

Hi @ismailsimsek, OK. When I run ./run.sh, which parameters do I need to set in application.properties so that I can see detailed logs? Could you please provide them?

Oracle connector configuration

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=1521
debezium.source.database.user=user
debezium.source.database.password=password$1234
debezium.source.database.dbname=db
debezium.source.database.pdb.name=pdb
debezium.source.table.include.list=schemaname.tablename
snapshot.include.collection.list=schemaname.tablename

debezium.source.schema.include.list=schemaname

debezium.source.database.connection.adapter=logminer

debezium.source.snapshot.mode=schema_only

debezium.source.database.schema=

#################################################################################

debezium.source.include.schema.changes=true

Uncomment if using a PDB

debezium.source.topic.prefix=tutorial

If you have a specific start SCN, set it here. If not, let it auto-generate.

debezium.source.start.scn=default

Uncomment if you have a specific end SCN, otherwise, omit it

debezium.source.end.scn=null

Optional table filters

debezium.source.table.include.list=your-schema.your-table

debezium.source.include.schema.changes=true

debezium.source.replica.identity.autoset.values=lot:FULL

##########################################################################################

Sink type configuration

debezium.sink.type=iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg

debezium.sink.iceberg.table-name=tablename

############################################################################################

Enable event schemas

debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

###############################################################################################

Unwrap messages

debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

############################################################################################

Logging configuration

quarkus.log.level=INFO
quarkus.log.console.json=false
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
quarkus.log.category."org.eclipse.jetty".level=WARN

##############################################################################################

debezium.source.log.mining.scn.gap.detection.gap.size.min=10000
debezium.source.log.mining.scn.gap.detection.time.interval.max.ms=300000

Advanced consuming configuration

debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore

debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table

debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test

##################################################################################################

Batch configuration

debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.batch-size-wait.max-wait-ms=180000
debezium.sink.batch.batch-size-wait.wait-interval-ms=120000
debezium.sink.batch.metrics.snapshot-mbean=debezium.oracle:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.oracle:type=connector-metrics,context=streaming,server=testc
debezium.source.max.batch.size=15
debezium.source.max.queue.size=45

S3 File IO configuration

debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.access-key-id=
debezium.sink.iceberg.s3.secret-access-key=
debezium.sink.iceberg.warehouse=s3://path
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
AWS_REGION=eu-west-1

logging.level.io.debezium=DEBUG
logging.level.org.apache.iceberg=DEBUG

We are using these settings in the application.properties file.

mevadadhruv commented 5 months ago

hello @ismailsimsek, good morning. Also, if possible, could we connect about this through Zulip or another channel, so you can get a better understanding of the issue?