apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Large keyspace upsert results in OOM #11960

Open meatheadmike opened 2 months ago

meatheadmike commented 2 months ago

Describe the problem you faced

Hi Folks. I'm trying to get some advice here on how to better deal with a large upsert dataset.

The data has a very wide key space and no great/obvious partitionable columns. What I'm aiming for is a record key with roughly 1.5 billion unique values, any one of which could be updated at any time. The pipeline so far is a PySpark-based streaming application. The eventual source will be a Kafka topic, but at the moment it's a data generator set to emit 10,000 rows per second (specifically the dbldatagen Python module). The sink is an S3 bucket with a MOR Hudi table. The Spark cluster is hosted on Kubernetes on AWS. I'm using the latest Hudi beta, the latest production Spark (3.5.2), Hive metastore 3.x, and ZooKeeper for locking (latest production version). The executors are currently set to 10, at 8 GB RAM and 1 core each. While I could feasibly increase the RAM on the executors, I had previously done this (from 4 GB to 8 GB) and it didn't buy me much. I seem to be topped out at roughly 500 million records (and 500 million distinct values on the record key).

The issue is that while I can get the pipeline to run, it eventually reaches a point where it simply OOMs on the executors during a union in the upsert. I've tried numerous memory/parallelism tuning options to no avail.

Is there a way to make this work without infinitely scaling memory? Is a wide keyspace like this a dead end? I did try partitioning by applying an MD5 hash to the record key and using the first two characters. That gave me 256 unique, more-or-less evenly distributed partitions, but it led to drastically increased batch times.
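
For reference, a minimal PySpark sketch of that partitioning experiment (the record key column "name" comes from the config below; the partition column name "part" is just illustrative):

from pyspark.sql import functions as F

# Derive a coarse partition column from the first two hex characters of
# md5(record key), giving 256 roughly even partitions.
df_partitioned = df.withColumn("part", F.substring(F.md5(F.col("name")), 1, 2))

# The writer would then be pointed at it with something like:
#   "hoodie.datasource.write.partitionpath.field": "part"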

Record level indexing is enabled, and that seems to help on the write path. On the read path, I've noticed that queries actually take significantly longer when data skipping is enabled. So perhaps my index has grown unmanageable? Maybe that is partially responsible for the OOMs on the write side.
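
For context, the read-side comparison is roughly the following (a sketch of toggling data skipping per query; the table path comes from the config below, and `spark` is assumed to be an active SparkSession):

# Hedged sketch: read the table with and without data skipping to compare query times.
base_path = "s3a://XXXXX/hudi_out_data"

df_with_skipping = (
    spark.read.format("hudi")
        .option("hoodie.metadata.enable", "true")
        .option("hoodie.enable.data.skipping", "true")
        .load(base_path)
)

df_without_skipping = (
    spark.read.format("hudi")
        .option("hoodie.enable.data.skipping", "false")
        .load(base_path)
)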

To Reproduce

Steps to reproduce the behavior:

hudi config:

{
    "hoodie.insert.shuffle.parallelism": "500",
    "hoodie.upsert.shuffle.parallelism": "500",
    "hoodie.delete.shuffle.parallelism": "500",
    "hoodie.bulkinsert.shuffle.parallelism": "500",
    "hoodie.clean.commits.retained": 5,
    "hoodie.clean.policy": "KEEP_LATEST_COMMITS",
    "hoodie.clean.fileversions.retained": "1",
    "hoodie.keep.min.commits": "10",
    "hoodie.datasource.compaction.async.enable": "true",
    "hoodie.logfile.data.block.max.size": 8388608,
    "hoodie.logfile.max.size": 8388608,
    "hoodie.datasource.hive_sync.db": "default",
    "hoodie.datasource.hive_sync.enable": true,
    "hoodie.datasource.hive_sync.ignore_exceptions": true,
    "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hive-metastore.default:10000",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore.default:9083",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.hive_sync.password": "",
    "hoodie.datasource.hive_sync.recreate_table_on_error": true,
    "hoodie.datasource.hive_sync.skip_ro_suffix": true,
    "hoodie.datasource.hive_sync.table": "hudi_sink",
    "hoodie.datasource.hive_sync.username": "hive",
    "hoodie.archive.async": true,
    "hoodie.clean.async.enabled": true,
    "hoodie.clean.automatic": true,
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.clustering.async.enabled": false,
    "hoodie.clustering.async.max.commits": 0,
    "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
    "hoodie.clustering.inline": true,
    "hoodie.clustering.inline.max.commits": 0,
    "hoodie.clustering.plan.strategy.class": "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
    "hoodie.clustering.plan.strategy.sort.columns": "name",
    "hoodie.clustering.preserve.commit.metadata": true,
    "hoodie.clustering.rollback.pending.replacecommit.on.conflict": true,
    "hoodie.clustering.updates.strategy": "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy",
    "hoodie.compact.inline.max.delta.commits": 2,
    "hoodie.datasource.meta.sync.enable": "true",
    "hoodie.datasource.read.incr.fallback.fulltablescan.enable": true,
    "hoodie.datasource.read.use.new.parquet.file.format": true,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.datasource.write.record.merger.impls": "org.apache.hudi.HoodieSparkRecordMerger",
    "hoodie.datasource.write.recordkey.field": "name",
    "hoodie.datasource.write.row.writer.enable": true,
    "hoodie.datasource.write.streaming.checkpoint.identifier": "datagen-writer1",
    "hoodie.datasource.write.table.name": "hudi_sink",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.enable.data.skipping": true,
    "hoodie.index.type": "RECORD_INDEX",
    "hoodie.logfile.data.block.format": "parquet",
    "hoodie.merge.use.record.positions": true,
    "hoodie.metadata.auto.initialize": true,
    "hoodie.metadata.enable": true,
    "hoodie.metadata.clean.async": true,
    "hoodie.metadata.index.async": false,
    "hoodie.metadata.index.column.stats.columns": "name",
    "hoodie.metadata.index.column.stats.column.list": "name",
    "hoodie.metadata.index.column.stats.enable": true,
    "hoodie.metadata.record.index.enable": true,
    "hoodie.parquet.compression.codec": "snappy",
    "hoodie.record.index.use.caching": true,
    "hoodie.table.name": "hudi_sink",
    "hoodie.table.services.enabled": true,
    "hoodie.write.concurrency.early.conflict.detection.enable": true,
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.write.executor.type": "DISRUPTOR",
    "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    "hoodie.write.lock.zookeeper.base_path": "/hudi/locl/table",
    "hoodie.write.lock.zookeeper.lock_key": "hudi_sink",
    "hoodie.write.lock.zookeeper.port": "2181",
    "hoodie.write.lock.zookeeper.url": "zk-cs.default",
    "checkpointLocation": "s3a://XXXXX/hudi_checkpoints",
    "path": "s3a://XXXXX/hudi_out_data"
}
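
For context, a config like this is applied to the streaming write roughly as follows (a minimal sketch; it assumes the options above are loaded into a Python dict named hudi_options and that df is the streaming DataFrame produced by the data generator):

# Minimal sketch of the streaming upsert into the MOR Hudi table.
# PySpark converts non-string option values (booleans, ints) to strings.
query = (
    df.writeStream
      .format("hudi")
      .options(**hudi_options)   # includes "path" and "checkpointLocation"
      .outputMode("append")
      .start()
)
query.awaitTermination()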

spark config:

spark.checkpoint.compress true
spark.driver.maxResultSize 2g
spark.dynamicAllocation.shuffleTracking.enabled true
spark.eventLog.dir s3a://XXXXX/spark_logs/
spark.eventLog.enabled true
spark.eventLog.rolling.enabled true
spark.eventLog.rolling.maxFileSize 20m
spark.executor.memoryOverhead 2g
spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.hadoop.fs.s3a.fast.upload true
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.hive.metastore.warehouse.dir s3a://XXXXX/warehouse
spark.hadoop.parquet.avro.write-old-list-structure false
spark.history.fs.logDirectory s3a://XXXXX/spark_logs/
spark.hive.metastore.warehouse.dir s3a://XXXXX/warehouse
spark.io.compression.codec snappy
spark.kryo.registrator org.apache.spark.HoodieSparkKryoRegistrar
spark.memory.fraction 0.2
spark.memory.storageFraction 0.2
spark.rdd.compress true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled false # leave disabled, as Kubernetes doesn't support the external shuffle service
spark.speculation false
spark.streaming.stopGracefullyOnShutdown true
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.catalogImplementation hive
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.hive.convertMetastoreParquet false
spark.sql.hive.metastore.jars path
spark.sql.hive.metastore.jars.path file:///opt/apache-hive-3.1.3-bin/lib/*.jar
spark.sql.hive.metastore.version 3.1.3
spark.sql.parquet.enableVectorizedReader true
spark.sql.shuffle.partitions 500
spark.sql.streaming.stateStore.compression.codec snappy
spark.sql.streaming.stateStore.providerClass org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.warehouse.dir s3a://XXXXX/warehouse
spark.ui.prometheus.enabled true
#
# S3 optimization settings (see https://spark.apache.org/docs/latest/cloud-integration.html):
#
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.hive.metastorePartitionPruning true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.streaming.checkpointFileManagerClass org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
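
These settings can equivalently be applied when building the session (a sketch; it assumes the key/value pairs above are collected into a Python dict named spark_confs, and the app name is illustrative):

from pyspark.sql import SparkSession

# Apply the Spark configuration listed above programmatically.
builder = SparkSession.builder.appName("hudi-upsert-stream")
for key, value in spark_confs.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()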

Expected behavior

Pipeline should run without OOM

Environment Description

Additional context

Add any other context about the problem here.

Stacktrace

24/09/18 18:10:29 WARN TaskSetManager: Lost task 5.0 in stage 68.0 (TID 3753) (10.239.14.87 executor 8): java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
    at org.apache.spark.storage.memory.SerializedValuesHolder.$anonfun$allocator$1(MemoryStore.scala:715)
    at org.apache.spark.storage.memory.SerializedValuesHolder.$anonfun$allocator$1$adapted(MemoryStore.scala:715)
    at org.apache.spark.storage.memory.SerializedValuesHolder$$Lambda$1216/0x00007995a8a1fa28.apply(Unknown Source)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at org.apache.spark.storage.memory.RedirectableOutputStream.write(MemoryStore.scala:809)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
    at com.esotericsoftware.kryo.io.UnsafeOutput.writeLong(UnsafeOutput.java:124)
    at com.esotericsoftware.kryo.io.UnsafeOutput.writeLong(UnsafeOutput.java:160)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.write(DefaultSerializers.java:151)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.write(DefaultSerializers.java:145)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
    at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
    at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:365)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:278)
    at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:729)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)

Any help / tips would be greatly appreciated!

danny0405 commented 2 months ago

I guess the huge resource cost comes from rewriting the parquet files: because each update touches almost all of the file groups, it triggers a whole-table rewrite.

A range partitioning strategy that limits the scope of each update to a subset of the table, instead of the whole table, would be helpful.

meatheadmike commented 2 months ago

I can certainly attempt partitioning again, but doesn't that just exacerbate the file group problem? My last attempt at partitioning made the batches take way too long.

If writing to many file groups truly is the problem, then why does the process go OOM instead of simply taking longer? It seems like something that could be parallelized and sped up, but in my trial increasing cores did not help. Is there a way I can configure this to use less RAM?

danny0405 commented 2 months ago

Is the OOM happening in the driver or the executors? The driver can take a large amount of memory for the fs view file index; the way to ease that is to enable the RocksDB-based fs view. The executors can OOM when the parquet and delta log records take up memory, which depends on the parallelism of the executors.
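
For reference, the RocksDB-backed file system view mentioned above can be enabled with settings along these lines (a sketch that reuses the hudi_options dict from the earlier snippet; the keys come from Hudi's FileSystemViewStorageConfig and should be verified against the Hudi version in use):

# Hedged sketch: switch the file system view from the in-memory default to an
# embedded RocksDB-backed store to reduce driver heap pressure.
hudi_options.update({
    "hoodie.filesystem.view.type": "EMBEDDED_KV_STORE",
    "hoodie.filesystem.view.rocksdb.base.path": "/tmp/hoodie_timeline_rocksdb",  # local scratch dir
})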

meatheadmike commented 2 months ago

The OOM is happening on the executors. What would you recommend for parallelism? It's already set at 500.

ad1happy2go commented 2 months ago

@meatheadmike Can you please share the Spark event logs?

meatheadmike commented 2 months ago

sparklog.txt

Here you go. I captured the most recent run. I stopped it before the driver completely quit, but you can see where the executors start OOMing.

From the UI perspective it's happening here:

Job Group: SparkUpsertPreppedDeltaCommitActionExecutor
Stage: Building workload profile: hudi_sink_metadata (stage 77, attempt 1, start at NativeMethodAccessorImpl.java:0)
RDDs: UnionRDD (id 162), UnionRDD (id 180)

meatheadmike commented 1 month ago

Update: Bumping the driver and executors to 16 GB of memory allowed me to get to 2.5 billion rows. So the OOM can be overcome with more memory, although I'd certainly prefer to run with less memory and more executors.

ad1happy2go commented 1 month ago

@meatheadmike Can you check the size of the record index files? How many file groups do you see in the record index?
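
For reference, one way to check this is to list the files under the metadata table's record index partition (a sketch using PySpark's JVM gateway to reach the Hadoop FileSystem API; the path is derived from the "path" option above, and the file-group count is a rough heuristic based on file naming):

# Hedged sketch: total size and approximate file-group count of the record index
# partition of the metadata table. Assumes an active SparkSession named `spark`.
base_path = "s3a://XXXXX/hudi_out_data"
ri_path = base_path + "/.hoodie/metadata/record_index"

jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
path = jvm.org.apache.hadoop.fs.Path(ri_path)
fs = path.getFileSystem(hadoop_conf)

total_bytes, file_ids = 0, set()
for status in fs.listStatus(path):
    name = status.getPath().getName()
    total_bytes += status.getLen()
    # Base and log file names start with the file group id; this split is a heuristic.
    file_ids.add(name.lstrip(".").split("_")[0])

print(f"record_index size: {total_bytes / 1024 ** 2:.1f} MiB, ~{len(file_ids)} file groups")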

ad1happy2go commented 1 month ago

@meatheadmike Were you able to check on the same? Do you still face the issue?

meatheadmike commented 1 month ago

I won't be able to check on this for a bit as I'm in the process of rebuilding our lakehouse environment. I'll reply as soon as I get a chance.

ad1happy2go commented 1 month ago

Thanks for the update @meatheadmike .