apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Excessive memory usage when running TPC-H Query 21 on a large cluster #927

Closed Kontinuation closed 2 months ago

Kontinuation commented 2 months ago

Describe the bug

We observed excessive memory usage when running TPC-H Query 21 with scale factor = 1000 on a large cluster. The cluster consists of 48 worker nodes, each with 6 vCPUs and 48 GB of memory. We use only 3 cores on each worker node (spark.executor.cores = 3). Please refer to the Additional context section for the detailed environment and Spark configurations. Running TPC-H Q21 repeatedly yields the following memory usage plot on one of the executors.

image

All other executors show similar memory usage patterns: the RSS gradually climbs to over 30 GB and then stabilizes. About 8 GB of that is taken by the JVM, and the remaining 22 GB is native memory allocated by datafusion-comet. This does not seem to be a memory leak, but rather a memory bloat problem. With this much memory used by Comet, it is impossible to use all 6 cores of the executor pod on EC2 instances with CPU:MEM = 1:8, and infeasible to run on EC2 instances with CPU:MEM = 1:4 (general purpose instances).

Steps to reproduce

Run TPC-H Query 21 repeatedly on a Spark cluster. The detailed environment and Spark configurations are listed in Additional context.

This problem cannot be reproduced locally using spark.master = local[*].

Expected behavior

Comet should use less memory, so that it can run on commodity EC2 instances where CPU:MEM = 1:4.

Additional context

The problem was observed on a self-deployed K8S Spark cluster on AWS.

Here are the relevant Spark configurations:

spark.executor.cores 3
spark.executor.memory 8192m
# Reserve most of the memory (40 GB) for comet
spark.executor.memoryOverheadFactor 4.99
spark.comet.memory.overhead.factor 0.12

spark.sql.extensions org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
spark.comet.exec.shuffle.enabled true
spark.comet.exec.shuffle.mode auto
spark.shuffle.manager org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
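As a rough check of the "40 GB" comment in the configuration above, the pod size implied by these settings can be worked out as follows. This is a minimal sketch: it assumes Spark sizes the overhead as executor memory multiplied by spark.executor.memoryOverheadFactor with a 384 MiB floor, and the helper name is made up for illustration.

```python
# Hypothetical helper: estimates the memory requested per executor pod.
# Assumption: overhead = executor memory * memoryOverheadFactor, floored at 384 MiB.
MIN_OVERHEAD_MIB = 384


def executor_pod_memory_mib(executor_memory_mib: int, overhead_factor: float):
    """Return (overhead_mib, total_mib) for one executor pod."""
    overhead_mib = max(int(executor_memory_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return overhead_mib, executor_memory_mib + overhead_mib


# Values taken from the configuration listed in this issue.
overhead_mib, total_mib = executor_pod_memory_mib(8192, 4.99)
print(f"overhead: {overhead_mib} MiB ({overhead_mib / 1024:.1f} GiB)")
print(f"pod total: {total_mib} MiB ({total_mib / 1024:.1f} GiB)")
```

Under these assumptions the overhead comes to roughly 40 GB and the pod total stays just under the 48 GB available on each worker node.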

Kontinuation commented 2 months ago

We enabled JVM Native Memory Tracking and found that the native memory was not allocated by the JVM, so it must have been allocated in JNI modules such as Comet.

We used the profiling capability of jemalloc to investigate the problem. After running the same workload with the following additional configuration:

spark.executorEnv.LD_PRELOAD = /usr/lib/x86_64-linux-gnu/libjemalloc.so
spark.executorEnv.MALLOC_CONF = prof:true,lg_prof_interval:30,prof_prefix:/path/to/jeprof.out

we obtained very interesting profiling results. Here is the flamegraph of the heap profile we captured at peak memory usage while running TPC-H Query 21:

image

Most of the native memory was allocated by ZSTD_decompressStream. We cannot see which Java function invoked it since jemalloc cannot unwind JVM stacks, but we can be certain that the call originates from IpcInputStreamIterator by looking into the source code.
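For readers unfamiliar with the jemalloc options used for this profile, the MALLOC_CONF string can be decoded as in the sketch below. The parsing helper is hypothetical; the one interpretation it relies on is that lg_prof_interval is the base-2 logarithm of the average number of allocated bytes between profile dumps.

```python
def parse_malloc_conf(conf: str) -> dict:
    """Split a jemalloc MALLOC_CONF string into an option -> value mapping."""
    # Split on the first ':' only, so values may themselves contain ':'.
    return dict(item.split(":", 1) for item in conf.split(",") if item)


# The exact string from the configuration quoted in this issue.
options = parse_malloc_conf(
    "prof:true,lg_prof_interval:30,prof_prefix:/path/to/jeprof.out"
)

# lg_prof_interval is a log2 value: 30 means a dump about every 2**30 bytes
# of allocation activity.
dump_interval_bytes = 2 ** int(options["lg_prof_interval"])

print("profiling enabled:", options["prof"] == "true")
print("dump interval (GiB):", dump_interval_bytes / 2**30)
print("dump file prefix:", options["prof_prefix"])
```

With lg_prof_interval:30, a heap profile is written roughly once per GiB allocated, which keeps the number of dump files manageable on a long-running executor.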

Further analysis leads to CometBlockStoreShuffleReader. When there are many worker nodes in the cluster, there is also a large number of ShuffleBlockFetcherIterators. Each iterator defers the closing of an ArrowReaderIterator until task termination, so many unclosed ArrowReaderIterators are held in memory and occupy an excessive amount of it. That is why we see so much live memory allocated by ZSTD_decompressStream.

We can lift currentReadIterator out of the flatMap to decrease the number of unclosed ArrowReaderIterators, so that each CometBlockStoreShuffleReader only leaves one ArrowReaderIterator to be closed on task termination. This will remove the previously identified allocation hotspot and save some memory.
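The effect of lifting the reader variable can be illustrated with a toy model. This is not the Comet code: FakeReader, Tracker and both read functions are hypothetical stand-ins, and the sketch assumes that the lifted version closes the previous reader before opening the next one.

```python
class Tracker:
    """Counts how many readers are open at once (a proxy for native memory)."""

    def __init__(self):
        self.live = 0
        self.peak = 0


class FakeReader:
    """Stand-in for an ArrowReaderIterator that holds a decompression buffer."""

    def __init__(self, rows, tracker):
        self.rows = rows
        self.tracker = tracker
        self.closed = False
        tracker.live += 1
        tracker.peak = max(tracker.peak, tracker.live)

    def close(self):
        if not self.closed:  # closing twice is harmless
            self.closed = True
            self.tracker.live -= 1


def read_deferred(blocks, tracker):
    """One reader per block; every close is deferred to task termination."""
    on_task_end, out = [], []
    for rows in blocks:
        reader = FakeReader(rows, tracker)
        on_task_end.append(reader.close)  # models a task-completion callback
        out.extend(reader.rows)
    for close in on_task_end:  # task terminates here
        close()
    return out


def read_lifted(blocks, tracker):
    """A single 'current' reader; the previous one is closed before the next opens."""
    current, out = None, []
    for rows in blocks:
        if current is not None:
            current.close()
        current = FakeReader(rows, tracker)
        out.extend(current.rows)
    if current is not None:  # only one reader is left for task termination
        current.close()
    return out


blocks = [[1, 2], [3], [4, 5, 6]]
deferred, lifted = Tracker(), Tracker()
assert read_deferred(blocks, deferred) == read_lifted(blocks, lifted)
print("peak open readers, deferred:", deferred.peak)
print("peak open readers, lifted:", lifted.peak)
```

In this model the deferred variant keeps one reader open per fetched block until the task ends, while the lifted variant never has more than one open, which matches the reasoning in the comment above.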

Kontinuation commented 2 months ago

Memory usage dropped significantly after applying https://github.com/Kontinuation/datafusion-comet/commit/1674eb974748b019bd29483873e4b5ef64b0d878. Here is the memory usage plot after applying the fix. The peak memory usage has been reduced to 15 GB, which means native memory usage has been reduced by nearly 70%.

image
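The "nearly 70%" figure refers to native memory rather than total RSS. A small sketch of the arithmetic, assuming the JVM share stays at about 8 GB after the fix and taking "over 30 GB" as 30 GB (both are assumptions, not measurements reported in this thread):

```python
def native_reduction(peak_before_gb, peak_after_gb, jvm_gb):
    """Return native memory before, after, and the fractional reduction."""
    native_before = peak_before_gb - jvm_gb
    native_after = peak_after_gb - jvm_gb
    reduction = (native_before - native_after) / native_before
    return native_before, native_after, reduction


# 30 GB and 15 GB are the peak RSS values reported before and after the fix;
# 8 GB is the JVM share reported before the fix (assumed unchanged afterwards).
before, after, reduction = native_reduction(30, 15, 8)
print(f"native memory: {before} GB -> {after} GB ({reduction:.0%} lower)")
```

Measured against total RSS the drop is about half; the larger percentage applies only to the native portion.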

Now the allocation hotspots look normal; they are in libcomet:

image

image