apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile to avoid OOM #7861

Open ccat3z opened 2 weeks ago

ccat3z commented 2 weeks ago

What changes were proposed in this pull request?

This PR fixes https://github.com/apache/incubator-gluten/issues/7860 by adding MmapFileStream, which extends arrow::io::InputStream. MmapFileStream invokes madvise(MADV_DONTNEED) to release the previously read memory when reading the next range of data.
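
For illustration, here is a minimal, self-contained sketch of the release pattern such a stream can rely on (a hypothetical example, not the actual MmapFileStream code; the function name, chunk size, and omitted error handling are simplified assumptions):

#include <algorithm>
#include <cstdint>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Sketch only: read a file sequentially through mmap and drop the pages that
// have already been consumed with madvise(MADV_DONTNEED), so resident memory
// stays bounded even when the spill file is much larger than available memory.
// Error handling is omitted for brevity.
void consumeSequentially(const char* path, void (*consume)(const uint8_t*, size_t)) {
  int fd = open(path, O_RDONLY);
  struct stat st;
  fstat(fd, &st);
  size_t size = static_cast<size_t>(st.st_size);

  auto* data = static_cast<uint8_t*>(mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd, 0));
  const size_t pageSize = static_cast<size_t>(sysconf(_SC_PAGESIZE));
  const size_t chunk = 8 * 1024 * 1024;  // read in 8 MiB ranges (arbitrary choice)
  size_t pos = 0;       // next byte to read
  size_t retained = 0;  // first byte whose pages have not been released yet

  while (pos < size) {
    size_t len = std::min(chunk, size - pos);
    consume(data + pos, len);
    pos += len;

    // Release only whole pages that lie fully behind the read position.
    size_t purge = (pos - retained) & ~(pageSize - 1);
    if (purge > 0) {
      madvise(data + retained, purge, MADV_DONTNEED);  // drop already-read pages
      retained += purge;
    }
  }
  munmap(data, size);
  close(fd);
}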

How was this patch tested?

// Generate 10 partitions (via `parts`); each partition holds about 10 GB of random data.
def gen(scale: Int, parts: Int) = {
  sc.parallelize(1 to (1024 * 1024), numSlices = 1000)
    .map(x => (x % 1000, randStr(scale * parts))) // randStr: random-string helper (defined elsewhere)
    .repartition(parts)
    .toDF("a", "b")
    .save./* ... */
}

// Trigger shuffle spill by `repartition(50)`.
def test(parts: Int = 50) = {
  spark.read./* ... */.repartition(parts)
    .filter(expr("a < 0*rand()")) // rand() makes the filter non-deterministic, so it is not pushed below the repartition
}
# Executor Memory Config
spark.executor.memory=512M
spark.yarn.executor.memoryOverhead=512M
spark.gluten.memory.offHeap.size.in.bytes=1610612736

Test Result:

impl | avg time to merge spills (s) | avg total spilled size of each task (MB)
read (arrow ReadableFile) | 10.58706836156 | 9935.920098495480
mmap (open required range with MemoryMappedFile) | 6.602059312420000 | 9935.920098495480
madv (this PR) | 6.73993204562 | 9935.920098495480
munmap (replace madvise with munmap in this PR) | 6.55791399852 | 9935.920098495480

The munmap patch used in the test above:

diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc
index 1ceb777f1..742c53c90 100644
--- a/cpp/core/shuffle/Utils.cc
+++ b/cpp/core/shuffle/Utils.cc
@@ -243,9 +243,9 @@ void MmapFileStream::advance(int64_t length) {

   auto purgeLength = (pos_ - posRetain_) & pageMask;
   if (purgeLength > 0) {
-    int ret = madvise(data_ + posRetain_, purgeLength, MADV_DONTNEED);
+    int ret = munmap(data_ + posRetain_, purgeLength);
     if (ret != 0) {
-      LOG(WARNING) << "fadvise failed " << ::arrow::internal::ErrnoMessage(errno);
+      LOG(WARNING) << "munmap failed " << ::arrow::internal::ErrnoMessage(errno);
     }
     posRetain_ += purgeLength;
   }
@@ -269,7 +269,7 @@ void MmapFileStream::willNeed(int64_t length) {

 arrow::Status MmapFileStream::Close() {
   if (data_ != nullptr) {
-    int result = munmap(data_, size_);
+    int result = munmap(data_ + posRetain_, size_ - posRetain_);
     if (result != 0) {
       LOG(WARNING) << "munmap failed";
     }
github-actions[bot] commented 2 weeks ago

https://github.com/apache/incubator-gluten/issues/7860

ccat3z commented 2 weeks ago

cc @kecookier

kecookier commented 2 weeks ago

/Benchmark Velox

ccat3z commented 2 weeks ago

/Benchmark Velox

ccat3z commented 2 weeks ago

/Benchmark Velox

marin-ma commented 2 weeks ago

I am triggering a benchmark manually.

@zhztheplayer There's no shuffle spill on Jenkins. The change won't be tested.

zhztheplayer commented 2 weeks ago

I am triggering a benchmark manually.

@zhztheplayer There's no shuffle spill on Jenkins. The change won't be tested.

I thought we always rely on Spark-controlled spill in shuffle. Does the Jenkins CI always have enough memory for all the shuffle data?

GlutenPerfBot commented 2 weeks ago

===== Performance report for TPCDS SF2000 with Velox backend, for reference only ====

query log/native_7861_time.csv log/native_master_11_10_2024_be0435a86b_time.csv difference percentage
q1 14.77 14.21 -0.565 96.18%
q2 15.62 15.64 0.019 100.12%
q3 5.30 5.06 -0.234 95.59%
q4 71.79 72.00 0.204 100.28%
q5 10.48 10.67 0.189 101.80%
q6 4.20 3.50 -0.696 83.42%
q7 6.73 8.80 2.070 130.74%
q8 5.60 5.63 0.027 100.47%
q9 26.57 26.21 -0.359 98.65%
q10 8.44 7.68 -0.767 90.91%
q11 37.60 38.28 0.682 101.82%
q12 1.47 1.36 -0.118 92.02%
q13 6.39 7.88 1.491 123.33%
q14a 50.18 50.16 -0.024 99.95%
q14b 41.76 43.28 1.525 103.65%
q15 2.54 2.54 -0.009 99.67%
q16 49.40 49.03 -0.363 99.27%
q17 4.75 5.10 0.351 107.38%
q18 6.88 6.78 -0.106 98.46%
q19 2.10 2.48 0.380 118.13%
q20 1.46 1.37 -0.091 93.79%
q21 2.46 1.13 -1.323 46.10%
q22 7.96 8.12 0.163 102.04%
q23a 109.32 105.75 -3.564 96.74%
q23b 128.14 128.98 0.835 100.65%
q24a 105.00 105.78 0.787 100.75%
q24b 106.35 104.64 -1.707 98.39%
q25 4.97 4.02 -0.954 80.81%
q26 4.27 3.67 -0.598 86.00%
q27 5.01 4.37 -0.639 87.26%
q28 33.28 33.19 -0.094 99.72%
q29 10.63 10.65 0.022 100.20%
q30 4.88 4.67 -0.205 95.79%
q31 6.83 7.16 0.325 104.76%
q32 1.18 1.25 0.065 105.50%
q33 4.28 4.54 0.258 106.03%
q34 3.99 3.91 -0.086 97.85%
q35 8.21 9.88 1.674 120.39%
q36 5.99 6.49 0.506 108.44%
q37 4.95 5.19 0.238 104.80%
q38 13.58 13.80 0.222 101.64%
q39a 3.21 3.14 -0.063 98.03%
q39b 2.92 2.83 -0.096 96.72%
q40 4.28 4.04 -0.249 94.18%
q41 0.65 0.60 -0.048 92.66%
q42 1.90 0.99 -0.905 52.27%
q43 4.72 4.69 -0.025 99.47%
q44 9.78 10.44 0.664 106.80%
q45 3.37 3.33 -0.035 98.95%
q46 3.91 4.03 0.118 103.02%
q47 18.39 18.63 0.238 101.29%
q48 5.14 5.01 -0.134 97.39%
q49 8.33 8.18 -0.156 98.12%
q50 24.22 22.32 -1.900 92.15%
q51 9.85 10.59 0.742 107.53%
q52 1.08 1.14 0.055 105.09%
q53 2.23 2.50 0.271 112.13%
q54 3.83 3.94 0.118 103.09%
q55 1.05 1.12 0.068 106.53%
q56 4.22 4.04 -0.182 95.70%
q57 10.52 10.42 -0.102 99.03%
q58 2.54 2.51 -0.026 98.96%
q59 12.34 11.16 -1.175 90.48%
q60 4.23 4.09 -0.137 96.76%
q61 4.24 4.07 -0.170 95.99%
q62 4.66 4.70 0.040 100.87%
q63 2.43 3.48 1.049 143.18%
q64 61.20 61.17 -0.025 99.96%
q65 17.35 17.01 -0.339 98.05%
q66 4.36 4.28 -0.076 98.27%
q67 428.02 432.65 4.632 101.08%
q68 3.69 3.87 0.173 104.69%
q69 7.63 7.34 -0.284 96.27%
q70 11.76 11.19 -0.571 95.14%
q71 2.46 2.39 -0.074 97.01%
q72 223.01 210.80 -12.210 94.52%
q73 2.52 2.31 -0.213 91.55%
q74 24.32 24.25 -0.066 99.73%
q75 26.47 26.41 -0.059 99.78%
q76 14.61 13.50 -1.114 92.38%
q77 2.10 2.20 0.103 104.89%
q78 50.43 49.87 -0.560 98.89%
q79 3.78 4.03 0.244 106.45%
q80 11.40 11.35 -0.054 99.52%
q81 4.52 4.68 0.160 103.53%
q82 6.94 6.86 -0.080 98.84%
q83 1.57 1.65 0.086 105.46%
q84 2.72 2.93 0.205 107.53%
q85 6.49 6.67 0.178 102.75%
q86 4.09 4.36 0.269 106.57%
q87 14.13 13.57 -0.553 96.09%
q88 18.54 18.35 -0.188 98.99%
q89 3.49 3.07 -0.417 88.03%
q90 3.51 2.91 -0.597 82.99%
q91 1.94 2.00 0.061 103.16%
q92 1.24 1.20 -0.040 96.76%
q93 34.22 40.59 6.366 118.60%
q94 26.25 27.01 0.761 102.90%
q9 90.33 92.59 2.260 102.50%
q5 2.35 2.72 0.370 115.72%
q96 18.00 18.04 0.038 100.21%
q97 1.79 1.99 0.202 111.26%
q98 11.94 10.40 -1.544 87.07%
q99 11.94 10.40 -1.544 87.07%
total 2248.54 2243.07 -5.465 99.76%
FelixYBW commented 1 week ago

@zhztheplayer There's no shuffle spill on Jenkins. The change won't be tested.

Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort or the shuffle be triggered?

FelixYBW commented 1 week ago

@zhztheplayer @marin-ma Can we create a query and config to test it?

ccat3z commented 1 week ago

@FelixYBW @zhztheplayer I added MmapFileStream in this PR. MmapFileStream invokes MADV_DONTNEED to release previously read memory when reading the next range of data. The test approach and results have been updated in the PR description.

FelixYBW commented 1 week ago

Thank you. Looks like a good solution!

zhztheplayer commented 6 days ago

Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort or the shuffle be triggered?

So far, spill is triggered on whichever components hold more memory, no matter whether it's a Velox operator or shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.

FelixYBW commented 6 days ago

Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort or the shuffle be triggered?

So far, spill is triggered on whichever components hold more memory, no matter whether it's a Velox operator or shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.

So now, once spill is called, every operator's spill is triggered, right?

zhztheplayer commented 6 days ago

Is it because the spill will be triggered on other operators in the pipeline, like a sort + shuffle? Will the sort or the shuffle be triggered?

So far, spill is triggered on whichever components hold more memory, no matter whether it's a Velox operator or shuffle. We have a basic priority setting in the Spiller API, and in the future we can extend it to implement a fixed spill order.

So now, once spill is called, every operator's spill is triggered, right?

We pass a target spill size to the Velox API, so the spill call usually stops once enough memory has been reclaimed. As a result, some of the operators can be skipped in the procedure.
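
Conceptually, the idea is something like the following hypothetical sketch (made-up types and names for illustration only, not the actual Gluten Spiller or Velox API):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical interface for anything that can spill (an operator, a shuffle writer, ...).
struct Spillable {
  int priority;                                   // lower value = spill earlier (assumed convention)
  int64_t (*spill)(Spillable&, int64_t target);   // returns bytes actually reclaimed
};

// Reclaim at least `target` bytes: visit components in priority order and stop
// as soon as enough memory has been freed, so later components are skipped.
int64_t reclaim(std::vector<Spillable>& components, int64_t target) {
  std::sort(components.begin(), components.end(),
            [](const Spillable& a, const Spillable& b) { return a.priority < b.priority; });
  int64_t reclaimed = 0;
  for (auto& c : components) {
    if (reclaimed >= target) break;               // enough space freed; skip the rest
    reclaimed += c.spill(c, target - reclaimed);
  }
  return reclaimed;
}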

FelixYBW commented 5 days ago

We pass a target spill size to the Velox API, so the spill call usually stops once enough memory has been reclaimed. As a result, some of the operators can be skipped in the procedure.

Will it still call the shuffle writer's spill anyway?

FelixYBW commented 2 days ago

Can you resolve the conflict?

ccat3z commented 1 day ago

Can you resolve the conflict?

Rebased onto the latest main.

zhztheplayer commented 1 day ago

We pass a target spill size to the Velox API, so the spill call usually stops once enough memory has been reclaimed. As a result, some of the operators can be skipped in the procedure.

Will it still call the shuffle writer's spill anyway?

Yes, exactly.