apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[GLUTEN-4263][VL] Compression type 2 not supported #4263

Open rhh777 opened 11 months ago

rhh777 commented 11 months ago

Backend

VL (Velox)

Bug description

When running TPC-DS query 67, a large number of tasks failed. The failure reasons were "Compression type 2 not supported" and OutOfMemoryException.

What compression type is type 2?

I tried two ways to make the query run successfully.

  1. Increase off-heap memory (15g -> 20g); the task then runs successfully.

  2. Change spark.io.compression.codec from the default lz4 to zstd, and change spark.gluten.sql.columnar.backend.velox.orderBySpillMemoryThreshold from the default 0 to 256MB. After these two changes, the task runs successfully and is faster (example flags below).
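
For reference, a minimal sketch of how the second workaround looks when passed on the command line (same flag style as the full command below; the values are simply the ones that worked in this report, not tuned recommendations):

--conf spark.io.compression.codec=zstd \
--conf spark.gluten.sql.columnar.backend.velox.orderBySpillMemoryThreshold=256MB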

Spark version

Spark-3.3.x

Spark configurations

spark-sql \
  --master yarn \
  --deploy-mode client \
  --driver-cores 5 \
  --driver-memory 5G \
  --executor-cores 5 \
  --executor-memory 4G \
  --num-executors 10 \
  --conf spark.executor.memoryOverhead=1g \
  --conf spark.plugins=io.glutenproject.GlutenPlugin \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=15G \
  --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
  --conf spark.executorEnv.JAVA_HOME=/usr/local/jdk1.8.0_382 \
  --conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/local/jdk1.8.0_382 \
  --files /data/hrh/tpcds/hdfs-client.xml \
  --conf spark.sql.autoBroadcastJoinThreshold=10MB

Error log

Caused by: java.lang.RuntimeException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: Operator::addInput failed for [operator: Window, plan node ID: 2]: Error during calling Java code from native code: io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget$OutOfMemoryException: Not enough spark off-heap execution memory. Acquired: 8388608, granted: 0. Try tweaking config option spark.memory.offHeap.size to get larger space to run this application. 
Current config settings: 
    spark.gluten.memory.offHeap.size.in.bytes=15.0 GiB
    spark.gluten.memory.task.offHeap.size.in.bytes=3.0 GiB
    spark.gluten.memory.conservative.task.offHeap.size.in.bytes=1536.0 MiB
Memory consumer stats: 
    Task.2325:                                      Current used bytes: 8.1 GiB, peak bytes:        N/A
    \- Gluten.Tree.50:                              Current used bytes: 8.1 GiB, peak bytes:   10.5 GiB
       \- root.50:                                  Current used bytes: 8.1 GiB, peak bytes:   10.5 GiB
          +- WholeStageIterator.50:                 Current used bytes: 8.1 GiB, peak bytes:    8.1 GiB
          |  \- single:                             Current used bytes: 8.1 GiB, peak bytes:    8.1 GiB
          |     +- task.Gluten_Stage_9_TID_2325:    Current used bytes: 8.1 GiB, peak bytes:    8.1 GiB
          |     |  +- node.1:                       Current used bytes: 8.1 GiB, peak bytes:    8.1 GiB
          |     |  |  \- op.1.0.0.OrderBy:          Current used bytes: 6.5 GiB, peak bytes:    6.5 GiB
          |     |  +- node.2:                       Current used bytes: 7.0 MiB, peak bytes:    7.0 MiB
          |     |  |  \- op.2.0.0.Window:           Current used bytes: 6.6 MiB, peak bytes:    6.6 MiB
          |     |  +- node.4:                       Current used bytes:   0.0 B, peak bytes:      0.0 B
          |     |  |  \- op.4.0.0.TopN:             Current used bytes:   0.0 B, peak bytes:      0.0 B
          |     |  +- node.0:                       Current used bytes:   0.0 B, peak bytes:      0.0 B
          |     |  |  \- op.0.0.0.ValueStream:      Current used bytes:   0.0 B, peak bytes:      0.0 B
          |     |  \- node.3:                       Current used bytes:   0.0 B, peak bytes:      0.0 B
          |     |     \- op.3.0.0.FilterProject:    Current used bytes:   0.0 B, peak bytes:      0.0 B
          |     \- WholeStageIterator_default_leaf: Current used bytes:   0.0 B, peak bytes:      0.0 B
          +- ArrowContextInstance.49:               Current used bytes: 8.0 MiB, peak bytes:    8.0 MiB
          +- OverAcquire.DummyTarget.150:           Current used bytes:   0.0 B, peak bytes:    2.4 MiB
          +- ShuffleReader.20:                      Current used bytes:   0.0 B, peak bytes:    8.0 MiB
          |  \- single:                             Current used bytes:   0.0 B, peak bytes: 1024.0 KiB
          |     \- ShuffleReader_default_leaf:      Current used bytes:   0.0 B, peak bytes:  480.0 KiB
          \- OverAcquire.DummyTarget.149:           Current used bytes:   0.0 B, peak bytes:    2.4 GiB

    at io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:90)
    at io.glutenproject.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
    at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at io.glutenproject.utils.IteratorCompleter.hasNext(Iterators.scala:69)
    at io.glutenproject.utils.PayloadCloser.hasNext(Iterators.scala:35)
    at io.glutenproject.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:102)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:218)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Retriable: False
Function: runInternal
File: /root/src/oap-project/gluten/ep/build-velox/build/velox_ep/velox/exec/Driver.cpp
Line: 588
Stack trace:
# 0  _ZN8facebook5velox7process10StackTraceC1Ei
# 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKSsEEvRKNS1_18VeloxCheckFailArgsET0_
# 3  _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE.cold
# 4  _ZN8facebook5velox4exec6Driver4nextERSt10shared_ptrINS1_13BlockingStateEE
# 5  _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
# 6  _ZN6gluten24WholeStageResultIterator4nextEv
# 7  Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 8  0x00007fb9d60c5a68

    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
    at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    ... 17 more
23/12/13 15:52:34 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 2338
23/12/13 15:52:34 INFO Executor: Running task 1.1 in stage 9.0 (TID 2338)
23/12/13 15:52:34 INFO ShuffleBlockFetcherIterator: Getting 200 (1288.9 MiB) non-empty blocks including 20 (128.9 MiB) local and 20 (128.9 MiB) host-local and 0 (0.0 B) push-merged-local and 160 (1031.1 MiB) remote blocks
23/12/13 15:52:34 INFO ShuffleBlockFetcherIterator: Started 3 remote fetches in 1 ms
23/12/13 15:53:05 ERROR ManagedReservationListener: Error reserving memory from target
java.lang.RuntimeException: Compression type 2 not supported
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeSpill(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.spill(ColumnarBatchOutIterator.java:83)
    at io.glutenproject.vectorized.NativePlanEvaluator.lambda$createKernelWithBatchIterator$1(NativePlanEvaluator.java:75)
    at io.glutenproject.memory.memtarget.Spillers$WithMinSpillSize.spill(Spillers.java:54)
    at io.glutenproject.memory.memtarget.Spillers.lambda$withOrder$0(Spillers.java:30)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets.spillTree(TreeMemoryTargets.java:66)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets.spillTree(TreeMemoryTargets.java:60)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets.spillTree(TreeMemoryTargets.java:60)
    at io.glutenproject.memory.memtarget.spark.TreeMemoryConsumer.spill(TreeMemoryConsumer.java:119)
    at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:221)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
    at org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:136)
    at io.glutenproject.memory.memtarget.spark.TreeMemoryConsumer.borrow(TreeMemoryConsumer.java:69)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow0(TreeMemoryTargets.java:112)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow(TreeMemoryTargets.java:104)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow0(TreeMemoryTargets.java:112)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow(TreeMemoryTargets.java:104)
    at io.glutenproject.memory.memtarget.OverAcquire.borrow(OverAcquire.java:56)
    at io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:35)
    at io.glutenproject.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
    at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at io.glutenproject.utils.IteratorCompleter.hasNext(Iterators.scala:69)
    at io.glutenproject.utils.PayloadCloser.hasNext(Iterators.scala:35)
    at io.glutenproject.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:102)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:218)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
23/12/13 15:53:05 ERROR TaskResources: Task 2337 failed by error: 
io.glutenproject.exception.GlutenException: java.lang.RuntimeException: Exception: VeloxRuntimeError
zhouyuan commented 11 months ago

@rhh777 Thanks for reporting. The stack is clear: this is a bug in the order-by spill with compression enabled. CC @zhztheplayer

thanks, -yuan

FelixYBW commented 10 months ago

@zhztheplayer Is the issue fixed?

loukey-lj commented 9 months ago

@zhouyuan @zhztheplayer Is the issue fixed?

sagarlakshmipathy commented 8 months ago

I'm seeing this on Q95

24/03/14 22:20:30 ERROR YarnScheduler: Lost executor 7 on ip-10-0-105-199.us-west-2.compute.internal: Container from a bad node: container_1710430554799_0011_01_000013 on host: ip-10-0-105-199.us-west-2.compute.internal. Exit status: 134. Diagnostics: /03/14 22:20:17 INFO MemoryStore: Block broadcast_2446 stored as values in memory (estimated size 15.6 KiB, free 41.1 GiB)
E20240314 22:20:29.005700 32189 HashBuild.cpp:1146] Spill from hash build pool op.20.3.0.HashBuild failed: Compression type 2 not supported
24/03/14 22:20:29 ERROR ManagedReservationListener: Error reserving memory from target
java.lang.RuntimeException: Compression type 2 not supported
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeSpill(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.spill(ColumnarBatchOutIterator.java:83)
    at io.glutenproject.vectorized.NativePlanEvaluator.lambda$createKernelWithBatchIterator$1(NativePlanEvaluator.java:75)
    at io.glutenproject.memory.memtarget.Spillers$WithMinSpillSize.spill(Spillers.java:54)
    at io.glutenproject.memory.memtarget.Spillers.lambda$withOrder$0(Spillers.java:30)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets.spillTree(TreeMemoryTargets.java:66)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets.spillTree(TreeMemoryTargets.java:60)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets.spillTree(TreeMemoryTargets.java:60)
    at io.glutenproject.memory.memtarget.spark.TreeMemoryConsumer.spill(TreeMemoryConsumer.java:119)
    at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:221)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
    at org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:137)
    at io.glutenproject.memory.memtarget.spark.TreeMemoryConsumer.borrow(TreeMemoryConsumer.java:69)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow0(TreeMemoryTargets.java:112)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow(TreeMemoryTargets.java:104)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow0(TreeMemoryTargets.java:112)
    at io.glutenproject.memory.memtarget.TreeMemoryTargets$Node.borrow(TreeMemoryTargets.java:104)
    at io.glutenproject.memory.memtarget.OverAcquire.borrow(OverAcquire.java:56)
    at io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:35)
    at io.glutenproject.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at io.glutenproject.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
    at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at io.glutenproject.utils.IteratorCompleter.hasNext(Iterators.scala:69)
    at io.glutenproject.utils.PayloadCloser.hasNext(Iterators.scala:35)
    at io.glutenproject.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:102)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:218)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

The rest ran fine.

[hadoop@ip-10-0-95-218 ~]$ ./spark-3.4.1-bin-hadoop3/bin/spark-shell --master yarn \
  --deploy-mode client \
  --driver-memory 42g \
  --executor-memory 48g \
  --executor-cores 15 \
  --num-executors 10 \
  --conf spark.plugins=io.glutenproject.GlutenPlugin \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=30g \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
  --jars hudi-benchmarks-0.1-SNAPSHOT.jar \
  --packages org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-2.amazonaws.com \
  --conf spark.hadoop.fs.s3a.use.instance.credentials=true \
  --conf spark.hadoop.fs.s3a.connection.ssl.enabled=true \
  --conf spark.hadoop.fs.s3a.path.style.access=false \
  --conf spark.sql.catalogImplementation=in-memory \
  --conf spark.ui.proxyBase="" \
  --conf 'spark.eventLog.enabled=true' \
  --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps'
zhztheplayer commented 8 months ago

@sagarlakshmipathy @rhh777 @loukey-lj

For compression type 2,

It might be because LZ4 compression support is missing (not 100% sure about it; the message "Compression type 2" is vague).

https://github.com/facebook/folly/blob/main/folly/compression/Compression.h
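
For context, the CodecType enum in that header assigns small integer values to codecs. A paraphrased sketch is below; the values are from memory, so please verify against the folly version you actually build with:

// Paraphrased from folly/compression/Compression.h; verify against your folly version.
enum class CodecType {
  USER_DEFINED = 0,
  NO_COMPRESSION = 1,
  LZ4 = 2,    // "Compression type 2" would correspond to this entry
  SNAPPY = 3,
  ZLIB = 4,
  // ... remaining codecs omitted
};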

Would you like to check whether liblz4 was available on the machine where you compiled Gluten?
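
One quick, Gluten-agnostic way to check (assuming a standard Linux toolchain, where the library is named liblz4) is to ask the loader cache on the build or executor host:

# List lz4 entries in the shared-library cache (standard Linux tooling)
ldconfig -p | grep liblz4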

That said, Q67 has its own known memory issue in Gluten, and @JkSelf is already working on solving it. See our CI test for Q67, which has been failing for some time:

https://github.com/apache/incubator-gluten/blob/58a459bf487120208a774d7959f7c7db417f490b/.github/workflows/velox_be.yml#L573-L584

FelixYBW commented 8 months ago

The root cause is that folly can't find the lz4 library for some reason, even though lz4 is indeed built in vcpkg and can be used by shuffle. You can bypass the issue by switching to snappy or zstd via spark.io.compression.codec.

lz4 is the default codec.

zhztheplayer commented 8 months ago

@rhh777 @loukey-lj @sagarlakshmipathy

If you hit this issue with Gluten's static build, it should be fixed by https://github.com/apache/incubator-gluten/pull/5121.