apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] HiveSyncTool failure - Unable to create a `_ro` table when writing data #11254

Closed · shubhamn21 closed this issue 5 months ago

shubhamn21 commented 5 months ago

Describe the problem you faced

Unable to write a Hudi table on an AWS EMR (Hadoop) setup. From the error, the write appears to fail while the Hive sync is registering the table with the `_ro` suffix in Hive/Glue. Am I missing a Hive setting that would allow it to create the table (the error says the TableType cannot be null)? Are there alternative solutions?

To Reproduce

Steps to reproduce the behavior:

  1. Write the data:

         df.write.format("hudi") \
             .mode("append") \
             .options(**options) \
             .partitionBy("kafka_topic", "event_dt") \
             .saveAsTable("db_name.snimbalkar_test_table")

Expected behavior

Creates and stores table.

Environment Description

Additional context

Add any other context about the problem here.

Stacktrace


: org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
    at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:61)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:888)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:886)
    at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:826)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:322)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:107)
    at org.apache.spark.sql.hudi.command.CreateHoodieTableAsSelectCommand.run(CreateHoodieTableAsSelectCommand.scala:106)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:701)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:679)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:573)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing snimbalkar_test_table
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:165)
    at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59)
    ... 53 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to get table schema for : snimbalkar_test_table_ro
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.getTableSchema(HMSDDLExecutor.java:194)
    at org.apache.hudi.hive.HoodieHiveSyncClient.getMetastoreSchema(HoodieHiveSyncClient.java:212)
    at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:334)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:252)
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:188)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:162)
    ... 54 more
Caused by: InvalidObjectException(message:TableType cannot be null for table: snimbalkar_test_table_ro (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null))
    at com.amazonaws.glue.catalog.converters.BaseCatalogToHiveConverter$5.get(BaseCatalogToHiveConverter.java:70)
    at com.amazonaws.glue.catalog.converters.BaseCatalogToHiveConverter.getHiveException(BaseCatalogToHiveConverter.java:109)
    at com.amazonaws.glue.catalog.converters.BaseCatalogToHiveConverter.wrapInHiveException(BaseCatalogToHiveConverter.java:100)
    at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.getTable(GlueMetastoreClientDelegate.java:450)
    at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.getTable(AWSCatalogMetastoreClient.java:1008)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350)
    at com.sun.proxy.$Proxy85.getTable(Unknown Source)
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.getTableSchema(HMSDDLExecutor.java:180)
    ... 59 more
ad1happy2go commented 5 months ago

@shubhamn21 Please provide your writer configurations.

shubhamn21 commented 5 months ago

Here it is:


options = {
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.operation": "insert",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.update.partial.fields": "true",
    "hoodie.upsert.shuffle.parallelism": "2",
    "hoodie.insert.shuffle.parallelism": "2",
    "hoodie.index.bloom.num_entries": "60000",
    "hoodie.index.bloom.fpp": "0.000000001",
    "hoodie.compaction.lazy.block.read": "false",
    "hoodie.enable.data.skipping": "true",
    "hoodie.logfile.max.size": "1073741824",
    "hoodie.parquet.small.file.limit": "104857600",
    "hoodie.parquet.max.file.size": "125829120",
    "hoodie.parquet.block.size": "125829120",
    "hoodie.clean.automatic": "false",
    "hoodie.clean.async": "true",
    "hoodie.datasource.write.precombine.field": "kafka_offset",
    "hoodie.datasource.write.recordkey.field": "id,cid",
    "hoodie.datasource.write.partitionpath.field": "kafka_topic,event_dt",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.table.name": "snimbalkar_test_table_ro"
}
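
For context, with a MERGE_ON_READ table the Hive sync registers two views, `<table>_ro` (read-optimized) and `<table>_rt` (real-time), deriving both names from the sync table name. Below is a minimal, hedged sketch of spelling those names out explicitly; the database name is only an assumption taken from the `saveAsTable` call above, and this is not confirmed to resolve the Glue error:

```python
# Hedged sketch: make the names used by Hive sync explicit instead of
# relying on defaults, so the _ro/_rt names are derived from a base name
# without a suffix. The values are illustrative, not a confirmed fix.
explicit_sync_options = {
    **options,
    "hoodie.table.name": "snimbalkar_test_table",
    "hoodie.datasource.hive_sync.database": "db_name",            # assumed from saveAsTable above
    "hoodie.datasource.hive_sync.table": "snimbalkar_test_table"  # sync registers *_ro and *_rt
}
```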
shubhamn21 commented 5 months ago

I think it may have something to do with AWS Glue compatibility. The documentation says it only supports up to Hudi 0.12.1.

As a workaround, I am using `.save()` instead of `.saveAsTable()`. I am not able to sync with Glue/Hive, but I am able to ingest data and query it with Spark SQL.
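
A minimal sketch of that path-based workaround, assuming an illustrative S3 base path and view name (neither is from the original report) and with Hive sync disabled, since the sync step is what fails:

```python
# Hedged sketch of the workaround described above: write to a path with
# .save() and query through a temporary view, bypassing Glue/Hive sync.
base_path = "s3://my-bucket/hudi/snimbalkar_test_table"  # illustrative path

workaround_options = {
    **options,
    "hoodie.datasource.hive_sync.enable": "false",  # skip the failing Glue/Hive sync
}

(df.write.format("hudi")
    .mode("append")
    .options(**workaround_options)
    .partitionBy("kafka_topic", "event_dt")
    .save(base_path))

# Query with Spark SQL by registering the path-based table as a temp view.
spark.read.format("hudi").load(base_path).createOrReplaceTempView("snimbalkar_test_table")
spark.sql("SELECT count(*) FROM snimbalkar_test_table").show()
```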

shubhamn21 commented 5 months ago

Closing this as it is no longer an issue.