
[BUG] #3749

Open ranjanankur314 opened 1 month ago

ranjanankur314 commented 1 month ago

Bug: Unable to run PySpark code with Delta on a k8s cluster on GCP

Which Delta project/connector is this regarding?

Spark

Describe the problem

I am attempting to run PySpark code that reads from and writes to Delta on a GCS bucket. I am running it on a k8s cluster on GCP via the spark-submit command.

The plain PySpark version of this job that only works with Parquet files runs fine, but with Delta I get the following error. I have been trying this with multiple different version combinations for the last week.

  File "/tmp/spark-f1f15d72-8494-4597-8d63-ccc0b33a52e4/pyspark_gcs_parquet_read.py", line 38, in <module>
    extract()
  File "/tmp/spark-f1f15d72-8494-4597-8d63-ccc0b33a52e4/pyspark_gcs_parquet_read.py", line 27, in extract
    input_df.write.mode("overwrite").format("delta").save("/tmp/delta-table2")
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1463, in save
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o226.save.
: java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/spark/sql/delta/stats/StatisticsCollection$SqlParser$$anon$1.visitMultipartIdentifierList(Lorg/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext;)Lscala/collection/Seq; @17: invokevirtual
  Reason:
    Type 'org/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext' (current frame, stack[1]) is not assignable to 'org/antlr/v4/runtime/ParserRuleContext'
  Current Frame:
    bci: @17
    flags: { }
    locals: { 'org/apache/spark/sql/delta/stats/StatisticsCollection$SqlParser$$anon$1', 'org/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext' }
    stack: { 'org/apache/spark/sql/catalyst/parser/ParserUtils$', 'org/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext', 'scala/Option', 'scala/Function0' }
  Bytecode:
    0000000: b200 232b b200 23b6 0027 2a2b ba00 3f00
    0000010: 00b6 0043 c000 45b0
24/10/03 06:41:32 INFO SparkContext: SparkContext is stopping with exitCode 0.
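
Since the VerifyError is about SqlBaseParser / ANTLR class compatibility, a quick way to check which jars those classes are actually loaded from on the driver is a sketch like the one below. It goes through PySpark's private _jvm handle, so treat it purely as a debugging hack, not an official API:

def print_class_sources(spark):
    # Debugging sketch only: print the jar each suspect class is loaded from on the driver.
    jvm = spark.sparkContext._jvm
    for name in (
        "org.apache.spark.sql.catalyst.parser.SqlBaseParser",
        "org.antlr.v4.runtime.ParserRuleContext",
    ):
        src = jvm.java.lang.Class.forName(name).getProtectionDomain().getCodeSource()
        print(name, "->", src.getLocation().toString() if src else "unknown (bootstrap?)")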

I am using the following spark-submit command:

  spark-submit \
  --master k8s://<IP> \
  --deploy-mode cluster \
  --name spark-ankur-4 \
  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5,io.delta:delta-spark_2.12:3.0.0 \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=bitnami/spark:3.5.3 \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.kubernetes.tolerations.0.key=<value> \
  --conf spark.kubernetes.tolerations.0.operator=Equal \
  --conf spark.kubernetes.tolerations.0.value=perf \
  --conf spark.kubernetes.tolerations.0.effect=NoSchedule \
  --conf spark.hadoop.hive.metastore.uris=thrift://<address> \
  --conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS \
  --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
  --conf spark.kubernetes.driver.podTemplateFile=pod_template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=pod_template.yaml \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=default \
  --conf spark.hadoop.hadoop.security.authentication=simple \
  --conf spark.hadoop.hadoop.security.authorization=false \
  --conf spark.executorEnv.LD_PRELOAD=/opt/bitnami/common/lib/libnss_wrapper.so \
  --conf 'spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp' \
  gs://public-bucket-ankur/pyspark_gcs_parquet_read.py
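
To take k8s and GCS out of the picture, here is a minimal local sketch of the same Delta round-trip, for comparison. It assumes pip-installed pyspark and delta-spark with matching versions and uses the configure_spark_with_delta_pip helper from the delta-spark package; none of this reflects what the bitnami image actually ships:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip  # from the delta-spark pip package


def local_delta_roundtrip():
    # Minimal local Delta write/read, independent of k8s, GCS, and the cluster image.
    builder = (
        SparkSession.builder
        .appName("delta-local-check")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    df = spark.range(5)
    df.write.mode("overwrite").format("delta").save("/tmp/delta-table-local")
    spark.read.format("delta").load("/tmp/delta-table-local").show()

If this local version succeeds for a given pyspark/delta-spark pair, that narrows the problem down to how the jars are resolved on the cluster.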

PySpark code snippet

from pyspark.sql import SparkSession


def extract():
    # Initialize Spark session with required configurations
    spark = (
        SparkSession.builder
        .appName("spark-pi")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.11.jar")
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
        .getOrCreate()
    )

    print(f"Spark Version {spark.sparkContext.version}")
    print(f"{spark.sparkContext.getConf().getAll()}")

    gcs_file_path = "gs://public-bucket-ankur/sample.snappy.parquet"
    input_df = spark.read.parquet(gcs_file_path)
    input_df.printSchema()
    input_df.show(truncate=False)

    print("Normal Parquet File completed")

    input_df.write.mode("overwrite").format("delta").save("/tmp/delta-table2")

    df = spark.read.format("delta").load("/tmp/delta-table2")
    df.show()


if __name__ == "__main__":
    extract()
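
For completeness: the eventual target is the GCS bucket rather than the pod-local /tmp path, so the write I actually want inside extract() would look like the lines below (the exact gs:// path is illustrative):

    # Illustrative only: write the Delta table to GCS instead of the pod-local /tmp.
    gcs_delta_path = "gs://public-bucket-ankur/delta-table2"
    input_df.write.mode("overwrite").format("delta").save(gcs_delta_path)
    spark.read.format("delta").load(gcs_delta_path).show()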

Environment information

Delta Lake version: 3.0.0 (io.delta:delta-spark_2.12:3.0.0 via --packages)
Spark version: 3.5.3 (bitnami/spark:3.5.3 container image)
Scala version: 2.12
Environment: Kubernetes cluster on GCP

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?