apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

INSERT_DROP_DUPS fails with HoodieUpsertException #9859

Open bkosuru opened 1 year ago

bkosuru commented 1 year ago

UseCase: We use Hudi COW tables. Data is stored in GCS (Google Cloud). Our datasets have 4 columns and a few billion rows. We ingest data on a weekly basis. The weekly data can range from a few GB to a few TB; most of the time it is a few GB. We want to avoid inserting duplicates by using the INSERT_DROP_DUPS option. There are no issues if the input data is less than 100GB, but when the data size is more than 100GB we get a HoodieUpsertException.

Steps to reproduce the behavior:

Here are the Hudi options used to write the data:

val DELETED_COL = "isDeleted"

// Approx. bytes of our average record, versus the Hudi default assumption of 1024
private val AVG_RECORD_SIZE = 256

// Used for the Parquet file size & block size
private val ONE_GIGABYTE = 1024 * 1024 * 1024

private val BLOOM_MAX_ENTRIES = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
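For reference, plugging in these constants gives the bloom filter cap used below (simple arithmetic, not part of the original report):

// ONE_GIGABYTE = 1024 * 1024 * 1024 = 1,073,741,824 bytes
// BLOOM_MAX_ENTRIES = 1,073,741,824 / (2 * 256) = 2,097,152 entries per bloom filter
val bloomMaxEntries: Int = (1024 * 1024 * 1024) / (2 * 256) // = 2097152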

def save(
    df: DataFrame,
    operation: Operation,
    output: String,
    tableName: String,
    parallelism: Int,
    saveMode: SaveMode
): Unit = {
  df.write
    .format(HUDI_FORMAT)

    // DataSourceWriteOptions
    .option(operation.parallelismOption, parallelism)
    .options(
      if (operation == InsertDedup) Map(INSERT_DROP_DUPS_OPT_KEY -> true.toString)
      else Map[String, String]()
    )
    .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
    .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SpoKeyGenerator].getName)
    .option(OPERATION_OPT_KEY, operation.hudiOp) // insert, bulk_insert, upsert, or delete
    .option(PARTITIONPATH_FIELD_OPT_KEY, "g, p")
    .option(PRECOMBINE_FIELD_OPT_KEY, DELETED_COL)
    .option(RECORDKEY_FIELD_OPT_KEY, "s, o")
    .option(URL_ENCODE_PARTITIONING_OPT_KEY, true)

    // HoodieIndexConfig
    .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
    .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)

    // HoodieCompactionConfig
    // For the first commit to a Hudi table, determines how many records fit into a data file.
    // Useful for a Hudi copy; can be tweaked if the file count differs from the source; default 1024.
    .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
    // Commit history; MIN should be less than MAX; CLEANER should be less than MIN
    .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
    .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
    .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)

    // HoodieStorageConfig
    .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
    .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
    .option(PARQUET_FILE_MAX_BYTES, ONE_GIGABYTE) // Current hadoop cfg uses 256MB block size.

    // HoodieWriteConfig
    .option(EMBEDDED_TIMELINE_SERVER_ENABLED, false)
    .option(HoodieWriteConfig.TABLE_NAME, tableName)

    .option("hoodie.metadata.enable", false)
    .option("hoodie.index.type", "BLOOM")
    .mode(saveMode)
    .save(output) // write to the target path (the `output` parameter)
}

// Imports assumed here (they are not shown in the original snippet); package paths are
// for a recent Hudi release, and MurmurHash3 is assumed to be Apache Commons Codec's.
import org.apache.avro.generic.GenericRecord
import org.apache.commons.codec.digest.MurmurHash3
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.Row

class SpoKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) {

  // 128-bit MurmurHash of the input string, concatenated as two signed longs
  def hash128(s: String): String = {
    val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
    h(0).toString + h(1).toString
  }

  override def getRecordKey(record: GenericRecord): String = {
    val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false, false)
    val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false, false)
    genKey(s, o)
  }

  private def genKey(s: String, o: String): String = hash128(s + o)

  override def getRecordKey(row: Row): String = {
    val s = row.getAs(0).toString
    val o = row.getAs(1).toString
    genKey(s, o)
  }
}
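As a standalone illustration of what this key generator computes, assuming the MurmurHash3 above is Apache Commons Codec's 128-bit implementation (the sample values are hypothetical):

import org.apache.commons.codec.digest.MurmurHash3

object KeyGenSketch extends App {
  val s = "someSubject" // hypothetical "s" value
  val o = "someObject"  // hypothetical "o" value
  val h: Array[Long] = MurmurHash3.hash128((s + o).getBytes)
  // The record key is the concatenation of the two signed 64-bit halves
  println(h(0).toString + h(1).toString)
}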

Environment Description

Stacktrace

FetchFailed(BlockManagerId(27, 10.12.0.218, 42247, None), shuffleId=14, mapIndex=405, mapId=199610, reduceId=569, message=
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1330)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1034)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:87)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
    at org.apache.spark.util.collection.ExternalSorter.insertAllAndUpdateMetrics(ExternalSorter.scala:681)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:154)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.ExecutorDeadException: Executor 27 is dead.
    at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$throwExecutorDeadException(NettyBlockTransferService.scala:262)
    at org.apache.spark.network.netty.NettyBlockTransferService.ensureExecutorAlive(NettyBlockTransferService.scala:239)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:125)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:475)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1300)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1292)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1144)
    ... 38 more

Spark UI screenshots attached.

bkosuru commented 1 year ago

Spark Config:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.ui.showConsoleProgress", "true")
.set("spark.ui.enabled", "false")
.set("spark.sql.parquet.mergeSchema", "false")
.set("spark.sql.files.ignoreCorruptFiles", "true")
.set("spark.sql.hive.convertMetastoreParquet", "false")
.set("spark.task.maxFailures", "4")
.set("spark.rdd.compress", "true")
.set("spark.task.cpus", "1")
.set("spark.kryoserializer.buffer.max", "512")

Resources:

spark.driver.cores=8
spark.driver.memory=48g
spark.driver.maxResultSize=20g
spark.executor.cores=8
spark.executor.instances=200
spark.executor.memory=48g
spark.dynamicAllocation.enabled=false
spark.sql.shuffle.partitions=3000
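For illustration only, the two groups of settings above could be assembled into a single SparkConf as sketched below; this assumes a programmatic launcher rather than spark-submit flags, which is not stated in the issue:

import org.apache.spark.SparkConf

// Sketch only: merges the .set(...) options and the resource settings listed above
val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512")
  .set("spark.sql.shuffle.partitions", "3000")
  .set("spark.executor.instances", "200")
  .set("spark.executor.cores", "8")
  .set("spark.executor.memory", "48g")
  .set("spark.driver.cores", "8")
  .set("spark.driver.memory", "48g")
  .set("spark.driver.maxResultSize", "20g")
  .set("spark.dynamicAllocation.enabled", "false")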

ad1happy2go commented 1 year ago

@bkosuru You said your existing dataset is a few billion rows. May I know how many TB your existing data is? Are your updates spread across most of the partitions?

bkosuru commented 1 year ago

Hi @ad1happy2go, we have several tables with sizes in the range of ~1TB to ~3TB. The largest dataset is 3.3TB with 47 partitions; the size of the largest partition is 470GB.

ad1happy2go commented 1 year ago

@bkosuru I see you have a good amount of executor memory for this size of data. Do you mind if we connect on Slack to analyse the DAG further? You can reach out to me on the Hudi community Slack - Aditya Goenka.

bkosuru commented 1 year ago

@ad1happy2go, there is not much information in the DAG other than what I shared. I tried the different options mentioned in your tuning guide (https://hudi.apache.org/docs/tuning-guide/) with no success. I have decided to go with insert without DROP DUPS for now.

ad1happy2go commented 1 year ago

@bkosuru Are you also facing slow performance with the upsert operation type? Duplicates should be dropped with upsert anyway.

bkosuru commented 1 year ago

Did not try it. Upsert had poor performance a couple of releases ago.

ad1happy2go commented 1 year ago

@bkosuru Since your use case is not purely immutable, can you try the upsert operation type and see if you get better performance?
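Concretely, the suggestion amounts to switching the write operation and dropping the dedup flag; a minimal sketch using the standard Hudi datasource option keys (the path here is hypothetical) would be:

// Sketch: write with the upsert operation instead of insert + INSERT_DROP_DUPS.
// Upsert deduplicates on the record key by design, so the drop-duplicates flag is not needed.
df.write
  .format("hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  // all other table/key/partition options stay as in the save() helper above
  .mode(SaveMode.Append)
  .save("gs://some-bucket/hudi/table") // hypothetical path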

ad1happy2go commented 1 year ago

It looks like INSERT_DROP_DUPS results in a lot of shuffle data, which is causing the job to fail.

bkosuru commented 1 year ago

I will give it a try