oap-project / gazelle_plugin

Native SQL Engine plugin for Spark SQL with vectorized SIMD optimizations.
Apache License 2.0

java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector #845

Open Manoj-red-hat opened 2 years ago

Manoj-red-hat commented 2 years ago

Describe the bug

Lost task 4.0 in stage 0.0 (TID 4) (172.30.18.3 executor driver): java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4(ColumnarConditionProjector.scala:236)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4$adapted(ColumnarConditionProjector.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.hasNext(ColumnarConditionProjector.scala:235)
    at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.intel.oap.vectorized.ArrowWritableColumnVector
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4(ColumnarConditionProjector.scala:236)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.$anonfun$hasNext$4$adapted(ColumnarConditionProjector.scala:235)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.intel.oap.expression.ColumnarConditionProjector$$anon$1.hasNext(ColumnarConditionProjector.scala:235)
    at com.intel.oap.vectorized.CloseableColumnBatchIterator.hasNext(CloseableColumnBatchIterator.scala:47)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:96)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

To Reproduce

PATH_TO_JAR=/home/legion/miniconda3/envs/oapenv/oap_jars
${SPARK_HOME}/bin/spark-sql --verbose \
  --driver-memory 10G \
  --conf spark.plugins=com.intel.oap.GazellePlugin \
  --conf spark.driver.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-sql-columnar-shims-common-1.3.1.jar:$PATH_TO_JAR/spark-sql-columnar-shims-spark321-1.3.1.jar \
  --conf spark.executor.extraClassPath=$PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar:$PATH_TO_JAR/spark-sql-columnar-shims-common-1.3.1.jar:$PATH_TO_JAR/spark-sql-columnar-shims-spark321-1.3.1.jar \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
  --conf spark.driver.cores=1 \
  --conf spark.executor.instances=12 \
  --conf spark.executor.cores=6 \
  --conf spark.executor.memory=20G \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=20G \
  --conf spark.task.cpus=1 \
  --conf spark.locality.wait=0s \
  --conf spark.sql.shuffle.partitions=72 \
  --conf spark.executorEnv.ARROW_LIBHDFS3_DIR="$PATH_TO_LIBHDFS3_DIR/" \
  --conf spark.executorEnv.LD_LIBRARY_PATH="$PATH_TO_LIBHDFS3_DEPENDENCIES_DIR" \
  --jars $PATH_TO_JAR/spark-arrow-datasource-standard-1.3.1-jar-with-dependencies.jar,$PATH_TO_JAR/spark-columnar-core-1.3.1-jar-with-dependencies.jar
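For reference, the core Gazelle-related settings from that invocation can also be expressed programmatically. This is only a minimal sketch: the values are copied from the command above, the app name is made up, and the Gazelle jars still need to be on the driver and executor class paths.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch of the key settings used in the spark-sql command above.
    val spark = SparkSession.builder()
      .appName("gazelle-tpch")  // illustrative name
      .config("spark.plugins", "com.intel.oap.GazellePlugin")
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "20G")
      .config("spark.sql.shuffle.partitions", "72")
      .getOrCreate()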

A simple query on 2 GB of TPC-H data:

select sum(l_extendedprice * (1 - l_discount))
from parq.lineitem_2g, parq.orders_2g
where l_orderkey = o_orderkey
  and o_orderdate >= date '1994-01-01'
  and o_orderdate < date '1995-01-01';

Expected behavior

The query should run and return the revenue as output.


Manoj-red-hat commented 2 years ago

When I disable whole-stage code generation (WSCG) with set spark.sql.codegen.wholeStage=false;, the query works fine.
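A minimal Scala sketch of this workaround, assuming a running SparkSession named spark and the tables from the report already registered:

    // Workaround sketch: disabling whole-stage codegen makes Spark insert
    // row-to-columnar transitions, which avoided the cast error in this report.
    spark.conf.set("spark.sql.codegen.wholeStage", "false")

    spark.sql(
      """select sum(l_extendedprice * (1 - l_discount))
        |from parq.lineitem_2g, parq.orders_2g
        |where l_orderkey = o_orderkey
        |  and o_orderdate >= date '1994-01-01'
        |  and o_orderdate < date '1995-01-01'
        |""".stripMargin).show()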

Manoj-red-hat commented 2 years ago

I resolved this issue by using Arrow Data Source.
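A rough sketch of what reading through the Arrow data source can look like, assuming the "arrow" format name shipped by the spark-arrow-datasource jars listed in the reproduce step; the path and view name are illustrative:

    // Read Parquet data through the Arrow data source so downstream Gazelle operators
    // receive Arrow-backed column vectors instead of Spark's OnHeapColumnVector.
    val lineitem = spark.read
      .format("arrow")
      .load("/data/tpch_2g/lineitem")   // illustrative path
    lineitem.createOrReplaceTempView("lineitem_2g")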

Here is my observation: 1) if the Arrow data source is not enabled, disable whole-stage code generation with set spark.sql.codegen.wholeStage=false; (in spark-sql) in order to run Gazelle.

@zhouyuan, could you please review my observation?

PHILO-HE commented 2 years ago

@Manoj-red-hat, sorry for the late reply! Is there any reason for not using the Arrow data source on your side? According to the exception message, Spark columnar batch data was fed into Gazelle's columnar projection, but Spark's columnar format is not compatible with Gazelle's columnar format.
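To illustrate the incompatibility described here, a simplified sketch (not the plugin's exact code) of the pattern that throws: the columnar projector assumes every vector in an incoming batch is an Arrow-backed vector.

    import org.apache.spark.sql.vectorized.ColumnarBatch
    import com.intel.oap.vectorized.ArrowWritableColumnVector

    // Simplified view of what the columnar projection effectively does per batch:
    // each column is cast to ArrowWritableColumnVector. If the upstream scan produced
    // Spark's OnHeapColumnVector (the built-in vectorized reader), this cast throws
    // the ClassCastException shown above.
    def toArrowVectors(batch: ColumnarBatch): Seq[ArrowWritableColumnVector] =
      (0 until batch.numCols()).map { i =>
        batch.column(i).asInstanceOf[ArrowWritableColumnVector]
      }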

Manoj-red-hat commented 2 years ago

Hi @PHILO-HE ,

Yes, I understand you are working hard on this, which is evident from the daily commits.

Is there any reason for not using arrow data source on your side?

I have an ORC database, and the Arrow data source only supports Parquet. Earlier I was on Hive LLAP, which is why my data is ORC-based.

After going through the source code, I figured out this incompatibility, so I set up another, Parquet-based dataset, and it works.

For the sake of the community: if others hit this issue, it is better to switch to Parquet; otherwise, fall back to row-to-columnar conversion (by disabling spark.sql.codegen.wholeStage).

I just want to confirm with the community whether my understanding is correct. Thanks.

PHILO-HE commented 2 years ago

@Manoj-red-hat, thanks for the discussion.
I think setting spark.sql.codegen.wholeStage=false may just occasionally avoid this cast exception; you may need more tests to verify it. BTW, the ORC format is already supported in Gazelle. Please refer to the source file below:
gazelle_plugin/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/Orc_TPCDSSuite.scala