oap-project / gazelle_plugin

Native SQL Engine plugin for Spark SQL with vectorized SIMD optimizations.
Apache License 2.0

Running q14a.sql or q14b.sql of TPC-DS hangs with the setting "spark.sql.inMemoryColumnarStorage.batchSize=32768" #560

Open haojinIntel opened 3 years ago

haojinIntel commented 3 years ago

We use the following batch-size confs:

spark.sql.inMemoryColumnarStorage.batchSize       32768
spark.sql.execution.arrow.maxRecordsPerBatch      32768
spark.sql.parquet.columnarReaderBatchSize         32768
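
For context, a minimal sketch of how these confs can be applied when building the session (the app name and everything else besides the three properties above are placeholders, not the actual benchmark harness):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: the three batch-size confs from the report above, applied
// at session build time. appName and any other settings are placeholders.
val spark = SparkSession.builder()
  .appName("tpcds-q14-batchsize-32768")
  .config("spark.sql.inMemoryColumnarStorage.batchSize", "32768")
  .config("spark.sql.execution.arrow.maxRecordsPerBatch", "32768")
  .config("spark.sql.parquet.columnarReaderBatchSize", "32768")
  .getOrCreate()
```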

When running q14a.sql or q14b.sql, the test hangs on native spill; the stuck task's stack trace is:

com.intel.oap.vectorized.ExpressionEvaluatorJniWrapper.nativeSpill(Native Method)
com.intel.oap.vectorized.ExpressionEvaluator$NativeSpiller.spill(ExpressionEvaluator.java:299)
com.intel.oap.spark.sql.execution.datasources.v2.arrow.NativeSQLMemoryConsumer.spill(NativeSQLMemoryConsumer.java:38)
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:177) => holding Monitor(org.apache.spark.memory.TaskMemoryManager@733732020})
org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:136)
com.intel.oap.spark.sql.execution.datasources.v2.arrow.NativeSQLMemoryConsumer.acquire(NativeSQLMemoryConsumer.java:46)
com.intel.oap.spark.sql.execution.datasources.v2.arrow.SparkManagedReservationListener.reserve(SparkManagedReservationListener.java:42) => holding Monitor(com.intel.oap.spark.sql.execution.datasources.v2.arrow.SparkManagedReservationListener@1231763344})
com.intel.oap.vectorized.ExpressionEvaluatorJniWrapper.nativeSpill(Native Method)
com.intel.oap.vectorized.ExpressionEvaluator$NativeSpiller.spill(ExpressionEvaluator.java:299)
com.intel.oap.spark.sql.execution.datasources.v2.arrow.NativeSQLMemoryConsumer.spill(NativeSQLMemoryConsumer.java:38)
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:177) => holding Monitor(org.apache.spark.memory.TaskMemoryManager@733732020})
org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:136)
com.intel.oap.spark.sql.execution.datasources.v2.arrow.NativeSQLMemoryConsumer.acquire(NativeSQLMemoryConsumer.java:46)
com.intel.oap.spark.sql.execution.datasources.v2.arrow.SparkManagedReservationListener.reserve(SparkManagedReservationListener.java:42) => holding Monitor(com.intel.oap.spark.sql.execution.datasources.v2.arrow.SparkManagedReservationListener@1231763344})
com.intel.oap.vectorized.BatchIterator.nativeNext(Native Method)
com.intel.oap.vectorized.BatchIterator.next(BatchIterator.java:75)
com.intel.oap.expression.ColumnarSorter$$anon$1.next(ColumnarSorter.scala:166)
com.intel.oap.expression.ColumnarSorter$$anon$1.next(ColumnarSorter.scala:129)
com.intel.oap.vectorized.CloseableColumnBatchIterator.next(CloseableColumnBatchIterator.scala:52)
com.intel.oap.vectorized.CloseableColumnBatchIterator.next(CloseableColumnBatchIterator.scala:29)
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:32)
com.intel.oap.execution.ColumnarNativeIterator.hasNext(ColumnarNativeIterator.java:39)
com.intel.oap.vectorized.BatchIterator.nativeNext(Native Method)
com.intel.oap.vectorized.BatchIterator.next(BatchIterator.java:75)
com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$1.next(ColumnarWholeStageCodegenExec.scala:485)
com.intel.oap.execution.ColumnarWholeStageCodegenExec$$anon$1.next(ColumnarWholeStageCodegenExec.scala:476)
com.intel.oap.vectorized.CloseableColumnBatchIterator.next(CloseableColumnBatchIterator.scala:52)
com.intel.oap.vectorized.CloseableColumnBatchIterator.next(CloseableColumnBatchIterator.scala:29)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:117)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$707/498585241.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
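
The trace shows the recursion directly: nativeSpill reserves memory through SparkManagedReservationListener, which goes back into TaskMemoryManager.acquireExecutionMemory, which triggers another nativeSpill on the same consumer while the same monitors are held. Below is a minimal, self-contained toy model of that shape (hypothetical classes, not the plugin's or Spark's real API) showing why the loop cannot make progress once the pool is exhausted:

```scala
// Toy model of the pattern in the trace above: acquire -> spill -> acquire -> ...
// These classes are illustrative only, not TaskMemoryManager / NativeSQLMemoryConsumer.
object RecursiveSpillSketch {

  // Plays the role of acquireExecutionMemory: when the pool is exhausted,
  // it asks the consumer to spill before granting memory.
  final class ToyMemoryManager(private var free: Long) {
    def acquire(bytes: Long, consumer: ToyConsumer): Long = synchronized {
      if (free < bytes) consumer.spill()          // try to reclaim before granting
      val granted = math.min(bytes, free)
      free -= granted
      granted
    }
  }

  // Plays the role of the native spiller: spilling needs its own scratch
  // buffers, so it goes back through acquire() on the same manager.
  final class ToyConsumer(mm: ToyMemoryManager) {
    private var depth = 0
    def spill(): Unit = {
      depth += 1
      if (depth > 5) {                            // cap so this sketch terminates
        println(s"spill re-entered $depth times; without a cap this never returns")
        return
      }
      mm.acquire(1024L, this)                     // re-enters acquire -> spill -> ...
    }
  }

  def main(args: Array[String]): Unit = {
    val mm = new ToyMemoryManager(free = 0L)      // pool already exhausted
    mm.acquire(4096L, new ToyConsumer(mm))        // in the real system: hangs here
  }
}
```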
zhouyuan commented 3 years ago

@haojinIntel thanks for testing. This looks like it was introduced by recursive spilling; I will need some time to reproduce it in my env.
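
For illustration only, one generic way to break such a recursion is a re-entrancy guard, so a spill requested from inside the spill path returns immediately instead of going back into memory acquisition. This is a hedged sketch of the pattern, not the plugin's actual fix:

```scala
// Hedged sketch: a generic re-entrancy guard, NOT the gazelle_plugin patch.
// A spill triggered while a spill is already running on this consumer
// returns 0 bytes instead of recursing back into acquire().
final class NonReentrantSpiller(doSpill: () => Long) {
  private var spilling = false   // real code would scope this per task/thread
  def spill(): Long = {
    if (spilling) return 0L      // already spilling: do not recurse
    spilling = true
    try doSpill()
    finally spilling = false
  }
}

// Usage: the callback releases buffers and returns the bytes freed; any
// scratch memory it needs should come from a reserved overhead pool rather
// than the same exhausted execution pool.
val spiller = new NonReentrantSpiller(() => 0L)
```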