apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Can't sync to correct Hive schema #5693

Open sunke38 opened 2 years ago

sunke38 commented 2 years ago

Describe the problem you faced

I use Spark SQL to create a table and sync it to Hive, then query it with Presto. My code gets the table schema from JSON and creates the table if it does not exist. Here is my problem: even though I set hoodie.datasource.hive_sync.database in the SQL, the new table is put under the default database instead of the one I set.

Code:

  import org.apache.spark.sql.{AnalysisException, Row, SparkSession}

  def main(args: Array[String]): Unit = {
    implicit val sparkSession = SparkSession.builder()
      .appName("****")
      .master("spark://***:7077")
      .config("spark.sql.warehouse.dir", "***/warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("spark.sql.legacy.exponentLiteralAsDecimal.enabled", true)
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://****:9083")
      .getOrCreate()

private def checkAndCreateTable(row: Row)(implicit sparkSession: SparkSession): Unit = {
    try {
      val checkTableExists = s"SHOW CREATE TABLE ${row.getAs("table")}_cow"
      sparkSession.sql(checkTableExists)
    } catch {
      case ex: AnalysisException =>
        if (ex.message.contains("Table or permanent view not found")) {
          // Table <table>_cow was not found, so create the ODS table automatically
          println(s"TODO: table ${row.getAs("table")}_cow not found, creating the ODS table automatically")

          sparkSession.sql(s"""create schema if not exists ${row.getAs("database")} location '/hudi/datalake/${row.getAs("database")}/';""")

          //println(row.schema.toDDL)
          val sqlString = s"""create table ${row.getAs("table")}_cow (${row.schema.toDDL})
            |using hudi tblproperties (
            |type = 'cow',
            |primaryKey = 'pk',
            |preCombineField = 'es',
            |hoodie.datasource.hive_sync.enable = true,
            |hoodie.datasource.hive_sync.database = '${row.getAs("database")}',
            |hoodie.datasource.hive_sync.table = '${row.getAs("table")}_cow',
            |hive_sync.metastore.uris = 'thrift://****:9083',
            |hive_sync.jdbc_url = 'jdbc:hive2://****:10000',
            |hoodie.datasource.hive_sync.mode = 'hms'
            |) location '/hudi/datalake/${row.getAs("database")}/${row.getAs("table")}_cow';
          """.stripMargin
          sparkSession.sql(sqlString).printSchema()
          println(sqlString)
        }
    }
  }

Expected behavior

Create the table and sync it to Hive; the table should be created under the database I set.

Environment Description

Hudi version : 0.11

Spark version : 3.2.1

Hadoop version : 3.2.2

Storage (HDFS/S3/GCS..) : HDFS

Running on Docker? (yes/no) : no

Stacktrace

N/A, but here is the output from my code:

22/05/26 09:14:27 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
22/05/26 09:14:27 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
root

create table bko_****_cow (`login_id` ,`es` TIMESTAMP,`database` STRING,`pk` STRING,`table` STRING)
using hudi tblproperties (
type = 'cow',
primaryKey = 'pk',
preCombineField = 'es',
hoodie.datasource.hive_sync.enable = true,
hoodie.datasource.hive_sync.database= 'dev_****',
hoodie.datasource.hive_sync.table = 'bko_****_cow',
hive_sync.metastore.uris = 'thrift://****:9083',
hive_sync.jdbc_url= 'jdbc:hive2://****:10000',
hoodie.datasource.hive_sync.mode = 'hms'
) location '/hudi/datalake/dev_***/bko_***_cow';
xushiyan commented 1 year ago

@dongkelun would you be able to help with this? It seems like the hive sync config for the database was not passed to SQL properly. You may try to reproduce this with both the latest master and 0.11.1.

dongkelun commented 1 year ago

@dongkelun would you be able to help with this? It seems like the hive sync config for the database was not passed to SQL properly. You may try to reproduce this with both the latest master and 0.11.1.

ok

dongkelun commented 1 year ago

@sunke38 @xushiyan When creating a table with Spark SQL, if Hive support is enabled, the table is created in the current Hive database, which defaults to default. You can change the current database with spark.sql("use databaseName"), and if the table name is qualified with a database name, that database takes precedence. The database configured through hoodie.datasource.hive_sync.database is only applied when data is written: the table is synchronized to that database at write time. Your code creates the table but never writes data, so no sync happens. If it is not configured, its default value is set as follows:

hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_DATABASE_NAME, hoodieCatalogTable.table.identifier.database.getOrElse("default"))
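
For example, either of the following would create the table in the intended database at CREATE TABLE time (a minimal sketch, not from the original comment; dbName and tableName are placeholder values):

val dbName = "dev_db"      // placeholder: the target database
val tableName = "bko_demo" // placeholder: the base table name

// Option 1: switch the session's current database before creating the table.
sparkSession.sql(s"use $dbName")
sparkSession.sql(s"create table if not exists ${tableName}_cow (pk string, es timestamp) " +
  "using hudi tblproperties (type = 'cow', primaryKey = 'pk', preCombineField = 'es')")

// Option 2: qualify the table name with the target database.
sparkSession.sql(s"create table if not exists $dbName.${tableName}_cow (pk string, es timestamp) " +
  "using hudi tblproperties (type = 'cow', primaryKey = 'pk', preCombineField = 'es')")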
nsivabalan commented 1 year ago

@sunke38 : does the above response make sense? Let us know how we can assist here. If you get it resolved, feel free to close out the issue.