apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0
823 stars 163 forks

chore: Simplify CometShuffleMemoryAllocator to use Spark unified memory allocator #1063

Closed viirya closed 1 week ago

viirya commented 2 weeks ago

Which issue does this PR close?

Closes #1064 Closes #886

Rationale for this change

What changes are included in this PR?

How are these changes tested?

andygrove commented 2 weeks ago

test failure:

 [info] - Spark vectorized reader - with partition data column - select a single complex field from a map entry and its parent map entry *** FAILED *** (653 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 215.0 failed 1 times, most recent failure: Lost task 0.0 in stage 215.0 (TID 370) (4bf8ef4698e6 executor driver): java.lang.IllegalArgumentException: CometShuffleMemoryAllocator should be used with off-heap memory mode, but got ON_HEAP
[info]  at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.getInstance(CometShuffleMemoryAllocator.java:44)
[info]  at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.<init>(CometDiskBlockWriter.java:139)
[info]  at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:181)

I think we need to specify spark.memory.offHeap.enabled=true when running Spark tests? I need to do the same in https://github.com/apache/datafusion-comet/pulls

For this PR we should also fall back to Spark for shuffle if spark.memory.offHeap.enabled=false?

viirya commented 2 weeks ago

Basically, Spark tests run with the on-heap config, except for tests that specifically target off-heap mode.

I'm not sure if enabling off-heap for all Spark tests can pass them all. If it works, let's do it.

If not, I plan to keep the current CometShuffleMemoryAllocator and rename it to a test-only class, CometTestShuffleMemoryAllocator, which Comet would use only when running Spark tests.

viirya commented 1 week ago

@andygrove All Spark tests pass now.

andygrove commented 1 week ago

I tried testing with TPC-H but see a memory issue:

24/11/08 02:31:44 INFO core/src/lib.rs: Comet native library version 0.4.0 initialized
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007399e4661564, pid=11, tid=132
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.24+8 (11.0.24+8) (build 11.0.24+8)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.24+8 (11.0.24+8, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
corrupted double-linked list

andygrove commented 1 week ago

One other issue. I tested with spark.memory.offHeap.enabled=false and the shuffle did not fall back to Spark but failed at runtime.

viirya commented 1 week ago

I tried testing with TPC-H but see a memory issue:

I will test it locally too.

viirya commented 1 week ago

One other issue. I tested with spark.memory.offHeap.enabled=false and the shuffle did not fall back to Spark but failed at runtime.

Yes. If using the on-heap config, CometShuffleMemoryAllocator will throw a runtime error; i.e., you need to use the off-heap config in Spark.

The test-only CometTestShuffleMemoryAllocator is used only in Spark tests (as they mostly run in on-heap mode).
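The guard behind that runtime error (visible in the earlier stack trace) can be sketched as follows. This is an illustrative Scala sketch, not the actual Java implementation in `CometShuffleMemoryAllocator.getInstance`; the `MemoryMode` type here is a stand-in for Spark's enum.

```scala
// Illustrative sketch of the off-heap guard described above.
// MemoryMode is a stand-in for org.apache.spark.memory.MemoryMode.
object AllocatorGuard {
  sealed trait MemoryMode
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode

  // Reject on-heap mode up front, mirroring the error message seen in the
  // stack trace earlier in this thread.
  def requireOffHeap(mode: MemoryMode): Unit =
    if (mode != OFF_HEAP) {
      throw new IllegalArgumentException(
        s"CometShuffleMemoryAllocator should be used with off-heap memory mode, but got $mode")
    }
}
```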

viirya commented 1 week ago

I tried testing with TPC-H but see a memory issue:

Hmm, I just ran TPC-H with this PR on Spark 3.4 using datafusion-comet script without any error.

andygrove commented 1 week ago

Yes. If using on-heap config, CometShuffleMemoryAllocator will throw runtime error, i.e., you need to use off-heap config in Spark.

Right, so if the user is using on-heap, we should not use Comet shuffle and should fall back to Spark. We probably just need to update isCometShuffleEnabled to check if off-heap is being used.
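A minimal sketch of that fallback check, using a plain string map in place of Spark's SQLConf so it stands alone; the method and key names mirror the discussion but this is not the actual Comet code.

```scala
// Hypothetical sketch: gate Comet shuffle on Spark's off-heap mode.
// `conf` stands in for Spark's SQLConf; keys follow Spark/Comet naming.
object ShuffleFallback {
  def isOffHeapEnabled(conf: Map[String, String]): Boolean =
    conf.getOrElse("spark.memory.offHeap.enabled", "false").toBoolean

  // Comet shuffle is only considered enabled when off-heap memory is also
  // enabled; otherwise the plan falls back to Spark shuffle.
  def isCometShuffleEnabled(conf: Map[String, String]): Boolean =
    conf.getOrElse("spark.comet.exec.shuffle.enabled", "false").toBoolean &&
      isOffHeapEnabled(conf)
}
```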

viirya commented 1 week ago

Yes. If using on-heap config, CometShuffleMemoryAllocator will throw runtime error, i.e., you need to use off-heap config in Spark.

Right, so if the user is using on-heap, we should not use Comet shuffle and should fall back to Spark. We probably just need to update isCometShuffleEnabled to check if off-heap is being used.

Oh, I see. That sounds good. I will update it.

andygrove commented 1 week ago

Hmm, I just ran TPC-H with this PR on Spark 3.4 using datafusion-comet script without any error.

These are the settings that I am using. I am running in k8s.

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.eventLog.enabled=false \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.driver.memory=8G \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=12g \
    --conf spark.executor.instances=4 \
    --conf spark.executor.memory=30719m \
    --conf spark.executor.cores=6 \
    --conf spark.comet.memory.overhead.factor=0.04 \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=jvm \

viirya commented 1 week ago

Hmm, I just ran TPC-H with this PR on Spark 3.4 using datafusion-comet script without any error.

These are the settings that I am using. I am running in k8s.

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.eventLog.enabled=false \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.driver.memory=8G \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=12g \
    --conf spark.executor.instances=4 \
    --conf spark.executor.memory=30719m \
    --conf spark.executor.cores=6 \
    --conf spark.comet.memory.overhead.factor=0.04 \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=jvm \

This is what I used to run:

$SPARK_HOME/bin/spark-submit \
    --master "local[*]" \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.driver.memory=8G \
    --conf spark.executor.memory=10G \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16G \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=jvm \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --benchmark tpch
    ...
viirya commented 1 week ago

I don't set spark.comet.memory.overhead.factor. Do you need it?

andygrove commented 1 week ago

I don't set spark.comet.memory.overhead.factor. Do you need it?

This is the value from https://github.com/apache/datafusion-comet/issues/886, which I think this PR is intended to close.

I ran a clean build this morning and did not see the segfault, so it is possible that I picked up an old docker image ... I will continue testing this morning.

andygrove commented 1 week ago

Actually, this PR won't close #886 because this is still using a singleton, so let's ignore that for now.

This PR LGTM and I will approve after some more testing.

viirya commented 1 week ago

It falls back to Spark shuffle now if off-heap is not enabled.

codecov-commenter commented 1 week ago

Codecov Report

Attention: Patch coverage is 59.09091% with 36 lines in your changes missing coverage. Please review.

Project coverage is 34.19%. Comparing base (845b654) to head (e7e7847). Report is 13 commits behind head on main.

Files with missing lines Patch % Lines
...shuffle/comet/CometTestShuffleMemoryAllocator.java 68.96% 9 Missing and 9 partials :warning:
...ark/shuffle/comet/CometShuffleMemoryAllocator.java 30.76% 8 Missing and 1 partial :warning:
.../comet/execution/shuffle/CometDiskBlockWriter.java 0.00% 4 Missing :warning:
...org/apache/comet/CometSparkSessionExtensions.scala 20.00% 0 Missing and 4 partials :warning:
...spark/sql/comet/execution/shuffle/SpillWriter.java 50.00% 1 Missing :warning:
Additional details and impacted files

```diff
@@             Coverage Diff              @@
##               main    #1063      +/-   ##
============================================
- Coverage     34.46%   34.19%    -0.28%
+ Complexity      888      884        -4
============================================
  Files           113      115        +2
  Lines         43580    42765      -815
  Branches       9658     9346      -312
============================================
- Hits          15021    14622      -399
+ Misses        25507    25279      -228
+ Partials       3052     2864      -188
```


viirya commented 1 week ago

Actually, this PR won't close #886 because this is still using a singleton, so let's ignore that for now.

Since the allocator now uses all available memory on the executor (we don't specify a memory size on the allocator), it should no longer be an issue for #886. @andygrove Do you want to re-check whether #886 can be fixed by this PR too? Thanks.

And similar to TaskMemoryManager, I think it makes more sense to have a singleton memory allocator shared by the shuffle writers in the same executor.
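The executor-wide singleton pattern described above can be sketched like this: one shared allocator per JVM, lazily created and handed to every shuffle writer. Names are illustrative, not the actual Comet classes.

```scala
// Sketch of a per-executor singleton allocator, analogous to how
// TaskMemoryManager is shared. The Allocator class here is a placeholder.
object ShuffleAllocatorSingleton {
  final class Allocator(val pageSize: Long)

  private var instance: Allocator = null

  // First caller creates the instance; later callers (other shuffle writers
  // on the same executor) get the same one, even with a different pageSize.
  def getInstance(pageSize: Long): Allocator = synchronized {
    if (instance == null) instance = new Allocator(pageSize)
    instance
  }
}
```

One consequence of this design, noted in the discussion, is that the singleton sizes itself against the executor's whole off-heap pool rather than a per-writer budget.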

andygrove commented 1 week ago

Can we make COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE and COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR internal configs, since they are only used in tests now?

viirya commented 1 week ago

Can we make COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE and COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR internal configs, since they are only used in tests now?

Yes, they should be internal configs now. Let me update that.
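Marking a config internal typically means it no longer shows up in user-facing docs while remaining usable in tests. The sketch below models this loosely on Spark's ConfigBuilder pattern; the builder, its methods, and the config keys are all illustrative stand-ins, not the actual Comet config API.

```scala
// Illustrative model of an "internal" config flag, loosely following
// Spark's ConfigBuilder style. Keys below are hypothetical.
object CometConfSketch {
  final case class ConfigEntry(key: String, isInternal: Boolean, default: String)

  final class ConfigBuilder(key: String) {
    private var internalFlag = false
    // internal() hides the entry from generated user documentation.
    def internal(): ConfigBuilder = { internalFlag = true; this }
    def createWithDefault(v: String): ConfigEntry = ConfigEntry(key, internalFlag, v)
  }

  // Hypothetical keys for the two test-only configs discussed above.
  val shuffleMemorySize: ConfigEntry =
    new ConfigBuilder("spark.comet.columnar.shuffle.memorySize").internal().createWithDefault("")
  val shuffleMemoryFactor: ConfigEntry =
    new ConfigBuilder("spark.comet.columnar.shuffle.memory.factor").internal().createWithDefault("1.0")
}
```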

andygrove commented 1 week ago

As now the allocator uses all available memory on the executor (we don't specify memory size on the allocator), it should not be an issue for #886 now. @andygrove Do you want to re-check if #886 can be fixed by this PR too? Thanks.

I will test this again today.

andygrove commented 1 week ago

I'm running into SIGSEGV issues again.

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000072e2c93b6bc8, pid=11, tid=127
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.24+8 (11.0.24+8) (build 11.0.24+8)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.24+8 (11.0.24+8, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
# C  [libcomet-14210005976568904946.so+0x736bc8]  comet::execution::shuffle::row::append_columns::h9b53b563e484a30e+0x1318
#

I will try running the same benchmark on main.

edit: I cannot reproduce on main because it fails there with

Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 67108864 bytes of memory, got 39485440 bytes. Available: 39485440

andygrove commented 1 week ago

I increased the off-heap pool size, and now I can run TPC-H q5 @ sf=1TB on the main branch, but get SIGSEGV with this PR.

viirya commented 1 week ago

Let me see if I can reproduce it.

viirya commented 1 week ago

Ah, I figured out what was wrong there. I've updated the PR with the fix.

I ran the benchmarks locally and didn't see the error.

Please also run the benchmarks to verify it fixes the error. Thanks. @andygrove

viirya commented 1 week ago

Cool. Thanks @andygrove for verifying it.