apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] java.lang.ClassCastException with incremental query #9172

Open bkosuru opened 1 year ago

bkosuru commented 1 year ago

Hudi version: hudi-spark3.3-bundle_2.12-0.13.1
Table type: COW
Env: GCP Dataproc batches V 1.1

I am getting the following exception with incremental query:

23/07/11 22:27:50 ERROR TaskSetManager: Task 552 in stage 1.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 552 in stage 1.0 failed 4 times, most recent failure: Lost task 552.3 in stage 1.0 (TID 771) (10.12.0.121 executor 34): java.lang.ClassCastException: class org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to class org.apache.spark.sql.catalyst.InternalRow (org.apache.spark.sql.vectorized.ColumnarBatch and org.apache.spark.sql.catalyst.InternalRow are in unnamed module of loader 'app')
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:98)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:95)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2716)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2652)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2651)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2651)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2904)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2846)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2835)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.ClassCastException: class org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to class org.apache.spark.sql.catalyst.InternalRow (org.apache.spark.sql.vectorized.ColumnarBatch and org.apache.spark.sql.catalyst.InternalRow are in unnamed module of loader 'app')
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:98)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:95)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

soumilshah1995 commented 1 year ago

Can you share some code or other details?

danny0405 commented 1 year ago

It seems vectorized execution is enabled while Hudi cannot handle it. This is related to the Spark version; can you use Spark 3.2.x and try again? In 0.14.0 it should be fixed.

bkosuru commented 1 year ago

Hi Danny, we are running on Spark 3.3.2, which is the Spark version supported in GCP Dataproc batches V 1.1. Any other suggestions?

danny0405 commented 1 year ago

cc @yihua, what is the suggested Spark version for GCP now?

ad1happy2go commented 1 year ago

@bkosuru Can you share some reproducible steps or code? I am able to use Spark 3.3 on Dataproc and run incremental queries.

I also tried upserts and deletes multiple times, using the Quickstart dataset.

bkosuru commented 1 year ago

Hi @ad1happy2go, did you use Dataproc serverless (batches)? It failed with an incremental query (with the following option set on the reader): reader.option(BEGIN_INSTANTTIME_OPT_KEY, "20220123")

Also, you need a big dataset; you cannot reproduce this with a few MB of data. It worked fine for me on a 420MB dataset.

My dataset size is 66GB, and the data was loaded into Hudi on 2022-09-06, so the incremental query loads the entire dataset. The dataset has 4 columns (s, p, o, g), all strings. Two of the columns are used as partitions: one column has a single partition value and the second has 100.
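For reference, the incremental read looks roughly like this (the table path is a placeholder; the two reader keys are the standard Hudi options for query type and begin instant):

    val incDf = spark.read
      .format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20220123")
      .load(basePath) // basePath: GCS path of the Hudi table (placeholder)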

Here are the Hudi options used:

val DELETED_COL = "isDeleted"

private val AVG_RECORD_SIZE = 256 // approx bytes of our average record, contra Hudi default assumption of 1024
private val ONE_GIGABYTE = 1024 * 1024 * 1024 // used for Parquet file size & block size
private val BLOOM_MAX_ENTRIES = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)

def save(
    df: DataFrame,
    operation: Operation,
    output: String,
    tableName: String,
    parallelism: Int,
    saveMode: SaveMode
): Unit = {
  df.write
    .format(HUDI_FORMAT)

  // DataSourceWriteOptions
  .option(operation.parallelismOption, parallelism)
  .options(
    if (operation == InsertDedup)
      Map(INSERT_DROP_DUPS_OPT_KEY -> true.toString)
    else Map[String, String]()
  )
  .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
  .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SpoKeyGenerator].getName)
  .option(
    OPERATION_OPT_KEY,
    operation.hudiOp
  ) // insert, bulk_insert, upsert, or delete
  .option(PARTITIONPATH_FIELD_OPT_KEY, "g, p")
  .option(PRECOMBINE_FIELD_OPT_KEY, DELETED_COL)
  .option(RECORDKEY_FIELD_OPT_KEY, "s, o")
  .option(URL_ENCODE_PARTITIONING_OPT_KEY, true)

  // HoodieIndexConfig
  .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
  .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)

  // HoodieCompactionConfig
  // For first commit to a hudi table, to determine how many records can fit into a data file
  // Useful for hudi copy; can be tweaked if filecount differs from the source; default 1024
  .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
  // Commit history; MIN should be less than MAX; CLEANER should be less than MIN
  .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
  .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
  .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)

  // HoodieStorageConfig
  .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
  .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
  .option(
    PARQUET_FILE_MAX_BYTES,
    ONE_GIGABYTE
  ) // Current hadoop cfg uses 256MB block size.

  // HoodieWriteConfig
  .option(EMBEDDED_TIMELINE_SERVER_ENABLED, false)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)

  .option("hoodie.metadata.enable", false)
  .option("hoodie.index.type", "BLOOM")
  .mode(saveMode)
  .save(output)

}

class SpoKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) {

def hash128(s: String): String = {
  val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
  h(0).toString + h(1).toString
}

override def getRecordKey(record: GenericRecord): String = {
  val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false, false)
  val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false, false)
  genKey(s, o)
}

private def genKey(s: String, o: String): String = hash128(s + o)

override def getRecordKey(row: Row): String = {
  val s = row.getAs(0).toString
  val o = row.getAs(1).toString
  genKey(s, o)
}

}
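For context, a hypothetical call to the save helper above (Operation and its Upsert case are our own types, not shown here; the path and parallelism values are placeholders):

    save(
      df = inputDf,
      operation = Upsert,              // one of our Operation cases (hypothetical)
      output = "gs://bucket/hudi/spo", // placeholder table path
      tableName = "spo",
      parallelism = 1500,              // illustrative value
      saveMode = SaveMode.Append
    )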

bkosuru commented 1 year ago

@danny0405 Any idea when 0.14.0 will be out?

danny0405 commented 1 year ago

Should come out in 2 weeks.

bkosuru commented 1 year ago

Still waiting for this fix. @danny0405 Any idea when 0.14.0 will be out?

ad1happy2go commented 1 year ago

@bkosuru RC2 is already out. It will be released soon officially.

Can you try RC2 and see if you still face the issue? Sorry, I couldn't find time to work on this. I will take it up if you still face the issue with RC2.

michael1991 commented 1 year ago

Facing the same issue with Dataproc image version 2.1.23-debian11 (Hudi 0.12.3, Spark 3.3.2 inside), so I'm still using Dataproc image version 2.1.22-debian11 (Hudi 0.12.0, Spark 3.3.0 inside). Any more ideas on it?

arw357 commented 1 year ago

Bump - facing the same issue with 0.12.0 on our Spark 3.3.2.

arw357 commented 1 year ago

I worked around the issue by setting spark.sql.parquet.enableVectorizedReader=false, though probably at a performance cost.
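For anyone else hitting this, the setting can be applied either at submit time or when building the session; a minimal sketch (the app name is hypothetical, the config key is standard Spark):

    // spark-submit alternative: --conf spark.sql.parquet.enableVectorizedReader=false
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-incremental-read") // hypothetical app name
      .config("spark.sql.parquet.enableVectorizedReader", "false")
      .getOrCreate()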

bkosuru commented 1 year ago

Same exception with Hudi 0.14.0 and Spark 3.3.2 (GCP serverless 1.1). @danny0405 you said the issue should be resolved with Hudi 0.14.0. Do you know why it is still broken?

It works fine with Hudi 0.14.0 and Spark 3.4.0, though; we would have to upgrade to GCP serverless 2.1 for that.

danny0405 commented 1 year ago

cc @CTTY from the AWS team, do you have any thoughts that could help here?

danny0405 commented 1 year ago

Here is the fix I found: https://github.com/apache/hudi/pull/8082

bkosuru commented 1 year ago

@danny0405 The fix in PR https://github.com/apache/hudi/pull/8082

if (!HoodieSparkUtils.gteqSpark3_3_2) {
  sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
}

does not work, because the default value of spark.sql.parquet.enableVectorizedReader is already true: the guard only re-enables the vectorized reader on older Spark versions instead of disabling it on 3.3.2 and later.
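Presumably the guard needs to be inverted, i.e. explicitly disable the vectorized reader on Spark 3.3.2 and later. A sketch of that (illustrative only, not the merged patch):

    if (HoodieSparkUtils.gteqSpark3_3_2) {
      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
    }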

It works fine with Hudi 0.14.0 and Spark 3.4.0 in a local setting, but we cannot upgrade to GCP serverless 2.1 because it requires Scala 2.13, and Hudi isn't available for Scala 2.13.

Two options:
1) Provide a correct fix for Spark 3.3.2
2) Make Hudi available for Scala 2.13

danny0405 commented 1 year ago

Hi, @rahil-c , do you have any chance to look into this?

yihua commented 1 year ago

For existing Hudi releases on Spark 3.3.2, the mitigation is to set spark.sql.parquet.enableVectorizedReader=false. @rahil-c @CTTY I remember the vectorized reader is turned off in the Hudi Spark integration for Spark 3.3.2, isn't it?

ankur-a commented 2 months ago

Is there any permanent fix for this? We are using Hudi 0.15.0 and Spark 3.3.2 and facing a similar issue. It was resolved by setting spark.sql.parquet.enableVectorizedReader=false, but this has led to huge performance degradation for our jobs.