GoogleCloudDataproc / hadoop-connectors

Libraries and tools for interoperability between Hadoop-related open-source software and Google Cloud Platform.
Apache License 2.0

Could not read CSV files #1259

Open MhdMousaHAMAD opened 1 month ago

MhdMousaHAMAD commented 1 month ago

I am unable to read CSV files from GCS. Reading fails with the following stack trace:

java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Options$OpenFileOptions
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.openFileWithOptions(GoogleHadoopFileSystem.java:1234)
    at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4768)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:92)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat.$anonfun$readToUnsafeMem$1(TextFileFormat.scala:119)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Assuming the connector jar is available in a data/spark_dependencies directory under your Python project my_app, the following code should reproduce the problem when reading the file gs://my-bucket/data.csv:

from importlib import resources

from pyspark.sql import SparkSession

# Resolve the path to the connector jar bundled with the package.
spark_gcs_connector_dependency = str(
    resources
    .files("my_app")
    .joinpath("data", "spark_dependencies", "gcs-connector-3.0.2-shaded.jar")
)
spark_dependencies = [
    spark_gcs_connector_dependency,
]

# Local session with the connector on the classpath; the gs:// scheme is mapped
# to the GCS filesystem implementation and Application Default Credentials are used.
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName("test")
    .config("spark.jars", ",".join(spark_dependencies))
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.ui.enabled", "false")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.google.cloud.auth.type", "APPLICATION_DEFAULT")
    .getOrCreate()
)

# Reading any CSV from GCS triggers the NoClassDefFoundError above.
spark.read.csv("gs://my-bucket/data.csv").show()
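
The missing class, org.apache.hadoop.fs.Options$OpenFileOptions, belongs to Hadoop's openFile() builder API, so the error suggests that the Hadoop client libraries on Spark's classpath are older than the ones the connector was built against. As a quick check (not part of the original repro; it reuses the spark session created above), the class can be looked up through PySpark's Py4J gateway:

# Hypothetical diagnostic, reusing the `spark` session from the snippet above.
# On an affected setup this raises a Py4JJavaError wrapping
# java.lang.ClassNotFoundException for the class named in the stack trace.
spark.sparkContext._jvm.java.lang.Class.forName(
    "org.apache.hadoop.fs.Options$OpenFileOptions"
)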

Environment:

System: macOS Sequoia 15.0.1 (24A348)
Python: 3.11.1
PySpark: 3.5.1
Java: 17 (OpenJDK Runtime Environment Temurin-17.0.12+7, build 17.0.12+7)
Hadoop Connector: gcs-connector-3.0.2-shaded.jar
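
A related check (again assuming the spark session from the repro above, not part of the original report) is to print the Hadoop version that PySpark actually bundles, since the connector jar and those client libraries must agree on the openFile() API:

# Hypothetical check, assuming the `spark` session created above: print the
# Hadoop version on Spark's classpath via Hadoop's VersionInfo utility.
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())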

cjac commented 1 month ago

Thank you for this report. I'll let product engineering know about it.