apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] Optimize shuffle writer memory usage #3444

Open Yohahaha opened 11 months ago

Yohahaha commented 11 months ago

Description

A simple task with scan + filter + shuffle writer has a peak memory usage of 45MB. Treating scan and filter as the computation part and the shuffle writer as the shuffle part, detailed logging shows that the computation part consumes 25MB and the shuffle part consumes 20MB.

I have not started working on this yet, but I think the shuffle part's memory usage could be optimized.

I set spark.gluten.memory.reservationBlockSize=1MB to keep memory reservations as small as possible.

Yohahaha commented 11 months ago

I1019 16:22:44.724664 4060396 WholeStageResultIterator.cc:216] -- Project[expressions: (n2_3:INTEGER, hash_with_seed(42,"n1_4")), (n2_4:BIGINT, "n1_4"), (n2_5:DECIMAL(12, 2), "n1_5"), (n2_6:DECIMAL(12, 2), "n1_6")] -> n2_3:INTEGER, n2_4:BIGINT, n2_5:DECIMAL(12, 2), n2_6:DECIMAL(12, 2)
   Output: 492733 rows (71.34MB, 489 batches), Cpu time: 8.56ms, Blocked wall time: 0ns, Peak memory: 48.00KB, Memory allocations: 1958, Threads: 1
      queuedWallNanos              sum: 256.00us, count: 1, min: 256.00us, max: 256.00us
      runningAddInputWallNanos     sum: 513.76us, count: 1, min: 513.76us, max: 513.76us
      runningFinishWallNanos       sum: 0ns, count: 1, min: 0ns, max: 0ns
      runningGetOutputWallNanos    sum: 7.70ms, count: 1, min: 7.70ms, max: 7.70ms
  -- Project[expressions: (n1_4:BIGINT, "n0_0"), (n1_5:DECIMAL(12, 2), "n0_1"), (n1_6:DECIMAL(12, 2), "n0_2")] -> n1_4:BIGINT, n1_5:DECIMAL(12, 2), n1_6:DECIMAL(12, 2)
     Output: 492733 rows (68.53MB, 489 batches), Cpu time: 1.93ms, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1
        runningAddInputWallNanos     sum: 218.79us, count: 1, min: 218.79us, max: 218.79us
        runningFinishWallNanos       sum: 191ns, count: 1, min: 191ns, max: 191ns
        runningGetOutputWallNanos    sum: 1.35ms, count: 1, min: 1.35ms, max: 1.35ms
    -- TableScan[table: hive_table, range filters: [(l_orderkey, Filter(IsNotNull, deterministic, null not allowed)), (l_returnflag, BytesRange: [R, R] no nulls)]] -> n0_0:BIGINT, n0_1:DECIMAL(12, 2), n0_2:DECIMAL(12, 2), n0_3:VARCHAR
       Input: 492733 rows (88.58MB, 489 batches), Raw Input: 2000000 rows (19.43MB), Output: 492733 rows (88.58MB, 489 batches), Cpu time: 116.55ms, Blocked wall time: 0ns, Peak memory: 22.70MB, Memory allocations: 4904, Threads: 1, Splits: 1
          dataSourceWallNanos              sum: 100.33ms, count: 1, min: 100.33ms, max: 100.33ms
          flattenStringDictionaryValues    sum: 0, count: 1, min: 0, max: 0
          ioWaitNanos                      sum: 8.29ms, count: 1, min: 8.29ms, max: 8.29ms
          localReadBytes                   sum: 0B, count: 1, min: 0B, max: 0B
          numLocalRead                     sum: 0, count: 1, min: 0, max: 0
          numPrefetch                      sum: 0, count: 1, min: 0, max: 0
          numRamRead                       sum: 0, count: 1, min: 0, max: 0
          numStorageRead                   sum: 2, count: 1, min: 2, max: 2
          overreadBytes                    sum: 984.72KB, count: 1, min: 984.72KB, max: 984.72KB
          prefetchBytes                    sum: 0B, count: 1, min: 0B, max: 0B
          queryThreadIoLatency             sum: 2, count: 1, min: 2, max: 2
          ramReadBytes                     sum: 0B, count: 1, min: 0B, max: 0B
          runningAddInputWallNanos         sum: 0ns, count: 1, min: 0ns, max: 0ns
          runningFinishWallNanos           sum: 510ns, count: 1, min: 510ns, max: 510ns
          runningGetOutputWallNanos        sum: 125.46ms, count: 1, min: 125.46ms, max: 125.46ms
          skippedSplitBytes                sum: 0B, count: 1, min: 0B, max: 0B
          skippedSplits                    sum: 0, count: 1, min: 0, max: 0
          skippedStrides                   sum: 0, count: 1, min: 0, max: 0
          storageReadBytes                 sum: 18.43MB, count: 1, min: 18.43MB, max: 18.43MB
          totalScanTime                    sum: 14.09ms, count: 1, min: 14.09ms, max: 14.09ms
23/10/19 16:22:44 WARN [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] NativeMemoryManager: ShuffleWriter peek memory usage 20.0 MiB
I1019 16:22:44.780633 4060396 VeloxMemoryManager.h:36] ShuffleWriter peek usage 20786880
23/10/19 16:22:44 WARN [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] NativeMemoryManager: WholeStageIterator peek memory usage 25.0 MiB
I1019 16:22:44.781193 4060396 VeloxMemoryManager.h:36] WholeStageIterator peek usage 26214400
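
For reference, the byte counts in the two VeloxMemoryManager log lines above can be converted to MiB with a few lines of Python. This is only a sketch of the arithmetic: the variable and function names are made up for illustration, and the remark about rounding to 1 MiB blocks is a guess that nothing in this thread confirms.

```python
import math

MIB = 1024 * 1024

# Byte counts copied from the VeloxMemoryManager log lines.
shuffle_writer_bytes = 20786880
whole_stage_bytes = 26214400


def to_mib(n_bytes):
    """Convert a byte count to MiB."""
    return n_bytes / MIB


total_bytes = shuffle_writer_bytes + whole_stage_bytes
shuffle_share = shuffle_writer_bytes / total_bytes

print(f"ShuffleWriter:      {to_mib(shuffle_writer_bytes):.2f} MiB")
print(f"WholeStageIterator: {to_mib(whole_stage_bytes):.2f} MiB")
print(f"Total:              {to_mib(total_bytes):.2f} MiB")
print(f"Shuffle share:      {shuffle_share:.0%}")

# Assumption: the JVM-side "20.0 MiB" may simply be the native figure
# rounded up to whole 1 MiB reservation blocks.
rounded_up_mib = math.ceil(to_mib(shuffle_writer_bytes))
print(f"ShuffleWriter rounded up to whole MiB: {rounded_up_mib}")
```

The two native figures add up to just under 45 MiB, which matches the 45MB peak in the description, and the shuffle writer accounts for a bit less than half of it.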

zhouyuan commented 11 months ago

CC: @marin-ma

marin-ma commented 11 months ago

Could you try lowering the spark.gluten.shuffleWriter.bufferSize? The default value is 4096.

Yohahaha commented 11 months ago

> Could you try lowering the spark.gluten.shuffleWriter.bufferSize? The default value is 4096.

Yes, the shuffle part's peak memory decreased from 20MB to 8MB after lowering this value to 1024.

Could you share more about this config? Will tuning it affect performance?
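
For anyone reproducing this, the two settings mentioned so far could be passed on the command line roughly as follows. This is a minimal sketch: the helper `to_spark_submit_args` is hypothetical, and only the two config keys and their values come from this thread (`--conf` is the standard spark-submit flag for setting a Spark property).

```python
def to_spark_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key in sorted(conf):  # sorted only to make the output deterministic
        args += ["--conf", f"{key}={conf[key]}"]
    return args


# Values taken from this thread; 4096 is the stated default for bufferSize.
gluten_conf = {
    "spark.gluten.shuffleWriter.bufferSize": 1024,
    "spark.gluten.memory.reservationBlockSize": "1MB",
}

print(" ".join(to_spark_submit_args(gluten_conf)))
```

The printed flags can be appended to an existing spark-submit command; the same keys could equally be set in spark-defaults.conf.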

marin-ma commented 10 months ago

@Yohahaha We haven't tested how adjusting this configuration affects performance. For simple SQL queries, you can check the "avg read batch num rows" metric of the ColumnarShuffle operator in the SQL tab of the Spark UI, and lower the value using that metric as a reference. For complex SQL queries, however, the metric can vary across shuffle stages, so a single value is hard to choose, and we suggest keeping the default value of 4096.

By the way, the "Peak memory allocation" here only indicates the virtual memory allocation, not the physical memory actually used.
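
The metric-based approach described above could be sketched like this. Everything here is an illustrative assumption rather than Gluten behavior or an official recommendation: the function name, the `spread_limit` threshold for deciding that stages "vary", and the choice to round up to a power of two are all made up for the example. Only the default of 4096 and the idea of referencing "avg read batch num rows" come from this thread.

```python
import math

DEFAULT_BUFFER_SIZE = 4096  # default value stated in this thread


def suggest_buffer_size(avg_rows_per_stage, spread_limit=2.0):
    """Suggest a bufferSize from per-stage "avg read batch num rows" values.

    Falls back to the default when there is no data or when the stages
    differ by more than spread_limit (a hypothetical threshold).
    """
    if not avg_rows_per_stage:
        return DEFAULT_BUFFER_SIZE
    lo, hi = min(avg_rows_per_stage), max(avg_rows_per_stage)
    if lo <= 0 or hi / lo > spread_limit:
        # Stages vary too much: keep the default, as suggested above.
        return DEFAULT_BUFFER_SIZE
    # Assumption: round the largest observed value up to a power of two.
    size = 1 << (math.ceil(hi) - 1).bit_length()
    return min(size, DEFAULT_BUFFER_SIZE)


# Metric values read manually from the Spark UI (made-up numbers).
print(suggest_buffer_size([1000]))
print(suggest_buffer_size([200, 3000]))
```

With a single shuffle stage the suggestion tracks the observed batch size; with widely differing stages it stays at the default. Any value chosen this way would still need a performance check, since that has not been tested.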