apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

Missing method when running TPC-H in Dataproc (Ubuntu 20.04.6 + Spark 3.3.1 + Gluten 1.0) #3224

Open obedmr opened 1 year ago

obedmr commented 1 year ago

Backend

VL (Velox)

Bug description

We're running into a weird situation with a method (org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit) that appears to be missing when we run TPC-H in Dataproc. When we look at the code, the method is actually implemented, and it also seems to be present in the *.class files inside the Gluten jar (but maybe it's a broken link).

I'm attaching the log file spark_gluten_issue.txt

We're running from the Jupyter Notebooks environment in Dataproc.
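From that same notebook, the overloads that the driver JVM actually exposes can be listed through py4j reflection. This is only a diagnostic sketch (it assumes the active `spark` session from the notebook), to compare against the signature in the exception below:

```python
# Diagnostic sketch, not a fix: list which writeMetadataFileAndCommit overloads the
# driver JVM actually exposes, so they can be compared against the signature in the
# NoSuchMethodError below. Assumes the active `spark` session from the notebook.
jvm = spark._jvm  # py4j gateway into the driver JVM
resolver_cls = jvm.java.lang.Class.forName(
    "org.apache.spark.shuffle.IndexShuffleBlockResolver"
)
methods = resolver_cls.getDeclaredMethods()
for i in range(len(methods)):
    m = methods[i]
    if m.getName() == "writeMetadataFileAndCommit":
        print(m.toString())  # full signature, including parameter types
```

If the `(int, long, long[], long[], java.io.File)` variant from the exception is missing from the output, the Spark build on the image doesn't expose the upstream Spark 3.3 signature that the Gluten bundle was compiled against.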

Spark version

Spark-3.3.x

Spark configurations

Name | Value
-- | --
spark.app.id | local-1694627358293
spark.app.initial.file.urls | file:/home/obed_n_munoz/notebooks/custom-jars/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar,file:/home/obed_n_munoz/notebooks/custom-jars/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar
spark.app.initial.jar.urls | spark://onmunoz-23861-m.c.articulate-rain-321323.internal:35825/jars/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar,spark://onmunoz-23861-m.c.articulate-rain-321323.internal:35825/jars/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar
spark.app.name | gluten tpch_power 632391
spark.app.startTime | 1694627355770
spark.app.submitTime | 1694627355624
spark.checkpoint.compress | true
spark.cleaner.periodicGC.interval | 10s
spark.dataproc.listeners | com.google.cloud.spark.performance.DataprocMetricsListener
spark.dataproc.metrics.listener.metrics.collector.hostname | onmunoz-23861-m
spark.dataproc.sql.joinConditionReorder.enabled | true
spark.dataproc.sql.local.rank.pushdown.enabled | true
spark.dataproc.sql.optimizer.leftsemijoin.conversion.enabled | true
spark.dataproc.sql.parquet.enableFooterCache | true
spark.driver.extraClassPath | /root/jars/632391897a54e15c82e01eb1db83b7aec3763447-spark33/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar:/root/jars/632391897a54e15c82e01eb1db83b7aec3763447-spark33/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
spark.driver.extraJavaOptions | -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
spark.driver.host | onmunoz-23861-m.c.articulate-rain-321323.internal
spark.driver.maxResultSize | 4g
spark.driver.memory | 5120m
spark.driver.port | 35825
spark.dynamicAllocation.enabled | true
spark.dynamicAllocation.maxExecutors | 10000
spark.dynamicAllocation.minExecutors | 1
spark.eventLog.dir | gs://dataproc-temp-us-central1-271925231168-fp6h2pdc/141cb39b-371f-4706-8344-d30652941922/spark-job-history
spark.eventLog.enabled | true
spark.executor.cores | 4
spark.executor.extraClassPath | /root/jars/632391897a54e15c82e01eb1db83b7aec3763447-spark33/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar:/root/jars/632391897a54e15c82e01eb1db83b7aec3763447-spark33/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
spark.executor.extraJavaOptions | -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -XX:+UseParallelOldGC -XX:ParallelGCThreads=2 -XX:NewRatio=1 -XX:SurvivorRatio=1 -XX:+UseCompressedOops -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:ErrorFile=/var/log/java/hs_err_pid%p.log
spark.executor.id | driver
spark.executor.instances | 2
spark.executor.memory | 8192m
spark.executor.memoryOverhead | 1024m
spark.executorEnv.LD_PRELOAD | /usr/lib/x86_64-linux-gnu/libjemalloc.so.2
spark.executorEnv.OPENBLAS_NUM_THREADS | 1
spark.executorEnv.PYTHONPATH | /usr/lib/spark/python:/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip
spark.files | /home/obed_n_munoz/notebooks/custom-jars/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar,/home/obed_n_munoz/notebooks/custom-jars/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
spark.gluten.memory.offHeap.size.in.bytes | 1788870656
spark.gluten.memory.task.offHeap.size.in.bytes | 1788870656
spark.gluten.sql.columnar.backend.lib | velox
spark.gluten.sql.columnar.coalesce.batches | true
spark.gluten.sql.columnar.forceshuffledhashjoin | True
spark.gluten.sql.columnar.maxBatchSize | 4096
spark.gluten.sql.columnar.shuffle.codec | zstd
spark.hadoop.hive.execution.engine | mr
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 2
spark.hadoop.mapreduce.fileoutputcommitter.concurrent.write.enabled | false
spark.history.fs.logDirectory | gs://dataproc-temp-us-central1-271925231168-fp6h2pdc/141cb39b-371f-4706-8344-d30652941922/spark-job-history
spark.jars | /home/obed_n_munoz/notebooks/custom-jars/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar,/home/obed_n_munoz/notebooks/custom-jars/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
spark.kryo.unsafe | False
spark.kryoserializer.buffer.max | 512m
spark.master | local
spark.memory.offHeap.enabled | True
spark.memory.offHeap.size | 1706m
spark.metrics.namespace | app_name:${spark.app.name}.app_id:${spark.app.id}
spark.nativesql.commit | ['632391897a54e15c82e01eb1db83b7aec3763447']
spark.plugins | io.glutenproject.GlutenPlugin
spark.rdd.compress | True
spark.repl.class.outputDir | /tmp/tmpyg34v5z3
spark.repl.class.uri | spark://onmunoz-23861-m.c.articulate-rain-321323.internal:35825/classes
spark.repl.local.jars | file:///home/obed_n_munoz/notebooks/custom-jars/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar,file:///home/obed_n_munoz/notebooks/custom-jars/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
spark.rpc.message.maxSize | 512
spark.scheduler.minRegisteredResourcesRatio | 0.0
spark.scheduler.mode | FAIR
spark.serializer | org.apache.spark.serializer.KryoSerializer
spark.serializer.objectStreamReset | 100
spark.shuffle.manager | org.apache.spark.shuffle.sort.ColumnarShuffleManager
spark.shuffle.service.enabled | true
spark.sql.adaptive.enabled | True
spark.sql.autoBroadcastJoinThreshold | 10m
spark.sql.broadcastTimeout | 4800
spark.sql.catalogImplementation | hive
spark.sql.cbo.enabled | true
spark.sql.cbo.joinReorder.enabled | true
spark.sql.execution.removeRedundantSorts | false
spark.sql.extensions | io.glutenproject.GlutenSessionExtensions
spark.sql.files.maxPartitionBytes | 2g
spark.sql.inMemoryColumnarStorage.enableVectorizedReader | false
spark.sql.optimizer.dynamicPartitionPruning.enabled | True
spark.sql.orc.enableVectorizedReader | false
spark.sql.parquet.enableNestedColumnVectorizedReader | true
spark.sql.parquet.enableVectorizedReader | false
spark.sql.shuffle.partitions | 16
spark.submit.deployMode | client
spark.submit.pyFiles | 
spark.task.cpus | 1
spark.ui.port | 0
spark.ui.showConsoleProgress | true
spark.yarn.am.memory | 640m
spark.yarn.historyServer.address | onmunoz-23861-m:18080
spark.yarn.jars | /home/obed_n_munoz/notebooks/custom-jars/gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar,/home/obed_n_munoz/notebooks/custom-jars/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
spark.yarn.unmanagedAM.enabled | true
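For anyone trying to reproduce this outside of this exact cluster, the Gluten-relevant subset of the configuration above boils down to roughly the following sketch. The jar path and off-heap size are the values from the table and need to be adapted; this is not the complete configuration.

```python
# Minimal sketch of the Gluten-related settings from the table above, applied at
# session build time. Values are copied from this report and must be adapted to
# your environment.
from pyspark.sql import SparkSession

gluten_jar = (
    "/home/obed_n_munoz/notebooks/custom-jars/"
    "gluten-velox-bundle-spark3.3_2.12-ubuntu_20.04-1.0.0-24.jar"
)

spark = (
    SparkSession.builder
    .config("spark.jars", gluten_jar)
    .config("spark.plugins", "io.glutenproject.GlutenPlugin")
    .config("spark.sql.extensions", "io.glutenproject.GlutenSessionExtensions")
    .config("spark.shuffle.manager",
            "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
    .config("spark.gluten.sql.columnar.backend.lib", "velox")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1706m")
    .getOrCreate()
)
```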


System information

Velox System Info v0.0.2
Commit: 632391897a54e15c82e01eb1db83b7aec3763447
CMake Version: 3.16.3
System: Linux-5.15.0-1041-gcp
Arch: x86_64
C++ Compiler: /usr/bin/c++
C++ Compiler Version: 9.4.0
C Compiler: /usr/bin/cc
C Compiler Version: 9.4.0
CMake Prefix Path: /usr/local;/usr;/;/usr;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_31448/3987293475.py in <cell line: 2>()
      1 test_tpc.load_all_tables_as_tempview()
      2 for l in test_tpc.query_ids:
----> 3     x=test_tpc.run_query(l, explain = False)

/tmp/ipykernel_31448/218251637.py in run_query(self, query, explain)
     67         for q in queries:
     68             if explain: q.explain()
---> 69             out=q.collect()
     70         end_time = timeit.default_timer()
     71         duration = end_time - start_time

/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    815         """
    816         with SCCallSiteSync(self._sc):
--> 817             sock_info = self._jdf.collectToPython()
    818         return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))
    819 

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    188     def deco(*a: Any, **kw: Any) -> Any:
    189         try:
--> 190             return f(*a, **kw)
    191         except Py4JJavaError as e:
    192             converted = convert_exception(e.java_exception)

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o167.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8) (onmunoz-23861-m.c.articulate-rain-321323.internal executor driver): java.lang.NoSuchMethodError: 'void org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit(int, long, long[], long[], java.io.File)'
        at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:188)
        at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:204)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2673)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2609)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2608)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2861)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2803)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2792)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.NoSuchMethodError: 'void org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit(int, long, long[], long[], java.io.File)'
        at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:188)
        at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:204)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

meharanjan318 commented 1 year ago

Hey Obed, this is a known issue in the Dataproc integration with Gluten. The fix is already in progress on the Dataproc side and will be released soon. Will keep you updated.

obedmr commented 1 year ago

Thank you for the info, Meha (@meharanjan318)! Would you happen to have a prospective release date for when this will be fixed?

surnaik commented 9 months ago

@obedmr This is fixed now. Thanks for reporting! Please give it a try. cc: @meharanjan318

noamzz commented 4 months ago

Hi @surnaik, @meharanjan318, we are getting the same error using DP 2.1 with gluten-velox-bundle-spark3.3_2.12-1.1.1.jar.

surnaik commented 4 months ago

Hi @noamzz, thanks for reporting. Could you please tell me the full image version you used? That will help me verify whether the fix has gone through. Also, please paste the exception, so I can check whether it's the same issue or a slightly different one.

noamzz commented 4 months ago

Thanks @surnaik! We are using DP version 2-1-35-debian11.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 10 times, most recent failure: Lost task 5.9 in stage 0.0 (TID 876): java.lang.NoSuchMethodError: 'void org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit(int, long, long[], long[], java.io.File)'
    at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:219)
    at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:235)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2717)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2653)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2652)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2652)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2913)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2855)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2844)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.NoSuchMethodError: 'void org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit(int, long, long[], long[], java.io.File)'
  at org.apache.spark.shuffle.ColumnarShuffleWriter.internalWrite(ColumnarShuffleWriter.scala:219)
  at org.apache.spark.shuffle.ColumnarShuffleWriter.write(ColumnarShuffleWriter.scala:235)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

surnaik commented 4 months ago

I checked internally and the image version used doesn't contain the fix; the fix went into 2.1.36 onwards. Could you please recreate the cluster with the latest released image and give it a try?
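For reference, a rough sketch of recreating the cluster from Python by shelling out to the gcloud CLI; the cluster name and region below are placeholders, and any 2.1 image at or above 2.1.36 should contain the fix.

```python
# Hypothetical example: recreate the Dataproc cluster on an image that contains the
# fix (2.1.36 or later, per the comment above) via the gcloud CLI. The cluster name
# and region are placeholders; add any other flags your setup needs.
import subprocess

subprocess.run(
    [
        "gcloud", "dataproc", "clusters", "create", "gluten-test-cluster",
        "--region=us-central1",             # placeholder region
        "--image-version=2.1.36-debian11",  # first 2.1 image reported to contain the fix
    ],
    check=True,
)
```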

noamzz commented 4 months ago

Thanks @surnaik! This issue is resolved, but we're facing some others :)