apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.32k stars 2.41k forks source link

[SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog #5694

Closed rafcis02 closed 2 years ago

rafcis02 commented 2 years ago

Describe the problem you faced

I've been struggling with failing synchronization with the Glue Catalog. I have a process (an AWS Glue Job) which reads from a Hudi table and then writes to the Hudi table as well. The data is saved properly into the S3 bucket, but the job fails on the Hive table synchronization - it tries to create a database and a table that already exist in the Glue Catalog. This started happening when we added code which queries the Hudi table timeline - without that piece of code it works fine.

To Reproduce: You need an already existing Hudi table in an S3 bucket, with the corresponding table in the AWS Glue Catalog. Steps to reproduce the behavior:

  1. Query the Hudi table timeline e.g. to get inflight instants
  2. Read the data from Hudi table
  3. Upsert data to Hudi table

Here is the Glue Job script which you can use to reproduce it - you need to provide the S3 bucket and the Glue database name (it cannot be default - you have to create a separate database for that). The script contains a part that initializes the table on S3 with sample data. It has 3 scenarios:

Each step run as a separate Glue Job Run(including initializing the data sample)

import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hudi.HoodieOptionConfig.{SQL_KEY_PRECOMBINE_FIELD, SQL_KEY_TABLE_PRIMARY_KEY, SQL_KEY_TABLE_TYPE}
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSparkSessionExtension}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.sql.Timestamp
import java.time.Instant
import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsJavaMapConverter}

/**
 * Reproduction job for the Hive-sync failure described in this issue.
 *
 * `main` seeds the Hudi table with five sample rows. The three `scenario*`
 * methods are left commented out in `main` and are meant to be enabled by hand:
 * each one reads the table back and writes it again, with or without a prior
 * query of the Hudi timeline.
 */
object TestJob {

  val s3BucketName: String = "bucketName"
  // Must not be the "default" database: with "default" the failure does not occur.
  val glueDatabaseName: String = "databaseName"
  val tableName: String = "test"
  val tablePath: String = s"s3://$s3BucketName/$tableName"

  // Session shared by every step of the job; Hive support is enabled on it.
  val spark: SparkSession = {
    val sessionBuilder = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .withExtensions(new HoodieSparkSessionExtension())
      .enableHiveSupport()
    sessionBuilder.getOrCreate()
  }

  /** Entry point: lowers Spark log verbosity and writes the sample rows to the table. */
  def main(args: Array[String]): Unit = {
    import spark.implicits._
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    // Init the table on S3 with sample data.
    val sampleRows = List(
      (1, "first", 2020, Timestamp.from(Instant.now())),
      (2, "second", 2020, Timestamp.from(Instant.now())),
      (3, "third", 2020, Timestamp.from(Instant.now())),
      (4, "fourth", 2020, Timestamp.from(Instant.now())),
      (5, "fifth", 2020, Timestamp.from(Instant.now()))
    )
    val sampleDF = sampleRows.toDF("id", "description", "year", "mod_date")
    writeDF(sampleDF)

    // Scenarios (disabled here; uncomment the one to run):
    // scenario1WithoutHoodieTableMetaClient()
    // scenario2WithHoodieTableMetaClient()
    // scenario3WithHoodieTableMetaClientAndHiveCloseCurrent()
  }

  /** Writes `df` into the test table, keyed by `id`, partitioned by `year`, pre-combined on `mod_date`. */
  def writeDF(df: DataFrame): Unit = {
    val target = new HudiUpsertTarget(
      tableName = tableName,
      databaseName = glueDatabaseName,
      s3BucketName = s3BucketName,
      partitionColumns = List("year"),
      recordKeyColumns = List("id"),
      preCombineKeyColumn = "mod_date"
    )
    target.writeEntity(df)
  }

  /** Loads the table at `tablePath` through the Hudi data source using the snapshot query type. */
  def readTable(): DataFrame = {
    val snapshotReader = spark.read
      .format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
    snapshotReader.load(tablePath)
  }

  /** Builds a `HoodieTableMetaClient` for the table and prints its inflight/requested commit instants. */
  def queryHudiTimeline(): Unit = {
    val fileSystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration)
    val tableMetaClient = HoodieTableMetaClient.builder()
      .setConf(fileSystem.getConf)
      .setBasePath(tablePath)
      .build()
    val pendingTimeline = tableMetaClient.getActiveTimeline.getCommitsTimeline
      .filterInflightsAndRequested()
    val pendingInstants = pendingTimeline.getInstants.iterator().asScala.toList
    println(pendingInstants)
  }

  /** Scenario 1: read the table and write it back, without touching the timeline. */
  def scenario1WithoutHoodieTableMetaClient(): Unit = {
    val tableSnapshot = readTable()
    writeDF(tableSnapshot)
  }

  /** Scenario 2: query the timeline first, then read the table and write it back. */
  def scenario2WithHoodieTableMetaClient(): Unit = {
    queryHudiTimeline()
    val tableSnapshot = readTable()
    writeDF(tableSnapshot)
  }

  /** Scenario 3: same as scenario 2, but calls `Hive.closeCurrent()` between the read and the write. */
  def scenario3WithHoodieTableMetaClientAndHiveCloseCurrent(): Unit = {
    queryHudiTimeline()
    val tableSnapshot = readTable()
    Hive.closeCurrent()
    writeDF(tableSnapshot)
  }
}

/**
 * Describes a Hudi table target and derives the option maps used to write it
 * and to sync it to the Hive metastore.
 *
 * Implementors supply the table identity, the S3 bucket and the key/partition
 * columns; every other member is computed from those.
 */
trait HudiWriterOptions {

  // --- values supplied by the implementor ---

  def tableName: String

  def databaseName: String

  def s3BucketName: String

  def partitionColumns: List[String]

  def recordKeyColumns: List[String]

  def preCombineKeyColumn: String

  // --- derived values ---

  /** Table name qualified with its database: `database.table`. */
  def fullTableName: String = s"$databaseName.$tableName"

  /** S3 location of the table, with a trailing slash. */
  def s3LocationPath: String = s"s3://$s3BucketName/$tableName/"

  /**
   * Hudi write options: COW table type, upsert operation, table name,
   * comma-joined record-key and partition-path fields, `ComplexKeyGenerator`,
   * the pre-combine field, and URL-encoded partitioning switched off.
   */
  def hudiTableOptions: Map[String, String] = {
    val recordKeyFields = recordKeyColumns.mkString(",")
    val partitionPathFields = partitionColumns.mkString(",")
    val keyGeneratorClass = classOf[ComplexKeyGenerator].getName
    Map(
      TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
      OPERATION.key() -> UPSERT_OPERATION_OPT_VAL,
      TBL_NAME.key() -> tableName,
      RECORDKEY_FIELD.key() -> recordKeyFields,
      PARTITIONPATH_FIELD.key() -> partitionPathFields,
      KEYGENERATOR_CLASS_NAME.key() -> keyGeneratorClass,
      PRECOMBINE_FIELD.key() -> preCombineKeyColumn,
      URL_ENCODE_PARTITIONING.key() -> "false"
    )
  }

  /**
   * SQL-level table properties (table type, primary key, pre-combine field).
   * The key and pre-combine values are looked up in [[hudiTableOptions]].
   */
  def sqlTableOptions: Map[String, String] = {
    val hudiOptions = hudiTableOptions
    Map(
      SQL_KEY_TABLE_TYPE.sqlKeyName -> HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW,
      SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName -> hudiOptions(RECORDKEY_FIELD.key()),
      SQL_KEY_PRECOMBINE_FIELD.sqlKeyName -> hudiOptions(PRECOMBINE_FIELD.key())
    )
  }

  /**
   * Hive sync options: HMS sync mode, sync enabled, target database and table,
   * partition fields and extractor, Hive-style partitioning, timestamp support,
   * and [[sqlTableOptions]] serialised as the table serde properties.
   */
  def hiveTableOptions: Map[String, String] = {
    val hudiOptions = hudiTableOptions
    val partitionExtractorClass = classOf[MultiPartKeysValueExtractor].getName
    val serdeProperties = ConfigUtils.configToString(sqlTableOptions.asJava)
    Map(
      HIVE_SYNC_MODE.key() -> HiveSyncMode.HMS.name(),
      HIVE_SYNC_ENABLED.key() -> "true",
      HIVE_DATABASE.key() -> databaseName,
      HIVE_TABLE.key() -> hudiOptions(TBL_NAME.key()),
      HIVE_PARTITION_FIELDS.key() -> hudiOptions(PARTITIONPATH_FIELD.key()),
      HIVE_PARTITION_EXTRACTOR_CLASS.key() -> partitionExtractorClass,
      HIVE_STYLE_PARTITIONING.key() -> "true",
      HIVE_SUPPORT_TIMESTAMP_TYPE.key() -> "true",
      HIVE_TABLE_SERDE_PROPERTIES.key() -> serdeProperties
    )
  }

  /** Every option handed to the DataFrame writer: Hudi options followed by Hive sync options. */
  def writerOptions: Map[String, String] = hudiTableOptions ++ hiveTableOptions
}

/**
 * Writes DataFrames into the Hudi table described by the constructor arguments.
 *
 * If the table is not yet registered in the session catalog, the data is written
 * through the Hudi DataFrame writer; otherwise it is merged into the existing
 * table with a SQL `MERGE INTO` statement.
 *
 * @param tableName           name of the target table
 * @param databaseName        database that holds (or will hold) the table
 * @param s3BucketName        bucket under which the table data is stored
 * @param partitionColumns    columns used as the partition path
 * @param recordKeyColumns    columns forming the record key; also used as the merge condition
 * @param preCombineKeyColumn column used as the pre-combine field
 */
class HudiUpsertTarget(override val tableName: String,
                       override val databaseName: String,
                       override val s3BucketName: String,
                       override val partitionColumns: List[String],
                       override val recordKeyColumns: List[String],
                       override val preCombineKeyColumn: String
                      ) extends HudiWriterOptions {

  // Resolved on first use from the currently active session.
  lazy val spark: SparkSession = SparkSession.active

  /**
   * Writes `dataFrame` into the target table.
   *
   * When the catalog has no `databaseName.tableName` entry, the frame is saved with
   * the Hudi format, `writerOptions` and `SaveMode.Append` at `s3LocationPath`.
   * Otherwise the rows are merged into the existing table.
   */
  def writeEntity(dataFrame: DataFrame): Unit = {
    if (!spark.catalog.tableExists(databaseName, tableName)) {
      dataFrame.write
        .format("org.apache.hudi")
        .options(writerOptions)
        .mode(SaveMode.Append)
        .save(s3LocationPath)
    } else {
      upsertData(dataFrame)
    }
  }

  /**
   * Merges `dataFrame` into the existing table, matching rows on every record-key column.
   *
   * The frame is exposed to SQL through a temporary view. The view is dropped in a
   * `finally` block so that a failing MERGE (for example a Hive sync error) does not
   * leave it registered in the session, which would make the next `createTempView`
   * call with the same name fail.
   */
  private def upsertData(dataFrame: DataFrame): Unit = {
    val sourceViewName = "source_data_set"

    val mergeOnStatement = recordKeyColumns
      .map(k => s"hudi_target.$k = hudi_input.$k")
      .mkString(" AND ")

    val mergeIntoStatement =
      s"""MERGE INTO $databaseName.$tableName AS hudi_target
         | USING (SELECT * FROM $sourceViewName) hudi_input
         | ON $mergeOnStatement
         | WHEN MATCHED THEN UPDATE SET *
         | WHEN NOT MATCHED THEN INSERT *
         |""".stripMargin

    // Registered outside the try: if a view with this name already exists,
    // createTempView throws and we must not drop a view this call did not create.
    dataFrame.createTempView(sourceViewName)
    try {
      spark.sql(mergeIntoStatement)
    } finally {
      spark.catalog.dropTempView(sourceViewName)
    }
  }
}

Expected behavior

Hive sync should be successful and the job should complete without failure - it should detect that the database and table already exist in the Glue Catalog and that there is no schema change.

Environment Description

Additional context

Comparing Scenarios 1 and 3 to Scenario 2, I noticed that Scenario 2 does not create the AWS Glue client used to sync the table metadata. Here is the log line that I can find for Scenarios 1 and 3 but not for Scenario 2:

 metastore.AWSGlueClientFactory (AWSGlueClientFactory.java:newClient(55)): Setting glue service endpoint to https://glue.eu-west-1.amazonaws.com

Stacktrace

2022-05-26 11:23:08,800 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(91)): Exception in User Class
org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing test
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:118)
    at org.apache.hudi.HoodieSparkSqlWriter$.org$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:539)
    at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:595)
    at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:591)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:591)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:664)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:284)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at HudiUpsertTarget.upsertData(test.scala:185)
    at HudiUpsertTarget.writeEntity(test.scala:167)
    at TestJob$.writeDF(test.scala:60)
    at TestJob$.scenario2WithHoodieTableMetaClient(test.scala:87)
    at TestJob$.main(test.scala:48)
    at TestJob.main(test.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazonaws.services.glue.SparkProcessLauncherPlugin$class.invoke(ProcessLauncher.scala:48)
    at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
    at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:142)
    at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
    at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: failed to create table test
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.createTable(HMSDDLExecutor.java:129)
    at org.apache.hudi.hive.HoodieHiveClient.createTable(HoodieHiveClient.java:213)
    at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:243)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:184)
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:129)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:115)
    ... 37 more
Caused by: InvalidObjectException(message:databaseName)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1453)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
    at com.sun.proxy.$Proxy63.create_table_with_environment_context(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.create_table_with_environment_context(HiveMetaStoreClient.java:2050)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.create_table_with_environment_context(SessionHiveMetaStoreClient.java:97)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:669)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:657)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
    at com.sun.proxy.$Proxy64.createTable(Unknown Source)
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.createTable(HMSDDLExecutor.java:126)
    ... 42 more
nsivabalan commented 2 years ago

@xushiyan : recently we had a fix around aws glue client factory right. is that related to this issue?

xushiyan commented 2 years ago

@xushiyan : recently we had a fix around aws glue client factory right. is that related to this issue?

No, the recent fix which resolved the Glue catalog connection issue is not relevant to this.

alexeykudinkin commented 2 years ago

@rafcis02 can you also specify the Glue/Hive configs you're using?

Did you try to run it w/ 0.11.1? I was not able to reproduce your issue with the code-snippet you've pasted.

rafcis02 commented 2 years ago

@alexeykudinkin Environment Description

The same issue occurs using Hudi 0.11.0, but it works fine with 0.11.1. So it seems it has been fixed in the latest release.

alexeykudinkin commented 2 years ago

@xushiyan can you please help close this one?

ricardosilva-ei commented 4 months ago

I got the exact same error and in the end the problem was I missed creating the hive metastore bucket.