kubeflow / spark-operator

Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
Apache License 2.0
2.8k stars 1.38k forks source link

[BUG] kubeflow spark-operator - error in querying strimzi kafka using structured streaming #2213

Closed karanalang closed 1 month ago

karanalang commented 1 month ago

Description

I've kubeflow spark-operator installed on K8s (GKE), and i'm running a structured streaming job which reads data from kafka .. the job is run every 10 mins. kubeflow version : 1.1.27, SparkApplication 3.1.3

It is giving an error shown below:

Traceback (most recent call last):
  File "/opt/spark/custom-dir/main.py", line 356, in <module>
    sys.exit(main())
  File "/opt/spark/custom-dir/main.py", line 352, in main
    ss.readData()
  File "/opt/spark/custom-dir/main.py", line 327, in readData
    query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/streaming.py", line 1491, in start
    return self._sq(self._jwrite.start())
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/dist-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o103.start.
: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
POST https://storage.googleapis.com/storage/v1/b?project=versa-kafka-poc
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "spark-gcs-access@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "spark-gcs-access@versa-kafka-poc.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist)."
}

Code reading data from kafka on GKE :

def readData(self):

        logger.info(" in readData, stream ")
        logger.info(f" self.ssl_keystore_location : { self.ssl_keystore_location}, self.ssl_keystore_password : {self.ssl_keystore_password} self.kafkaBrokers :: {self.kafkaBrokers}" )
        logger.info(f" self.ssl_truststore_location:: {self.ssl_truststore_location}, self.ssl_truststore_password :: {self.ssl_truststore_password}")

        logger.info(" ... READING FROM syslog.* ...")

        df_stream = self.spark.readStream.format('kafka') \
            .option("kafka.security.protocol", "SSL") \
            .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
            .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
            .option("kafka.ssl.keystore.location", self.ssl_keystore_location) \
            .option("kafka.ssl.keystore.password", self.ssl_keystore_password) \
            .option("kafka.bootstrap.servers", self.kafkaBrokers) \
            .option("subscribePattern", "topic") \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .option("kafka.metadata.max.age.ms", "1000") \
            .option("kafka.ssl.keystore.type", "PKCS12") \
            .option("kafka.ssl.truststore.type", "PKCS12") \
            .load()

        logger.info(f" df_stream, calling convertToDictForEachBatch -> {df_stream}")
        # trigger once

        query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
            .outputMode("append") \
            .trigger(processingTime='10 minutes') \
            .option("truncate", "false") \
            .option("checkpointLocation", self.checkpoint) \
            .foreachBatch(self.convertToDictForEachBatch) \
            .start()

I'm unable to understand why error states - bucket.storage.create privilege not there when the code is actually reading data from kafka

any ideas on how to debug/fix ? Anyone used the kubeflow spark-operator (v3.1.1) for streaming jobs on kubernetes ?

tia!

Environment & Versions

karanalang commented 1 month ago

this seems to be the cause of this -> https://github.com/kubeflow/spark-operator/issues/1619 .. the secret is not getting mounted die to this error -> MountVolume.SetUp failed for volume “spark-conf-volume-driver” .. any updates on how to fix this ?

karanalang commented 1 month ago

the issue was with the permissions (added roles/storage.admin) to the service account, this is fixed now .. closing the issue

jacobsalway commented 1 month ago

/close

Thanks for following up with what the error was.

google-oss-prow[bot] commented 1 month ago

@jacobsalway: Closing this issue.

In response to [this](https://github.com/kubeflow/spark-operator/issues/2213#issuecomment-2395735614): >/close > >Thanks for following up with what the error was. Instructions for interacting with me using PR comments are available [here](https://git.k8s.io/community/contributors/guide/pull-requests.md). If you have questions or suggestions related to my behavior, please file an issue against the [kubernetes/test-infra](https://github.com/kubernetes/test-infra/issues/new?title=Prow%20issue:) repository.