
[SUPPORT] Hudi could override users' configurations #11188

Open boneanxs opened 5 months ago

boneanxs commented 5 months ago

We recently hit the same problem as https://github.com/apache/hudi/issues/9305, but with a different cause (we are still on Hudi 0.12).

The user manually set spark.sql.parquet.enableVectorizedReader to false, then read a Hive table and cached it. Because Spark analyzes a plan eagerly when it is cached, and the vectorized reader is disabled at that point, Spark does not add a ColumnarToRow (C2R) node to the cached plan. The plan is not executed yet, since no action has been triggered.

The user then reads a MOR read_optimized table and joins it with the cached plan. Because reading the MOR table automatically flips enableVectorizedReader back to true, the Hive table is actually read as columnar batches, but the cached plan has no C2R node to convert the batches back to rows.
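A minimal sketch of the scenario (the table name, path, and join column are hypothetical; the read_optimized query type option is the standard Hudi read option):

```scala
// Hypothetical repro, Spark 3.x + Hudi 0.12; names and paths are illustrative.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

// The plan is analyzed at cache time with the vectorized reader off,
// so Spark inserts no ColumnarToRow (C2R) node above the scan.
val hiveDf = spark.table("db.hive_table").cache()

// Reading the MOR read-optimized table flips enableVectorizedReader back
// to true, so the cached Hive scan now emits ColumnarBatch objects that
// nothing converts back to InternalRow.
val roDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load("/path/to/mor_table")

hiveDf.join(roDf, "id").count()   // triggers the ClassCastException below
```

The failure: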


```
java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1481)
    at ...
```

This happens because Hudi's imbueConfigs override forces the flag back on:

```scala
  override def imbueConfigs(sqlContext: SQLContext): Unit = {
    super.imbueConfigs(sqlContext)
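    // Unconditionally turns the vectorized reader on, clobbering the user's setting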
    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
  }
```

I see there have been some changes on master, but I suspect this issue can still happen, since we also modify the flag in HoodieFileGroupReaderBasedParquetFileFormat:

```scala
spark.conf.set("spark.sql.parquet.enableVectorizedReader", supportBatchResult)
```

Beyond this particular bug, is it appropriate to set Spark configurations globally, regardless of whether the user has set them? I see that Hudi sets many Spark-related configurations in SparkConf, most of them for the Parquet reader/writer. This can confuse users and makes it hard for developers to track down the root cause of failures.
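One way to avoid clobbering a user's setting is to scope the override and restore the previous value afterwards. A minimal sketch of that pattern (a hypothetical helper, not Hudi's actual code):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of Hudi: apply a config override only for
// the duration of `body`, then restore whatever the user had configured.
def withVectorizedReader[T](spark: SparkSession, enabled: Boolean)(body: => T): T = {
  val key  = "spark.sql.parquet.enableVectorizedReader"
  val prev = spark.conf.getOption(key)      // remember the user's setting
  spark.conf.set(key, enabled.toString)
  try body
  finally prev match {
    case Some(v) => spark.conf.set(key, v)  // restore the original value
    case None    => spark.conf.unset(key)   // or clear it if it was unset
  }
}
```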

danny0405 commented 5 months ago

> I see that Hudi sets many Spark-related configurations in SparkConf, most of them for the Parquet reader/writer.

Are these options configurable?

boneanxs commented 5 months ago

> > I see that Hudi sets many Spark-related configurations in SparkConf, most of them for the Parquet reader/writer.
>
> Are these options configurable?

Yes, these configs can be set by users.