apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi Record Level Index says item not found with Complex keys #12234

Open dataproblems opened 3 days ago

dataproblems commented 3 days ago

Describe the problem you faced

Record lookup in a table with the record level index enabled results in a None.get exception.

To Reproduce

Steps to reproduce the behavior:

  1. Create a table with two fields in the record key, using the ComplexKeyGenerator
  2. Read the table and perform a lookup for a particular key

Expected behavior

I should be able to read the data without any exceptions, just as I can for a table created with the SimpleKeyGenerator.

Environment Description

  * Hudi version : 0.14.0 (hudi-spark3-bundle_2.12-0.14.0-amzn-0, from the stack trace below)
  * Spark version : 3.4.1 (3.4.1-amzn-2)
  * Scala version : 2.12.15

Additional context

Read Options I used:

val ReadOptions: Map[String, String] = Map(
  "hoodie.enable.data.skipping" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.record.index.enable" -> "true")

Config I used to create the table with the complex key:

val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id,partition",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "true",
  "hoodie.metadata.record.index.min.filegroup.count" -> "100",
 )
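
Note on the key format (this is general Hudi key-generator behaviour, not something specific to my setup): the two configs differ only in the record key fields and the key generator class, which changes how _hoodie_record_key is encoded, and that encoding is what the record level index matches against. A quick way to inspect what the writer actually produced, with basePath again a placeholder:

spark.read.format("hudi").load(basePath)
  .select("_hoodie_record_key")
  .show(3, truncate = false)
// ComplexKeyGenerator encodes keys as field:value pairs, e.g. "id:123,partition:One";
// SimpleKeyGenerator (config below) stores just the raw value, e.g. "123".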

Config I used to create the table with the simple key:


val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "true",
  "hoodie.metadata.record.index.min.filegroup.count" -> "1000",
 )

Code I used to generate the data:

import java.util.UUID
import scala.util.Random
import spark.implicits._ // for the Dataset encoder used by map below

case class RandomData(id: Long, uuid: String, ts: Long = 28800000L, partition: String)

val partitions = List("One", "Two", "Three", "Four")

// ~100 million rows, spread randomly across the four partitions
val randomData = spark.range(1, 10 * 10000000L)
  .map(f => RandomData(id = f, uuid = UUID.randomUUID.toString, partition = Random.shuffle(partitions).head))
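
For completeness, the dataset was written with the complex-key insertOptions above, roughly as in this sketch (basePath is a placeholder and the save mode may differ):

randomData.write
  .format("hudi")
  .options(insertOptions)
  .mode("overwrite")
  .save(basePath)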

Stacktrace

java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:529) ~[scala-library-2.12.15.jar:?]
        at scala.None$.get(Option.scala:527) ~[scala-library-2.12.15.jar:?]
        at org.apache.hudi.RecordLevelIndexSupport.attributeMatchesRecordKey(RecordLevelIndexSupport.scala:89) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey(RecordLevelIndexSupport.scala:155) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.RecordLevelIndexSupport.$anonfun$filterQueriesWithRecordKey$1(RecordLevelIndexSupport.scala:133) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.RecordLevelIndexSupport.$anonfun$filterQueriesWithRecordKey$1$adapted(RecordLevelIndexSupport.scala:132) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?]
        at org.apache.hudi.RecordLevelIndexSupport.filterQueriesWithRecordKey(RecordLevelIndexSupport.scala:132) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.recordKeys$lzycompute$1(HoodieFileIndex.scala:334) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.recordKeys$1(HoodieFileIndex.scala:334) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.$anonfun$lookupCandidateFilesInMetadataTable$1(HoodieFileIndex.scala:338) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
        at org.apache.hudi.HoodieFileIndex.lookupCandidateFilesInMetadataTable(HoodieFileIndex.scala:321) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.filterFileSlices(HoodieFileIndex.scala:222) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.listFiles(HoodieFileIndex.scala:149) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:274) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:312) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:285) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.isDataPrefetchSupportedForAllFiles(DataSourceScanExec.scala:697) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.shouldPrefetchData$lzycompute(DataSourceScanExec.scala:599) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.shouldPrefetchData(DataSourceScanExec.scala:595) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:628) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:603) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:753) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:241) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:262) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:237) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:678) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:241) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:262) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:237) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:399) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:304) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:53) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:950) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:214) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
danny0405 commented 3 days ago

Related fix: https://github.com/apache/hudi/pull/12160

dataproblems commented 2 days ago

Thank you @danny0405 - do you have an estimate of which versions will get this fix, or will it only land in the latest one?

ad1happy2go commented 17 hours ago

@dataproblems Can you also please test this PR on your dataset?

dataproblems commented 15 hours ago

@ad1happy2go - are you suggesting that I build the Hudi jar with these changes applied and use that instead of the packages available through Maven Central? If I have some extra bandwidth I will give that a try, but you can also reproduce the error using the random dataset for which I provided the data generation code above.

ad1happy2go commented 11 minutes ago

@dataproblems Correct, that's what I meant. But I will give it a try if you don't have the bandwidth. Thanks.