gettyimages / docker-spark

Docker build for Apache Spark
MIT License

Exception when connecting to a Kafka topic #70


Wassssim commented 3 years ago

Hello, I'm using the Spark 2.4.1 image. I made a simple Scala application that pulls data from a Kafka topic and displays it in the console:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._

object StreamHandler {
  def main(args: Array[String]): Unit = {

    // initialize Spark
    val spark = SparkSession
      .builder()
      .appName("KafkaStreaming")
      .getOrCreate()

    // avoid warnings
    import spark.implicits._

    // read from Kafka
    val inputDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "sometopic") 
      .option("startingOffsets", "earliest")
      .option("partition.assignment.strategy", "range")
      .load()

    val expandedDF = inputDF.selectExpr("CAST(value AS STRING)")

    val query = expandedDF
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

The problem is that when I run spark-submit, I get this exception:

2021-07-27 19:30:19,993 INFO streaming.MicroBatchExecution: Using MicroBatchReader [KafkaV2[Subscribe[plates-raw]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@61532d01]
2021-07-27 19:30:20,016 INFO streaming.MicroBatchExecution: Starting new streaming query.
2021-07-27 19:30:20,030 INFO streaming.MicroBatchExecution: Stream started from {}
2021-07-27 19:30:20,172 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
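
For context: the Structured Streaming Kafka source only forwards options prefixed with kafka. to the underlying consumer, so option("partition.assignment.strategy", "range") in the code above never reaches the Kafka client, and the property expects a full assignor class name rather than "range" in any case. A minimal sketch of the prefixed form (RangeAssignor is the standard assignor shipped with kafka-clients; whether this actually silences the error depends on which kafka-clients jar ends up on the classpath):

// Sketch: consumer properties need the "kafka." prefix to be passed through
// to the underlying Kafka consumer by Spark's Kafka source.
val inputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sometopic")
  .option("startingOffsets", "earliest")
  .option("kafka.partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor")
  .load()

That said, this particular ConfigException is usually a symptom of an old Kafka 0.8/0.9 client (where the setting had no default value) being picked up at runtime, so a stray spark-streaming-kafka-0-8 or old kafka-clients jar on the classpath is worth ruling out first.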

This is the spark-submit command:

docker exec -it spark-master ./bin/spark-submit \
  --packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0" \
  --class StreamHandler \
  --master spark://master:7077 \
  --executor-memory 2G \
  --total-executor-cores 2 \
  /tmp/data/streamhandler_2.11-0.1.jar
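
Worth noting (an assumption, not something confirmed in the thread): the --packages coordinate pins spark-sql-kafka-0-10_2.11:2.4.0 while the image runs Spark 2.4.1, and mixed minor versions can drag in a different kafka-clients than expected. A hypothetical build.sbt aligning everything with the image's Spark 2.4.1 / Scala 2.11 build (the project name is a placeholder):

// Hypothetical build.sbt; versions are assumptions matching the image's
// Spark 2.4.1 / Scala 2.11 build, as suggested by the _2.11 jar name.
name := "streamhandler"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.1" % "provided", // provided by the image
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.1"               // matches the Spark version
)

With that in place, the --packages flag would pin the same version: org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1.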

I tried the Spark 2.4.1 image and still got the same error. This app works normally with the Spark 2.2.0 image, but I need Spark 2.4 or above so that I can later save the topic values to Cassandra with .foreachBatch
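
Since .foreachBatch is the goal, here is a minimal sketch of that step on Spark 2.4+ with the DataStax spark-cassandra-connector; the connector dependency, keyspace, and table names are assumptions, not something from this thread:

// Sketch only: write each micro-batch of expandedDF to Cassandra.
// Assumes spark-cassandra-connector is on the classpath and that
// "mykeyspace.plates" already exists; both names are placeholders.
val query = expandedDF
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "mykeyspace", "table" -> "plates"))
      .mode("append")
      .save()
  }
  .start()

query.awaitTermination()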

Any help would be greatly appreciated!

amineouba commented 2 years ago

Did you solve it?