apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Detected memory leak on Comet columnar shuffle when AQE coalesce partitions enabled #381

Closed viirya closed 1 week ago

viirya commented 1 week ago

Describe the bug

A few tests fail because Java Arrow reports a memory leak. The failures were found in #250 after enabling columnar shuffle by default for the Spark SQL tests. For example,

In AdaptiveQueryExecSuite:

[info] - SPARK-35455: Unify empty relation optimization between normal and AQE optimizer - single join *** FAILED *** (3 seconds, 170 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 729.0 failed 1 times, most recent failure: Lost task 0.0 in stage 729.0 (TID 1631) (e2b4fe719fb3 executor driver): org.apache.comet.CometNativeException: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (32)
[info] Allocator(StreamReader/CometBlockStoreShuffleReader) 0/32/32/9223372036854775807 (res/actual/peak/limit)
[info] 
[info]  at org.apache.comet.Native.executePlan(Native Method)
[info]  at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
[info]  at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
[info]  at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)

After debugging these failures, it seems the leak is triggered when AQE partition coalescing is enabled.

I think this is because, when partition coalescing is enabled, some partitions (belonging to multiple reducers) are combined together, so the Arrow StreamReader ends up reading data in a format it does not expect.

For now, we should disable Comet columnar shuffle when AQE partition coalescing is enabled.
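
Until that guard exists, a user-side workaround would be the inverse: keep Comet columnar shuffle and turn off AQE partition coalescing for the affected job. The sketch below is only an illustration, not the fix proposed here. It uses the standard Spark SQL setting `spark.sql.adaptive.coalescePartitions.enabled`; the object name and app name are made up for the example, and it assumes Comet is already configured elsewhere in the session.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver object, used only to show where the setting goes.
object AqeCoalesceWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-coalesce-workaround") // illustrative name
      .master("local[*]")
      // Standard Spark SQL setting: stop AQE from merging small shuffle
      // partitions, so each reducer reads only its own shuffle blocks.
      .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
      .getOrCreate()

    // Print the effective value to confirm the setting was picked up.
    println(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))

    spark.stop()
  }
}
```

The same setting can be passed on the command line with `--conf spark.sql.adaptive.coalescePartitions.enabled=false`. Whether this fully avoids the leak depends on the cause guessed above being correct.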

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response