kubeflow / spark-operator

Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
Apache License 2.0

Delta Read/Write not working using this operator #1950

Closed ArunPesari2 closed 3 weeks ago

ArunPesari2 commented 5 months ago

When we read/write Delta format using this operator, we get the error below. I am attaching the base image for Apache Spark (with some Python libraries), mainApplication.py (which reads from and writes to ADLS in Delta format), and the operator YAML.

Using the base image for Apache Spark directly (with some Python libraries), we could read and write Delta without issues. But when we run through the Spark operator, we get the error below. Let me know if you need additional info.


Base-Docker Image
================
FROM apache/spark:latest

USER root
RUN mkdir -p /home/spark/.local/share/jupyter/runtime
RUN touch /home/spark/.local/share/jupyter/runtime/jupyter_cookie_secret
RUN chmod -R 777 /home/spark
RUN chmod -R 777 /opt/spark/work-dir
RUN chmod -R 777 /opt/

#COPY lib/* /opt/spark/jars/

RUN apt update && apt install -y vim
RUN apt-get update && apt-get install -y procps
RUN pip install notebook && pip install sparksql-magic
RUN pip install jupyter_scheduler
RUN pip install azure-keyvault-secrets azure-identity
RUN pip install delta-spark==3.1.0
RUN pip install delta

ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
ENV PYSPARK_PYTHON=/usr/bin/python3

COPY mainApplication.py /opt/spark
COPY jupyter-main.py /opt/spark

RUN chmod -R 777 /opt/spark/work-dir
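
For reference, a sketch of an alternative we have not verified: pip install delta-spark only ships the Python bindings, so the Delta JVM jars still have to reach the driver and executor classpath. One option (versions assumed to match delta-spark==3.1.0, Scala 2.12) would be to bake the jars into the image:

# Sketch only, not verified against this image: put the Delta jars into
# $SPARK_HOME/jars so the JVMs do not depend on spark.jars.packages
# resolution at runtime. Versions are assumptions matching delta-spark==3.1.0.
RUN apt-get update && apt-get install -y curl && \
    curl -fSL -o /opt/spark/jars/delta-spark_2.12-3.1.0.jar \
      https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.1.0/delta-spark_2.12-3.1.0.jar && \
    curl -fSL -o /opt/spark/jars/delta-storage-3.1.0.jar \
      https://repo1.maven.org/maven2/io/delta/delta-storage/3.1.0/delta-storage-3.1.0.jar && \
    chmod 644 /opt/spark/jars/delta-*.jar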

===============
 mainApplication.py
===============
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *
import datetime, time
import logging

consumer_logger = logging.getLogger("Pipeline_logger")
consumer_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
consumer_logger.addHandler(handler)
consumer_logger.info("spark Reading Process Started")
spark = SparkSession.builder.appName('x') \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config('spark.jars.packages','io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-azure:3.3.1') \
        .getOrCreate()

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

data = [("Rohit", 30), ("Ramesh", 25), ("Robin", 35)]

df = spark.createDataFrame(data, schema)

df.show()
## azure ##
# Azure Blob Storage credentials
storage_account_name = "xxxxx"
container_name = "xxx"
storage_account_key = "xxxxx+xxxxx+xxxx+xxxx=="

# Configure Spark session with ADLS configurations
spark.conf.set("fs.azure.account.auth.type.{}.dfs.core.windows.net".format(storage_account_name), "SharedKey")
spark.conf.set("fs.azure.account.key.{}.dfs.core.windows.net".format(storage_account_name), storage_account_key)

consumer_logger.info("started writing")
df.write.format("delta").mode('overwrite').save("abfss://dev@xxxxx.dfs.core.windows.net/arun-dir")

consumer_logger.info("done")

while True:
    df.write.format("delta").mode('overwrite').save("abfss://dev@xxxxxx.dfs.core.windows.net/arun-dir")
    consumer_logger.info("done")
    time.sleep(10)
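
For reference, a sketch (not what we currently run) using the configure_spark_with_delta_pip helper that delta-spark provides. Note that in cluster mode the driver JVM is usually already running by the time this Python code executes, so packages requested from the session builder may be resolved too late to register the extension; jars baked into the image or declared in the SparkApplication spec are the other options sketched in this issue.

# Sketch only, assuming delta-spark==3.1.0 is installed in the image.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("x")
    # Register the Delta SQL extension and catalog, as in the script above.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip appends the matching io.delta:delta-spark
# Maven coordinate to spark.jars.packages; extra_packages adds hadoop-azure.
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-azure:3.3.1"]
).getOrCreate()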

===========
spark-operator
===========
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: arun-spark-test-aks-driver-exec
  labels:
    app: spark
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "xxxx.xxxx.io/arun-test-delta-spark-aks:1.0.11"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/mainApplication.py
  mainClass: org.apache.spark.examples.SparkPi
  sparkVersion: "3.4"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 2
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "2400m"
    memory: "1024m"
    labels:
      version: 3.1.3
      app: spark
      jobid: "spark1"
    serviceAccount: tvsm-cvdp-spark
    volumeMounts:
    - name: driver-volume
      mountPath: /opt/spark/work-dir
  executor:
    cores: 1
    instances: 2
    memory: "1026m"
    labels:
      version: 3.1.3
      app: spark
      jobid: "spark1"
      testing: "label"
  volumes:
  - name: driver-volume
    persistentVolumeClaim:
      claimName: spark-data-pvc-arun2
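
For reference, a sketch (v1beta2 fields, not tested here) of letting the operator resolve the Delta jars at submission time instead of inside the application, by merging the following into the spec above:

  # Sketch only: requires outbound access to Maven Central (or an internal
  # mirror listed under deps.repositories). Versions are assumptions matching
  # delta-spark==3.1.0.
  deps:
    packages:
      - io.delta:delta-spark_2.12:3.1.0
      - org.apache.hadoop:hadoop-azure:3.3.1
  sparkConf:
    # The default ivy home may not be writable inside the container.
    "spark.jars.ivy": "/tmp/ivy"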

========
Error log
========
24/04/02 12:41:13 WARN SparkSession: Cannot use io.delta.sql.DeltaSparkSessionExtension to configure session extensions.
java.lang.ClassNotFoundException: io.delta.sql.DeltaSparkSessionExtension
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
    at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:94)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$2(SparkSession.scala:1367)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$2$adapted(SparkSession.scala:1365)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1365)
    at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:105)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Unknown Source)
24/04/02 12:41:14 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
24/04/02 12:41:14 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.
Traceback (most recent call last):
  File "/opt/spark/mainApplication.py", line 30, in <module>
    df.show()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 945, in show
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 963, in _show_string
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o78.showString.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.catalogPluginClassNotFoundForCatalogError(QueryExecutionErrors.scala:1925)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:70)
    at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:67)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:86)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:86)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:85)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:51)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:93)
    at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:143)
    at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:140)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:295)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:295)
    at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:275)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.delta.catalog.DeltaCatalog
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
    ... 65 more

24/04/02 12:41:16 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/04/02 12:41:16 INFO SparkUI: Stopped Spark web UI at http://arun-spark-test-aks-driver-exec-3a764c8e9ed32083-driver-svc.spark-jobs.svc:4040
24/04/02 12:41:16 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/04/02 12:41:16 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/04/02 12:41:16 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/04/02 12:41:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/04/02 12:41:16 INFO MemoryStore: MemoryStore cleared
24/04/02 12:41:16 INFO BlockManager: BlockManager stopped
24/04/02 12:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
24/04/02 12:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/04/02 12:41:16 INFO SparkContext: Successfully stopped SparkContext
24/04/02 12:41:16 INFO ShutdownHookManager: Shutdown hook called
24/04/02 12:41:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-95ba71e1-8475-4f57-b9a1-b25f5d4de00a
24/04/02 12:41:16 INFO ShutdownHookManager: Deleting directory /var/data/spark-738d16bb-5173-4df0-9c59-9a9741bc7aab/spark-cce6eb80-4148-4179-b333-486493cd983f/pyspark-5dee90c8-d037-404e-9d72-a27147188148
24/04/02 12:41:16 INFO ShutdownHookManager: Deleting directory /var/data/spark-738d16bb-5173-4df0-9c59-9a9741bc7aab/spark-cce6eb80-4148-4179-b333-486493cd983f
github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 weeks ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.