memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Unable to run icebergevents mode in Append with Hive + MinIO #326

Closed: AbhishekGedela closed this issue 4 months ago

AbhishekGedela commented 4 months ago

Hi @ismailsimsek

I am facing the error below when I try to run the server with debezium.sink.type=icebergevents.

2024-05-23 03:29:08,200 ERROR [org.apa.had.hiv.met.ObjectStore] (pool-13-thread-1) Error loading PartitionExpressionProxy: MetaException(message:org.apache.hadoop.hive.ql.optimizer.ppr.PartitionExpressionForMetastore class not found)
    at org.apache.hadoop.hive.metastore.utils.JavaUtils.getClass(JavaUtils.java:54)
    at org.apache.hadoop.hive.metastore.ObjectStore.createExpressionProxy(ObjectStore.java:538)
    at org.apache.hadoop.hive.metastore.ObjectStore.initializeHelper(ObjectStore.java:495)
    at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:421)
    at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:376)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:79)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:140)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:59)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:720)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:698)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:692)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:769)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:540)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8678)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:119)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:112)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:60)
    at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:72)
    at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:185)
    at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
    at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
    at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
    at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
    at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
    at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
    at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
    at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
    at io.debezium.server.iceberg.history.IcebergSchemaHistory.storageExists(IcebergSchemaHistory.java:205)
    at io.debezium.server.iceberg.history.IcebergSchemaHistory.lambda$start$0(IcebergSchemaHistory.java:115)
    at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:99)
    at io.debezium.server.iceberg.history.IcebergSchemaHistory.start(IcebergSchemaHistory.java:112)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:51)
    at io.debezium.connector.binlog.BinlogDatabaseSchema.<init>(BinlogDatabaseSchema.java:79)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:40)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:99)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:241)
    at io.debezium.embedded.async.AsyncEmbeddedEngine.lambda$startSourceTasks$2(AsyncEmbeddedEngine.java:400)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

2024-05-23 03:29:08,203 DEBUG [org.apa.had.hiv.met.ObjectStore] (pool-13-thread-1) Non-retriable exception during ObjectStore initialize.: java.lang.RuntimeException: Error loading PartitionExpressionProxy: org.apache.hadoop.hive.ql.optimizer.ppr.PartitionExpressionForMetastore class not found
    at org.apache.hadoop.hive.metastore.ObjectStore.createExpressionProxy(ObjectStore.java:542)
    ... (remaining frames identical to the stack trace above)

I tried debugging by adding print statements to the IcebergSchemaHistory class and compared the behavior with debezium.sink.type=iceberg. I can see that the **schema.history.internal.iceberg.*** properties are not being set in icebergevents mode.

================iceberg conf=====================
HiveCatalog{name=default, uri=null}
=================================================
2024-05-23 03:40:03,520 INFO  [io.deb.ser.ice.his.IcebergSchemaHistory] (pool-13-thread-1) Starting IcebergSchemaHistory storage table:default.tpch_raw_debezium_database_history_storage_table
2024-05-23 03:40:03,537 DEBUG [org.apa.had.hiv.con.HiveConf] (pool-13-thread-1) Found metastore URI of null

I tried both the master branch and v0.4.0.Final.

Can you please help me with this?

Posting my application.properties file below:

# Use iceberg sink
debezium.sink.type=icebergevents
debezium.sink.iceberg.catalog-name=default

# Iceberg sink config
debezium.sink.iceberg.table-prefix=tpch_raw_
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.write.format.default=parquet

# Config with hive metastore catalog
debezium.sink.iceberg.type=hive
debezium.sink.iceberg.uri=thrift://127.0.0.1:9083
debezium.sink.iceberg.clients=5
debezium.sink.iceberg.warehouse=s3a://datalake/
debezium.sink.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.access-key-id=
debezium.sink.iceberg.s3.secret-access-key=
debezium.sink.iceberg.s3.path-style-access=true

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=tpch_raw_debezium_offset_storage_table
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=tpch_raw_debezium_database_history_storage_table

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=6603
debezium.source.database.user=
debezium.source.database.password=
debezium.source.database.dbname=
debezium.table.include.list=
debezium.source.database.server.name=test
debezium.source.database.server.id=1234
debezium.source.topic.prefix=dbz_

quarkus.http.port=8088
quarkus.log.level=TRACE
quarkus.log.console.json=false
# set log level for libs
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=DEBUG
quarkus.log.category."org.apache.parquet".level=DEBUG
# Ignore messages below warning level from Jetty, because it's a bit verbose

debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.source.max.batch.size=2048
debezium.source.max.queue.size=16000
debezium.sink.batch.batch-size-wait.max-wait-ms=30000
debezium.sink.batch.batch-size-wait.wait-interval-ms=5000
AbhishekGedela commented 4 months ago

Could it be because the consumer is @Named("icebergevents") and the properties should therefore be **debezium.sink.icebergevents.***?

It seems to work once I swapped the @Named("iceberg") annotation between IcebergEventsChangeConsumer and IcebergChangeConsumer.

ismailsimsek commented 4 months ago

This seems like a Hive catalog related issue. Could you add the hive-exec library to the libs and try? https://mvnrepository.com/artifact/org.apache.hive/hive-exec

AbhishekGedela commented 4 months ago

Thanks for responding! I will try that and get back with the findings. Is the hive-exec library dependency explicitly needed for the icebergevents sink type? Once I renamed icebergevents to iceberg, it seems to work fine.

ismailsimsek commented 4 months ago

We have two consumers in this project: icebergevents and iceberg. icebergevents appends all the events to a partitioned table; the only difference is that icebergevents uses an hourly partitioned table by default. Other than that I don't see any difference which could cause the error.

It looks Hive related: it happens when it tries to read the history table.

Could you instead try using a file for the schema history storage:

debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=/path/to/storage/schemahistory.dat

example here

ismailsimsek commented 4 months ago

Is the hive-exec library dependency explicitly needed for the icebergevents sink type? Once I renamed icebergevents to iceberg, it seems to work fine.

No, it's not explicitly needed for icebergevents; it should be the same for both consumers. Regarding:

Could it be because the consumer is @Named("icebergevents") and the properties should therefore be debezium.sink.icebergevents.*?

No, because the property names are hard coded and independent of the consumer name. I will check this; the offset storage might be doing it for the parameters.

ismailsimsek commented 4 months ago

@AbhishekGedela it's related to the Java version; could you please make sure you are running it with Java 17, not above?

Edit: with Java 21 the backend is not receiving any config, but with Java 17 all the configuration is recognized (set) correctly in the application.

Edit: please ignore, still investigating...

ismailsimsek commented 4 months ago

Could it be because the consumer is @Named("icebergevents") and the properties should therefore be debezium.sink.icebergevents.*?

No, because the property names are hard coded and independent of the consumer name. I will check this; the offset storage might be doing it for the parameters.

Looks like it. When Debezium passes the config down to the SchemaHistory class, it is somehow filtering configs out using the consumer name.
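That filtering behavior can be illustrated with a small, hypothetical Python sketch. This is not Debezium's actual code; the function name and property set are assumptions for illustration only. The point is that filtering sink properties by the consumer's name drops everything when the keys use a different prefix, which would match the "Found metastore URI of null" log line above.

```python
# Hypothetical sketch of prefix-based config filtering (NOT Debezium's code).
def filter_by_prefix(props: dict, prefix: str) -> dict:
    """Keep only keys starting with `prefix`, stripping the prefix off."""
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}

props = {
    "debezium.sink.iceberg.type": "hive",
    "debezium.sink.iceberg.uri": "thrift://127.0.0.1:9083",
}

# Consumer named "iceberg": the properties come through as expected.
print(filter_by_prefix(props, "debezium.sink.iceberg."))

# Consumer named "icebergevents": no key matches the longer prefix, so the
# catalog receives no settings at all and e.g. uri ends up null.
print(filter_by_prefix(props, "debezium.sink.icebergevents."))
```

Under this assumption, swapping the @Named annotations (as reported above) makes the prefix match again, which is why the properties reappeared.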

AbhishekGedela commented 4 months ago

Alright, thanks a lot for confirming! I will raise a PR to add this to the docs. Please review it.

ismailsimsek commented 4 months ago

Thank you @AbhishekGedela for reporting this. It is now fixed with the latest release; please feel free to open a new issue if it is still not working.

AbhishekGedela commented 4 months ago

Thanks for fixing this in such a short time!