kubeflow / spark-operator

Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
Apache License 2.0

Facing issues with reading messages from Azure EventHub (Kafka) in Spark application #1331

Closed: brshravan-tech closed this issue 3 weeks ago

brshravan-tech commented 3 years ago

Hi Team,

I am trying to create a SparkApplication for streaming messages from Azure EventHub (Kafka). Here is my use case:

Read the messages from EventHub (on a specific topic) periodically (in batches), e.g. once every 30 minutes.

To start with, I am trying to read the messages from EventHub by creating a Spark stream.

Here is what my Spark application code looks like:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER) \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .option("kafka.ssl.ca.location", KAFKA_SSL_CA_LOCATION) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.kerberos.service.name", "kafka") \
    .option("kafka.security.protocol", KAFKA_SECURITY_SASL_SSL) \
    .option("kafka.sasl.username", "$ConnectionString") \
    .option("kafka.sasl.password", KAFKA_CONNECTION_STRING) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", OFFSET_RESET) \
    .load()
df.writeStream.outputMode("append").format("console").start().awaitTermination()

This is what I see in the logs for the Spark application submit:

+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.244.12.67 --deploy-mode client --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.0.0,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner local:///opt/spark/examples/src/main/python/file11.py

I can see that it successfully connects to Azure EventHub and retrieves details such as offsets:

21/08/23 11:36:08 INFO WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1a5fbcbe is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

21/08/23 11:36:08 INFO WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1a5fbcbe committed.
21/08/23 11:36:08 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-4641f5fd-d0db-491b-a749-bfed74adf33f/commits/0 using temp file file:/tmp/temporary-4641f5fd-d0db-491b-a749-bfed74adf33f/commits/.0.f1fd09ea-b000-432e-a2a1-0fa7d4938d80.tmp
21/08/23 11:36:08 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-4641f5fd-d0db-491b-a749-bfed74adf33f/commits/.0.f1fd09ea-b000-432e-a2a1-0fa7d4938d80.tmp to file:/tmp/temporary-4641f5fd-d0db-491b-a749-bfed74adf33f/commits/0
21/08/23 11:36:08 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "e2e2643a-c6ee-4f07-a2bc-51095a159f0e",
  "runId" : "9e896d3b-238f-4c3c-b50e-27c53db8e76c",
  "name" : null,
  "timestamp" : "2021-08-23T11:36:01.248Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 2575,
    "getBatch" : 28,
    "latestOffset" : 4321,
    "queryPlanning" : 587,
    "triggerExecution" : 7648,
    "walCommit" : 38
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic1]]",
    "startOffset" : null,
    "endOffset" : {
      "topic1" : {
        "2" : 3,
        "1" : 0,
        "0" : 0
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@25cfaad3",
    "numOutputRows" : 0
  }
}
21/08/23 11:36:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-b5ee39f3-7184-4323-b661-4af898be850a-651741359-driver-0-1, groupId=spark-kafka-source-b5ee39f3-7184-4323-b661-4af898be850a-651741359-driver-0] Seeking to LATEST offset of partition topic1-1
21/08/23 11:36:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-b5ee39f3-7184-4323-b661-4af898be850a-651741359-driver-0-1, groupId=spark-kafka-source-b5ee39f3-7184-4323-b661-4af898be850a-651741359-driver-0] Seeking to LATEST offset of partition topic1-0
21/08/23 11:36:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-b5ee39f3-7184-4323-b661-4af898be850a-651741359-driver-0-1, groupId=spark-kafka-source-b5ee39f3-7184-4323-b661-4af898be850a-651741359-driver-0] Seeking to LATEST offset of partition topic1-2

But when it tries to read the messages, I see the following errors:

21/08/23 12:03:43 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.244.10.197 executor 1): java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaBatchInputPartition

    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:457)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

This is a snippet from my SparkApplication YAML file showing the jars passed in sparkConf:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: file1
  namespace: spark
spec:
  type: Python
  pythonVersion: "3"
  mainApplicationFile: local:///opt/spark/examples/src/main/python/file11.py
  sparkVersion: "3.1.1"
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.0.0,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2"
    spark.jars.repositories: "https://packages.confluent.io/maven"

Thanks

gauravbh1 commented 3 years ago

The error suggests that the jars are not propagated to the executors. Most probably you are hitting a Spark bug: https://issues.apache.org/jira/browse/SPARK-35084. You can check this ticket for some suggestions: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/1132

Alternatively, create your own image that includes the required jars and use it in the deployment YAML:

1) Download the jars corresponding to org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2, org.apache.spark:spark-avro_2.12:3.0.0 and org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 to your local system.
2) Create a Dockerfile using gcr.io/spark-operator/spark:v3.1.1-hadoop3 as the base image.
3) In your Dockerfile, copy the jars downloaded in the first step into the new image.

Here's a sample Dockerfile that does what's described in step 3:

FROM gcr.io/spark-operator/spark:v3.1.1-hadoop3
COPY jars $SPARK_HOME/jars
ENTRYPOINT [ "/opt/entrypoint.sh" ]

Then build it from a command prompt: docker build -f <Dockerfile which you created> -t customsparkimage . Push the image to your preferred Docker registry and use the same image in the Spark application deployment YAML.

brshravan-tech commented 3 years ago

Thanks @gauravbh1

I will try the above suggestion.

brshravan-tech commented 3 years ago

@gauravbh1 Can you tell me how I can download the above JAR packages? I am new to Spark, so I am not sure how to download them to my local machine.

gauravbh1 commented 3 years ago

@brshravan-tech You can search for the packages on https://mvnrepository.com/. For example, to download the jar for org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2, search for spark-sql-kafka-0-10, drill down into the relevant link, and download the jar for the required version.

clarkjohnd commented 3 years ago

@brshravan-tech I've just "overcome" this issue by using a shared volume accessible to the drivers and executors, mounting the required jars on it, and referencing them in the YAML file. Short of creating a custom Docker image with the libraries pre-installed, as suggested, this is so far the best way I've managed to get Kafka streaming working fully, but it obviously depends on your storage setup too. See below for an example:

spec:
  deps:
    jars:
      - "local:///tmp/spark_files/jars/spark-sql-kafka.jar"
      - "local:///tmp/spark_files/jars/kafka-clients.jar"
      - "local:///tmp/spark_files/jars/commons-pool2.jar"
      - "local:///tmp/spark_files/jars/spark-token-provider-kafka-0-10_2.12.jar"

Note that I've downloaded these, stripped the versions from the file names, and placed them in my mounted volume at /tmp/spark_files/jars/. The links for these jars for 3.1.1 (3.1.2 versions are available too) are:

https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.1
https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.6.2
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0
https://mvnrepository.com/artifact/org.apache.spark/spark-token-provider-kafka-0-10_2.12/3.1.1

Make sure both the driver and executor have the same mounted volume in the same location so they access the jars on the same paths, as such:

spec:
  ...
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    ...
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    ...
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

brshravan-tech commented 3 years ago

@clarkjohnd Thanks a lot for the reply and suggestion. I will try it out. Would you be able to help me with a sample script for Kafka streaming as well? As mentioned in the first comment, below is what I have currently:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER) \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .option("kafka.ssl.ca.location", KAFKA_SSL_CA_LOCATION) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.kerberos.service.name", "kafka") \
    .option("kafka.security.protocol", KAFKA_SECURITY_SASL_SSL) \
    .option("kafka.sasl.username", "$ConnectionString") \
    .option("kafka.sasl.password", KAFKA_CONNECTION_STRING) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", OFFSET_RESET) \
    .load()
df.writeStream.outputMode("append").format("console").start().awaitTermination()

What I basically need is to read all the messages from the Kafka stream once every 30 minutes, process them, and dump them into some database. What changes would I need to make to achieve this?

Thanks a lot in advance...

gauravbh1 commented 3 years ago

@brshravan-tech You should look into triggers. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
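In PySpark, which your application uses, the equivalent is roughly the following. This is only a sketch, assuming the df and console sink from your code above and the 30-minute interval you mentioned:

# Sketch only: run a micro-batch every 30 minutes instead of as fast as possible
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="30 minutes") \
    .start()

query.awaitTermination()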

brshravan-tech commented 3 years ago

Thanks @gauravbh1 for sharing the link. I will take a look at the triggers.

clarkjohnd commented 3 years ago

@brshravan-tech sorry, I have been away. Depending on your architecture, using triggers and a processing-time interval may not be optimal or efficient for longer periods like 30 minutes, with Spark resources sitting idle while waiting.

Could you perhaps:

brshravan-tech commented 3 years ago

@clarkjohnd Thanks for the reply. This time, I was away... sorry for not replying earlier.

I agree that running the job continuously and keeping it idle for a long time consumes resources and adds unnecessary cost. I can schedule an Argo workflow to run it, read the existing messages, exit, and launch again after 30 minutes (a periodic cron workflow). Now, to do this, what changes do I need to make in my code above? It seems "awaitTermination" results in the Spark application running indefinitely. Do I need to remove it?

Also, I am planning to apply "cube" aggregations on the received messages (https://mungingdata.com/apache-spark/aggregations/). What additional changes do I need to make to convert the messages into DataFrames and apply these "cube" aggregations?

Below is my current code

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER) \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .option("kafka.ssl.ca.location", KAFKA_SSL_CA_LOCATION) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.kerberos.service.name", "kafka") \
    .option("kafka.security.protocol", KAFKA_SECURITY_SASL_SSL) \
    .option("kafka.sasl.username", "$ConnectionString") \
    .option("kafka.sasl.password", KAFKA_CONNECTION_STRING) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", OFFSET_RESET) \
    .load()
df.writeStream.outputMode("append").format("console").start().awaitTermination()

CC @gauravbh1

Thanks

brshravan-tech commented 2 years ago

@clarkjohnd and @gauravbh1, any suggestions on the above queries?

gauravbh1 commented 2 years ago

@brshravan-tech readStream and writeStream are used for Spark Structured Streaming queries. For batch queries, use read in place of readStream and write in place of writeStream. When you do that, get rid of the awaitTermination call too; in fact, if you leave it in with the batch read and write methods, your code will show an error. Refer to this guide for the difference between streaming mode and batch mode when using Spark with Kafka: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

brshravan-tech commented 2 years ago

@gauravbh1 Thanks for the reply. I am still struggling to get some sample code running. Would you be able to suggest the changes needed in my code above for it to work as a batch query with Kafka?

gauravbh1 commented 2 years ago

For a batch query, you would simply read from a Kafka source like this:

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER) \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .option("kafka.ssl.ca.location", KAFKA_SSL_CA_LOCATION) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.kerberos.service.name", "kafka") \
    .option("kafka.security.protocol", KAFKA_SECURITY_SASL_SSL) \
    .option("kafka.sasl.username", "$ConnectionString") \
    .option("kafka.sasl.password", KAFKA_CONNECTION_STRING) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", OFFSET_RESET) \
    .load()

To write your output back to, say, Kafka, you would write something like this:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

Notice that the code uses the read method instead of readStream and the write method instead of writeStream. Remove the awaitTermination call from your code and you should be good to go. Refer to the guide I posted in my earlier comment; it has sample code for both streaming and batch queries.
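For the cube aggregation you mentioned, a rough sketch could look like the following. It assumes df is the batch DataFrame from the read example above and that the Kafka message values are JSON with hypothetical fields country, city and amount; the schema, table name and connection details are placeholders you would replace with your own:

from pyspark.sql.functions import from_json, col, sum as sum_

# Hypothetical schema for the JSON payload carried in the Kafka 'value' column
schema = "country STRING, city STRING, amount DOUBLE"

# Convert the raw Kafka value bytes into a typed DataFrame
messages = df.selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Cube aggregation over the (hypothetical) dimension columns
cubed = messages.cube("country", "city") \
    .agg(sum_("amount").alias("total_amount"))

# Write the result to a database over JDBC
# (requires the matching JDBC driver jar on the driver and executors)
cubed.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<db-host>:5432/<db-name>") \
    .option("dbtable", "message_cube") \
    .option("user", "<db-user>") \
    .option("password", "<db-password>") \
    .mode("append") \
    .save()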

brshravan-tech commented 2 years ago

Hi @gauravbh1 ,

Here is what I would want to do: read the messages and convert them to DataFrames, rather than dumping them onto some other Kafka topic. Would you be able to help with this?

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 weeks ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.