apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0
1.14k stars 415 forks source link

Hadoop file system options doesn't work #4963

Open felipepessoto opened 6 months ago

felipepessoto commented 6 months ago

Backend

VL (Velox)

Bug description

The issue happens when running this Delta unit test with Gluten/Velox:

https://github.com/delta-io/delta/blob/074ce173bb070ed60f10e98d3ef7dcc04cc2a744/core/src/test/scala/org/apache/spark/sql/delta/DeltaDataFrameHadoopOptionsSuite.scala#L56

I created a smaller repro using Parquet format:

  test("test123") {
    def myTest() = {
      val fakeFileSystemOptions = Map(
          "fs.myfake.impl" -> classOf[MyFakeFileSystem].getName,
          "fs.myfake.impl.disable.cache" -> "true"
        )

        val dir = new File("/tmp/test/")
        val path = s"myfake://${dir.getCanonicalPath}"

        spark.range(1, 10)
          .write
          .format("parquet")
          .mode("overwrite")
          .options(fakeFileSystemOptions)
          .save(path)

        spark.read.format("parquet").options(fakeFileSystemOptions).load(path).foreach(_ => {})
    }

    withSQLConf("spark.gluten.enabled" -> "false") {
      myTest()
    }

    withSQLConf("spark.gluten.enabled" -> "true") {
      //This one fails
      myTest()
    }
  }

import org.apache.hadoop.fs.RawLocalFileSystem
import java.net.URI

object MyFakeFileSystem {
  val scheme = "myfake"
  val uri = URI.create(s"$scheme:///")
}

/** A fake file system to test whether session Hadoop configuration will be picked up. */
// Can't be an inner class
class MyFakeFileSystem extends RawLocalFileSystem {
  override def getScheme: String = MyFakeFileSystem.scheme
  override def getUri: URI = MyFakeFileSystem.uri
}

Delta: 2.4 Spark 3.4

Spark version

Spark-3.3.x

Spark configurations

No response

System information

No response

Relevant logs

No response

Yohahaha commented 6 months ago

Velox use libhdfs3 to connect HDFS, find complete supported config here. https://github.com/apache/hawq/blob/master/src/backend/utils/misc/etc/hdfs-client.xml

PHILO-HE commented 5 months ago

Hi @felipepessoto, could you provide some error log? I guess myfake scheme could not be recognized by Gluten/Velox.

felipepessoto commented 5 months ago

@PHILO-HE

[info] - test123 *** FAILED *** (4 seconds, 837 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to[info] - test123 *** FAILED *** (4 seconds, 837 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 8) (42bdf483196f executor driver): io.glutenproject.exception.GlutenException: java.lang.RuntimeException: Exception: VeloxRuntimeError
[info] Error Source: RUNTIME
[info] Error Code: INVALID_STATE
[info] Reason: No registered file system matched with file path 'myfake:///tmp/test/part-00001-71b3b60b-287d-4b5f-95ef-a8c7fe12a93e-c000.snappy.parquet'
[info] Retriable: False
[info] Context: Split [Hive: myfake:///tmp/test/part-00001-71b3b60b-287d-4b5f-95ef-a8c7fe12a93e-c000.snappy.parquet 0 - 494] Task Gluten_Stage_5_TID_8
[info] Top-Level Context: Same as context.
[info] Function: getFileSystem
[info] File: /__w/1/s/Velox/velox/common/file/FileSystems.cpp
[info] Line: 61
[info] Stack trace:
[info] # 0  _ZN8facebook5velox7process10StackTraceC1Ei
[info] # 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
[info] # 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
[info] # 3  _ZN8facebook5velox11filesystems13getFileSystemESt17basic_string_viewIcSt11char_traitsIcEESt10shared_ptrIKNS0_6ConfigEE
[info] # 4  _ZN8facebook5velox19FileHandleGeneratorclERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
[info] # 5  _ZN8facebook5velox13CachedFactoryINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESt10shared_ptrINS0_10FileHandleEENS0_19FileHandleGeneratorEE8generateERKS7_
[info] # 6  _ZN8facebook5velox9connector4hive11SplitReader12prepareSplitESt10shared_ptrINS0_6common14MetadataFilterEERNS0_4dwio6common17RuntimeStatisticsE
[info] # 7  _ZN8facebook5velox9connector4hive14HiveDataSource8addSplitESt10shared_ptrINS1_14ConnectorSplitEE
[info] # 8  _ZN8facebook5velox4exec9TableScan9getOutputEv
[info] # 9  _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE
[info] # 10 _ZN8facebook5velox4exec6Driver4nextERSt10shared_ptrINS1_13BlockingStateEE
[info] # 11 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
[info] # 12 _ZN6gluten24WholeStageResultIterator4nextEv
[info] # 13 Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext
[info] # 14 0x00007f04087eaa30
[info] 
[info]  at io.glutenproject.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:39)
[info]  at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
[info]  at io.glutenproject.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
[info]  at io.glutenproject.utils.IteratorCompleter.hasNext(Iterators.scala:69)
[info]  at io.glutenproject.utils.PayloadCloser.hasNext(Iterators.scala:35)
[info]  at io.glutenproject.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
[info]  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
[info]  at scala.collection.Iterator.isEmpty(Iterator.scala:387)
[info]  at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
[info]  at org.apache.spark.InterruptibleIterator.isEmpty(InterruptibleIterator.scala:28)
[info]  at io.glutenproject.execution.VeloxColumnarToRowExec$.toRowIterator(VeloxColumnarToRowExec.scala:116)
[info]  at io.glutenproject.execution.VeloxColumnarToRowExec.$anonfun$doExecuteInternal$1(VeloxColumnarToRowExec.scala:80)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:860)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:860)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
[info]  at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
[info]  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:139)
[info]  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
[info]  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]  at java.base/java.lang.Thread.run(Thread.java:829)
PHILO-HE commented 5 months ago

Hi @felipepessoto, sorry for this late reply. The given file system options are not handled by Gluten. Currently, Velox only recognizes standard schemes, eg., "hdfs://" for HDFS, "file://" for local file system. I note this issue was found in UT. Please let me know whether the support is required in your real workloads.

felipepessoto commented 5 months ago

@PHILO-HE right it is only in unit tests

felipepessoto commented 1 month ago

Possibly a duplicate of #4084