Closed: andygrove closed this 1 month ago
I tried running benchmarks with this PR but ran into:
Failed to allocate additional 917708800 bytes for ShuffleRepartitioner[0] with 0 bytes already allocated for this reservation - 858914816 bytes remain available for the total pool
Perhaps we need to merge https://github.com/apache/datafusion-comet/pull/988 first
@Kontinuation Is this the general approach you were suggesting?
I was able to get benchmarks running by allocating more memory to Comet.
This is better than using a greedy memory pool. It makes spillable operators work correctly under memory pressure, especially when running sort-merge-join where multiple sort operators compete for resources.
There are still some issues remaining unresolved. Each task may create multiple native plans and we still do not make them share the same memory pool. I'd like to share the experiments I've done to better sync with you on this topic.
I did some experiments on my branch to try out various ways of using memory pools. There's a configuration spark.comet.exec.memoryPool that lets me run queries with different memory pools. All configurations were tested using the query mentioned in https://github.com/apache/datafusion-comet/issues/1003.
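As a rough illustration of what that switch might look like on the native side, here is a minimal sketch mapping the config value to a DataFusion memory pool (the function name, error handling, and how pool_size is derived are assumptions for illustration, not the actual Comet code):

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};

/// Hypothetical mapping from the spark.comet.exec.memoryPool value to a
/// DataFusion memory pool; pool_size would be derived from
/// spark.comet.memoryOverhead.
fn create_memory_pool(pool_type: &str, pool_size: usize) -> Arc<dyn MemoryPool> {
    match pool_type {
        // Spillable consumers share the pool fairly and spill when their
        // fair share is exhausted.
        "fair_spill" => Arc::new(FairSpillPool::new(pool_size)),
        // Current behavior: first come, first served; one consumer can
        // starve the others.
        "greedy" => Arc::new(GreedyMemoryPool::new(pool_size)),
        other => panic!("unsupported memory pool type: {other}"),
    }
}
```

The resulting pool would then be attached to the plan's DataFusion runtime environment when the native plan is created.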
spark.comet.exec.memoryPool = greedy
This is the current mode when using native memory management. It could only run with spark.comet.memoryOverhead = 8000m; otherwise sort-merge-join fails with a memory reservation error:
24/10/09 10:59:14 WARN TaskSetManager: Lost task 3.0 in stage 13.0 (TID 43) (bogon executor driver): org.apache.comet.CometNativeException: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge[0] consumed 1164398840 bytes, GroupedHashAggregateStream[0] consumed 117699433 bytes, SMJStream[0] consumed 459160 bytes, HashJoinInput[0] consumed 1392 bytes. Error: Failed to allocate additional 993312 bytes for ExternalSorter[0] with 1321280 bytes already allocated for this reservation - 625495 bytes remain available for the total pool
spark.comet.exec.memoryPool = fair_spill
The same approach as this PR: simply use a FairSpillPool as the per-plan memory pool. It could run with spark.comet.memoryOverhead = 3200m. Both sort operators could spill to cope with the memory bound:
24/10/09 11:03:11 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (stage: 13 task: 41):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum], metrics=[output_rows=2791473, elapsed_compute=3.695235425s]
ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=15002382, elapsed_compute=2.23923ms]
ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], metrics=[output_rows=15002382, elapsed_compute=2.445133ms]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)], metrics=[output_rows=15002382, input_batches=1832, input_rows=15002382, build_input_batches=1, output_batches=1832, build_input_rows=25, build_mem_used=1392, join_time=853.609662ms, build_time=42.125µs]
CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, elapsed_compute=3.292µs]
ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=709ns, cast_time=1ns]
CopyExec [UnpackOrClone], metrics=[output_rows=15002382, elapsed_compute=1.754617ms]
ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=15002382, elapsed_compute=2.032586ms]
SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], metrics=[output_rows=15002382, spill_count=0, spilled_bytes=0, spilled_rows=0, input_batches=2290, input_rows=18752902, output_batches=1832, peak_mem_used=918320, join_time=4.976020762s]
SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], metrics=[output_rows=3750520, elapsed_compute=1.678235876s, spill_count=3, spilled_bytes=572203168, spilled_rows=3066232]
CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3750520, elapsed_compute=65.265593ms]
ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3750520, elapsed_compute=450.456µs, cast_time=1ns]
SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], metrics=[output_rows=15002382, elapsed_compute=2.424249133s, spill_count=4, spilled_bytes=547164360, spilled_rows=13667085]
CopyExec [UnpackOrDeepCopy], metrics=[output_rows=15002382, elapsed_compute=40.672672ms]
ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], metrics=[output_rows=15002382, elapsed_compute=531.627µs, cast_time=1ns]
But each task may create more than one native plan, each with its own memory pool. Here is an example of a task creating 2 native plans, and these 2 plans run concurrently.
Plan 1:
24/10/08 13:34:46 INFO core/src/execution/jni_api.rs: Comet native query plan (stage: 13 task: 40):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum]
ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8]
ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)]
CopyExec [UnpackOrDeepCopy]
ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8]
CopyExec [UnpackOrClone]
ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8]
SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)]
SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false]
CopyExec [UnpackOrDeepCopy]
ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64]
SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false]
CopyExec [UnpackOrDeepCopy]
ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)]
Plan 2:
24/10/08 13:34:52 INFO core/src/execution/jni_api.rs: Comet native query plan (stage: 13 task: 40):
ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }, Column { name: "col_2", index: 2 }, Column { name: "col_3", index: 3 }, Column { name: "col_4", index: 4 }, Column { name: "col_5", index: 5 }, Column { name: "col_6", index: 6 }], 4)
ScanExec: source=[], schema=[col_0: Int64, col_1: Utf8, col_2: Decimal128(12, 2), col_3: Utf8, col_4: Utf8, col_5: Utf8, col_6: Utf8, col_7: Decimal128(36, 4), col_8: Boolean]
spark.comet.exec.memoryPool = fair_spill_shared
This approach allocates a FairSpillPool shared by all plans in the same task. For the above example, the sort-merge-join plan and the shuffle-write plan in the same task share the same memory pool. This strictly follows the conceptual model that Comet won't exceed spark.comet.memoryOverhead. It could run with spark.comet.memoryOverhead = 4800m.
I've added 2 additional JNI interfaces for creating a memory pool at the beginning of each task and releasing it at the end of each task. Actually, this is not necessary: we can create and track per-task memory pools entirely in native code; all it needs is the task attempt ID at native plan creation time.
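Here is a minimal sketch of that idea, assuming a per-task registry keyed by the task attempt ID; the names and the reference-counting scheme are illustrative, not the actual implementation:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

use datafusion::execution::memory_pool::{FairSpillPool, MemoryPool};

// Map from Spark task attempt id to the task-shared pool plus the number of
// native plans currently using it.
static TASK_POOLS: OnceLock<Mutex<HashMap<i64, (Arc<FairSpillPool>, usize)>>> = OnceLock::new();

fn task_pools() -> &'static Mutex<HashMap<i64, (Arc<FairSpillPool>, usize)>> {
    TASK_POOLS.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Called at native plan creation time: reuse the task's pool if it already
/// exists, otherwise create one sized from the per-task memory budget.
fn acquire_task_pool(task_attempt_id: i64, pool_size: usize) -> Arc<dyn MemoryPool> {
    let mut pools = task_pools().lock().unwrap();
    let entry = pools
        .entry(task_attempt_id)
        .or_insert_with(|| (Arc::new(FairSpillPool::new(pool_size)), 0));
    entry.1 += 1;
    entry.0.clone()
}

/// Called when a native plan is released: drop the pool once the last plan
/// belonging to the task has been released.
fn release_task_pool(task_attempt_id: i64) {
    let mut pools = task_pools().lock().unwrap();
    if let Some(entry) = pools.get_mut(&task_attempt_id) {
        entry.1 -= 1;
        if entry.1 == 0 {
            pools.remove(&task_attempt_id);
        }
    }
}
```

With something like this, the existing plan creation and release paths already carry enough information, so no new JNI interfaces would be needed.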
spark.comet.exec.memoryPool = fair_spill_global
This approach uses a singleton FairSpillPool for all tasks in the same executor instance. I thought this would be the optimal approach, but in practice it does not work well: it could only run with spark.comet.memoryOverhead = 12000m. I'll dive into this issue next week since there is a lot of other work allocated for this week :(.
spark.comet.exec.memoryPool = greedy_global
This approach uses a singleton GreedyMemoryPool for all tasks in the same executor instance. As expected, it does not work well either: it could only run with spark.comet.memoryOverhead = 9000m.
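For completeness, both *_global variants boil down to an executor-wide singleton pool shared by every task, roughly as in the sketch below (a sketch only, assuming the pool size comes from the executor-wide spark.comet.memoryOverhead; names are illustrative):

```rust
use std::sync::{Arc, OnceLock};

use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};

// One pool per executor process, shared by every task and every native plan.
static GLOBAL_POOL: OnceLock<Arc<dyn MemoryPool>> = OnceLock::new();

fn global_memory_pool(pool_type: &str, pool_size: usize) -> Arc<dyn MemoryPool> {
    GLOBAL_POOL
        .get_or_init(|| match pool_type {
            "fair_spill_global" => Arc::new(FairSpillPool::new(pool_size)) as Arc<dyn MemoryPool>,
            // greedy_global (or anything else) falls back to a greedy pool.
            _ => Arc::new(GreedyMemoryPool::new(pool_size)) as Arc<dyn MemoryPool>,
        })
        .clone()
}
```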
So the conclusion is that fair_spill and fair_spill_shared have lower memory requirements and are less likely to break when running memory-intensive queries. I also believe that Comet needs a more sophisticated memory management system from DataFusion to support large ETL use cases reliably, which is the use case where Spark shines.
Thanks for the detailed feedback @Kontinuation. I plan to resume work on this today/tomorrow.
@Kontinuation Do you want to create a PR from your branch? I like the idea of having some different configurable options while we are experimenting with this
Sure. I'll clean up the code and submit the PR tomorrow.
Closing in favor of https://github.com/apache/datafusion-comet/pull/1021
Which issue does this PR close?
Closes https://github.com/apache/datafusion-comet/issues/996
Possibly depends on https://github.com/apache/datafusion-comet/pull/988 being resolved first
Rationale for this change
This is an alternative to the approach in https://github.com/apache/datafusion-comet/pull/1002
What changes are included in this PR?
How are these changes tested?