apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] ClassCastException in RecordLevelIndexing #11697

Closed bibhu107 closed 1 month ago

bibhu107 commented 1 month ago

I'm encountering a ClassCastException when using Record Level Indexing (RLI) with Hudi 0.14.1 and Spark 3.4. The issue occurs on the second run of my script after restarting the Spark shell.

Steps to reproduce:

  1. Start the Spark shell with Hudi 0.14.1 using the command below
  2. Run the attached script (performs two upserts)
  3. Close the Spark shell
  4. Restart the Spark shell
  5. Run the same script again (repeat the run 3-4 times if needed)
export SPARK_VERSION=3.4
$SPARK_HOME/bin/spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

Simple upsert script using Record Level Indexing

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.config.HoodieCleanConfig.CLEANER_COMMITS_RETAINED
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import java.sql.Timestamp
import java.text.SimpleDateFormat

object HudiRLIMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Hudi RLI Test")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .getOrCreate()

    val HUDI_DATA_FORMAT = "org.apache.hudi"
    val basePath = "/Users/palbibhu/github/SparkProject/HudiRLITesting/outputpath"
    val outputTableName = "Idempotent"
    val primaryKey = "lookupKey"
    val SIMPLE_INDEX = "SIMPLE"
    val RECORD_LEVEL_INDEX = "RECORD_INDEX"
    val MAX_COMMIT_TO_KEY_VALUE = "3"

    try {
      val hudiOptions = Map[String, String](
        RECORDKEY_FIELD_OPT_KEY -> primaryKey,
        PRECOMBINE_FIELD_OPT_KEY -> "booking",
        PARTITIONPATH_FIELD_OPT_KEY -> "Data",
        HoodieWriteConfig.TABLE_NAME -> outputTableName,
        HoodieIndexConfig.INDEX_TYPE_PROP -> RECORD_LEVEL_INDEX,
        HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() -> "true",
        CLEANER_COMMITS_RETAINED.key() -> MAX_COMMIT_TO_KEY_VALUE
      )

      def writeToHudi(df: DataFrame): Unit = {
        df.write.format(HUDI_DATA_FORMAT)
          .options(hudiOptions)
          .mode("append")
          .save(basePath)
      }

      def readFromHudi(): DataFrame = {
        spark.read.format(HUDI_DATA_FORMAT)
          .option("hoodie.datasource.query.type", "snapshot")
          .load(s"$basePath/*")
      }

      val df1 = readDummyPaths(spark)
      writeToHudi(df1)
      println("First Time Insertion Done! Now We gonna Read the Table")
      readFromHudi().show(10, false)

      val df2 = readMoreDummyPaths(spark)
      writeToHudi(df2)
      println("Updated Successfully! Now We gonna Read the Table")
      readFromHudi().show(10, false)

    } finally {
      spark.stop()
    }
  }

  private val schema = StructType(Seq(
    StructField("lookupKey", StringType, nullable = false),
    StructField("row_id", StringType, nullable = false),
    StructField("Name", StringType, nullable = true),
    StructField("Salary", LongType, nullable = true),
    StructField("booking", TimestampType, nullable = true)
  ))

  private def createDataFrame(spark: SparkSession, data: Seq[Row]): DataFrame = {
    spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
  }

  def readDummyPaths(spark: SparkSession): DataFrame = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    val data = Seq(
      Row("1", "111_nww", "book", 32232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
      Row("2", "222", "table", 23232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
      Row("3", "333", "kite", 232332L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime))
    )
    createDataFrame(spark, data)
  }

  def readMoreDummyPaths(spark: SparkSession): DataFrame = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    val data = Seq(
      Row("1", "112_v2", "Goldi", 32232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
      Row("2", "223", "Sonu", 23232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
      Row("3", "334", "Mango", 232332L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime))
    )
    createDataFrame(spark, data)
  }
}

Issue:

  1. On the first run of the script, I upsert twice and it works correctly.
  2. On the second run, after closing and restarting the shell and running the same script, it fails with the error below:
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
    at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
    at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
    at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:856)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:856)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:414)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:160)
    at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:108)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:327)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:304)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
    at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
    ... 38 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader @56b751b1)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:161)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:116)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:828)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:403)
    ... 45 more

24/07/28 12:42:54 WARN HoodieSparkSqlWriterInternal: Closing write client
ad1happy2go commented 1 month ago

@bibhu107 This is a known issue. Can you take a look here - https://github.com/apache/hudi/issues/10609#issuecomment-2245686849

We are going to add this to the FAQ soon as well.

bibhu107 commented 1 month ago

Hi @ad1happy2go, what is the way forward? Is this bug going to be addressed soon? Our jobs depend on the RLI feature.

ad1happy2go commented 1 month ago

@bibhu107 You can add the bundle to the extra classpath to make it work. Is that not working for you?
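
For reference, a minimal sketch of that workaround, assuming the bundle jar has already been downloaded locally by --packages (the jar path below is an assumption; adjust it to wherever the bundle lives in your environment):

export SPARK_VERSION=3.4
# Hypothetical location of the downloaded bundle jar; adjust to your environment
export HUDI_BUNDLE_JAR=$HOME/.ivy2/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.1.jar
$SPARK_HOME/bin/spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 \
--conf "spark.driver.extraClassPath=$HUDI_BUNDLE_JAR" \
--conf "spark.executor.extraClassPath=$HUDI_BUNDLE_JAR" \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

With the bundle on the extra classpath, the Hudi classes are loaded by the application classloader rather than the shell's URL classloader, which appears to be the classloader mismatch the ClassCastException above points at.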

bibhu107 commented 1 month ago

Okay! I was running it on my local machine and getting the error. I will try passing the bundle on the extra classpath. Thanks!

ad1happy2go commented 1 month ago

Thanks @bibhu107. Do let us know your findings.

ad1happy2go commented 1 month ago

@bibhu107 Also added a troubleshooting guide for this issue - https://github.com/apache/hudi/pull/11716

bibhu107 commented 1 month ago

Thank you @ad1happy2go