apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/

[VL] fallback sort spill #6527

Closed FelixYBW closed 3 months ago

FelixYBW commented 4 months ago

Backend

VL (Velox)

Bug description

The query plan:

Execute InsertIntoHadoopFsRelationCommand (21)
+- FakeRowAdaptor (20)
   +- AdaptiveSparkPlan (19)
      +- == Current Plan ==
         ObjectHashAggregate (13)
         +- ShuffleQueryStage (12)
            +- ColumnarExchange (11)
               +- ^ ProjectExecTransformer (9)
                  +- ^ InputIteratorTransformer (8)
                     +- ^ InputAdapter (7)
                        +- ^ RowToVeloxColumnar (6)
                           +- ^ ObjectHashAggregate (5)
                              +- ^ VeloxColumnarToRowExec (4)
                                 +- ^ ProjectExecTransformer (2)
                                    +- ^ Scan parquet default.api_experiment_request_joins_v1 (1)

Looks like there is an external sort in the ObjectHashAggregate operator.

Without PR6480, it reports an OOM error:

org.apache.gluten.exception.GlutenException: java.lang.RuntimeException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: Operator::getOutput failed for [operator: TableScan, plan node ID: 0]: Error during calling Java code from native code: org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget$OutOfMemoryException: Not enough spark off-heap execution memory. Acquired: 8.0 MiB, granted: 0.0 B. Try tweaking config option spark.memory.offHeap.size to get larger space to run this application. 
Current config settings: 
    spark.gluten.memory.offHeap.size.in.bytes=40.0 GiB
    spark.gluten.memory.task.offHeap.size.in.bytes=10.0 GiB
    spark.gluten.memory.conservative.task.offHeap.size.in.bytes=5.0 GiB
Memory consumer stats: 
    Task.1744:                                                                     Current used bytes:   10.0 GiB, peak bytes:        N/A
    +- org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@265f7d0c: Current used bytes:    9.8 GiB, peak bytes:        N/A
    +- Gluten.Tree.4:                                                              Current used bytes:  192.0 MiB, peak bytes:  260.0 MiB
    |  \- root.4:                                                                  Current used bytes:  192.0 MiB, peak bytes:  260.0 MiB
    |     +- WholeStageIterator.4:                                                 Current used bytes:  184.0 MiB, peak bytes:  192.0 MiB
    |     |  \- single:                                                            Current used bytes:  184.0 MiB, peak bytes:  192.0 MiB
    |     |     +- task.Gluten_Stage_0_TID_1744:                                   Current used bytes:  184.0 MiB, peak bytes:  192.0 MiB
    |     |     |  +- node.0:                                                      Current used bytes:  184.0 MiB, peak bytes:  192.0 MiB
    |     |     |  |  +- op.0.0.0.TableScan:                                       Current used bytes:  184.0 MiB, peak bytes:  184.2 MiB
    |     |     |  |  \- op.0.0.0.TableScan.test-hive:                             Current used bytes:      0.0 B, peak bytes:      0.0 B
    |     |     |  \- node.1:                                                      Current used bytes:      0.0 B, peak bytes:      0.0 B
    |     |     |     \- op.1.0.0.FilterProject:                                   Current used bytes:      0.0 B, peak bytes:      0.0 B
    |     |     \- WholeStageIterator_default_leaf:                                Current used bytes:      0.0 B, peak bytes:      0.0 B
    |     +- ColumnarToRow.4:                                                      Current used bytes:    8.0 MiB, peak bytes:    8.0 MiB
    |     |  \- single:                                                            Current used bytes: 1024.0 KiB, peak bytes: 1024.0 KiB
    |     |     \- ColumnarToRow_default_leaf:                                     Current used bytes:  512.0 KiB, peak bytes:  512.0 KiB
    |     +- OverAcquire.DummyTarget.8:                                            Current used bytes:      0.0 B, peak bytes:   57.6 MiB
    |     \- OverAcquire.DummyTarget.9:                                            Current used bytes:      0.0 B, peak bytes:    2.4 MiB
    \- org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@281f1779: Current used bytes:      0.0 B, peak bytes:        N/A
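
For reference, the pool named in this error is Spark's standard off-heap execution memory (the Gluten per-task budgets shown in the log are derived from it). Below is a minimal sketch of how that pool is sized at session creation; the values are purely illustrative and only mirror the 40 GiB reported above:

import org.apache.spark.sql.SparkSession;

public class OffHeapSizingSketch {
  public static void main(String[] args) {
    // Illustrative only: the off-heap execution pool referenced by the error message
    // is controlled by the standard Spark off-heap settings below.
    SparkSession spark = SparkSession.builder()
        .appName("offheap-sizing-sketch")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "40g") // example value, matches the log above
        .getOrCreate();
    spark.stop();
  }
}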

With PR6480, the error becomes:

4/07/20 04:21:35 ERROR [Thread-18] listener.ManagedReservationListener: Error reserving memory from target
java.lang.IllegalStateException: Number of records written exceeded numRecordsToWrite = 31956470
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:118)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:623)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.forceSpill(UnsafeExternalSorter.java:234)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:176)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:125)
    at org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:127)
    at org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumer.borrow(TreeMemoryConsumer.java:65)
    at org.apache.gluten.memory.memtarget.TreeMemoryTargets$Node.borrow0(TreeMemoryTargets.java:126)
    at org.apache.gluten.memory.memtarget.TreeMemoryTargets$Node.borrow(TreeMemoryTargets.java:118)
    at org.apache.gluten.memory.memtarget.TreeMemoryTargets$Node.borrow0(TreeMemoryTargets.java:126)
    at org.apache.gluten.memory.memtarget.TreeMemoryTargets$Node.borrow(TreeMemoryTargets.java:118)
    at org.apache.gluten.memory.memtarget.OverAcquire.borrow(OverAcquire.java:62)
    at org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:35)
    at org.apache.gluten.memory.listener.ManagedReservationListener.reserve(ManagedReservationListener.java:43)

It looks like PR6480 also triggered the spill in the external sorter, but the spill hits an error.

@jinchengchenghh @zhztheplayer

Spark version

Spark-3.2.x

Spark configurations

No response

System information

Velox System Info v0.0.2
Commit: 34dbec25d204fcb302893429350d37081feb5edf
CMake Version: 3.29.4
System: Linux-5.4.0-1063-aws
Arch: x86_64
CPU Name: Model name: Intel(R) Xeon(R) Platinum 8488C
C++ Compiler: /usr/bin/c++
C++ Compiler Version: 9.4.0
C Compiler: /usr/bin/cc
C Compiler Version: 9.4.0
CMake Prefix Path: /usr/local;/usr;/;/usr;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

No response

jinchengchenghh commented 4 months ago

Can PR6480 fix some spill issues, or did only the error change?

Yohahaha commented 4 months ago

Seems the same as https://github.com/apache/incubator-gluten/issues/4275

jinchengchenghh commented 4 months ago

It failed here:

final UnsafeSorterSpillWriter spillWriter =
    new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
        inMemSorter.numRecords());
spillWriters.add(spillWriter);
spillIterator(inMemSorter.getSortedIterator(), spillWriter);

numRecords is inMemSorter.numRecords() (pos / 2), and numRecordsToWrite is the length of inMemSorter.getSortedIterator(); that iterator is built in UnsafeInMemorySorter.getSortedIterator().

With null values:

queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
UnsafeExternalSorter.ChainedIterator(queue)

Without null values:

return new SortedIterator(pos / 2, offset)

Both nullBoundaryPos and pos are even, so the sorted iterator covers nullBoundaryPos / 2 + (pos - nullBoundaryPos) / 2 = pos / 2 records, which equals inMemSorter.numRecords().

So it should always spill all the records in inMemSorter; I don't know why the error is thrown. Did the customer change the Spark logic?
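
For reference, the exception in the stack trace above is thrown by a counter guard in UnsafeSorterSpillWriter.write(). Below is a minimal, self-contained paraphrase of that guard, not the real Spark class; field names follow Spark 3.2:

// Simplified paraphrase of the guard that throws in UnsafeSorterSpillWriter.write();
// the real writer also serializes the record bytes into the spill file.
final class SpillWriterGuardSketch {
  private final int numRecordsToWrite; // set from inMemSorter.numRecords() by the caller shown above
  private int numRecordsSpilled = 0;

  SpillWriterGuardSketch(int numRecordsToWrite) {
    this.numRecordsToWrite = numRecordsToWrite;
  }

  void write(/* record bytes omitted in this sketch */) {
    if (numRecordsSpilled == numRecordsToWrite) {
      // The IllegalStateException seen in the log: the sorted iterator handed to
      // spillIterator() yielded more records than the writer was told to expect.
      throw new IllegalStateException(
          "Number of records written exceeded numRecordsToWrite = " + numRecordsToWrite);
    }
    numRecordsSpilled++;
  }
}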

FelixYBW commented 4 months ago

Can PR6480 fix some spill issues, or did only the error change?

PR6480 does solve the parquet write spill issues. In this query, the external sort isn't invoked by the parquet writer; it looks like it is invoked by the ObjectHashAggregate. The error message differs with and without PR6480.

FelixYBW commented 3 months ago

#6683

boneanxs commented 3 weeks ago

If not using PR6480, is it possible to set spark.shuffle.spill.numElementsForceSpillThreshold to force a spill and bypass this issue?

jinchengchenghh commented 3 weeks ago

Yes, this is a common solution. For a specific query, you can change this config to a reasonable value to bypass the issue. @boneanxs
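
A minimal sketch of setting that threshold when building the session; the value below is purely illustrative and should be tuned per workload:

import org.apache.spark.sql.SparkSession;

public class ForceSpillThresholdSketch {
  public static void main(String[] args) {
    // Force the sorter to spill after this many in-memory records,
    // instead of waiting until a memory acquisition fails. 5,000,000 is only an example.
    SparkSession spark = SparkSession.builder()
        .appName("force-spill-threshold-sketch")
        .config("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000")
        .getOrCreate();
    spark.stop();
  }
}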

boneanxs commented 3 weeks ago

But why doesn't pure Spark have such a memory issue? Since two different ExternalSorters couldn't spill each other before this PR in Spark, it should be the same as the Gluten sorter, which can't make another Spark sorter spill.

Sorry, I must be missing something here; could you please elaborate? @FelixYBW @jinchengchenghh
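
For background on the cross-consumer spilling being discussed: in Spark's cooperative memory model, every MemoryConsumer registered with a task's TaskMemoryManager can be asked to spill when another consumer of the same task requests memory; that is the acquireExecutionMemory -> forceSpill path visible in the stack trace above, with Gluten's TreeMemoryConsumer requesting and Spark's UnsafeExternalSorter being spilled. A minimal sketch of that contract, using a hypothetical consumer class:

import java.io.IOException;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;

// Hypothetical consumer used only to illustrate the contract; the real consumers in this
// issue (Gluten's TreeMemoryConsumer, Spark's UnsafeExternalSorter) follow the same API.
final class SketchConsumer extends MemoryConsumer {
  private long held = 0L;

  SketchConsumer(TaskMemoryManager tmm) {
    super(tmm, tmm.pageSizeBytes(), MemoryMode.OFF_HEAP);
  }

  void reserve(long bytes) {
    // May cause TaskMemoryManager to call spill() on *other* consumers of the same task
    // if the execution pool cannot satisfy the request.
    held += acquireMemory(bytes);
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    // Called by TaskMemoryManager on behalf of another consumer ("trigger") that needs memory.
    long released = Math.min(size, held);
    held -= released;
    freeMemory(released);
    return released;
  }
}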

FelixYBW commented 3 weeks ago

But why doesn't pure Spark have such a memory issue? Since two different ExternalSorters couldn't spill each other before this PR in Spark, it should be the same as the Gluten sorter, which can't make another Spark sorter spill.

Sorry, I must be missing something here; could you please elaborate? @FelixYBW @jinchengchenghh

#6683 is the summary.