vincenzobaz / spark-scala3

Apache License 2.0

Problem with implicits #13

Closed loghorse closed 1 year ago

loghorse commented 2 years ago

I'm currently using Scala 3 together with Spark in an example project for university. Thanks to your project I was able to get Spark up and running with Scala 3, but I'm still having problems that I can't solve.

Here is some example code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala3encoders.given

object SparkKafkaConsumer extends App {

  val sparkConfig = SparkSession.builder.appName("SparkKafkaConsumer").config("spark.master", "local[*]").getOrCreate()
  val sparkStreamingContext = new StreamingContext(sparkConfig.sparkContext, Seconds(1))

  sparkStreamingContext.sparkContext.setLogLevel("ERROR")

  val kafkaConnectionConfig = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "something"
  )

  val kafkaTopics = Array("twitch")

  val stream = KafkaUtils.createDirectStream[String, String](
    sparkStreamingContext,
    PreferConsistent,
    Subscribe[String, String](kafkaTopics, kafkaConnectionConfig)
  )

  val records = stream.map(record => record.value().split(".*Emote.*?,")(1).split(",.*,")(0).split(" "))

  import sparkConfig.implicits._

  var dfSchema = Array("word")
  var dataFrame = Seq.empty[String].toDF(dfSchema: _*)
  dataFrame.createOrReplaceTempView("results")

  records.foreachRDD(rdd => {
    val newRow = rdd.flatMap(word => word).toDF(dfSchema: _*)
    dataFrame = dataFrame.union(newRow)
    dataFrame.groupBy("word").count().sort(desc("count")).show()
  })

  sparkStreamingContext.start()
  sparkStreamingContext.awaitTermination()
}

The code itself runs without an error, but it doesn't show any output. The problem doesn't come from the spark-kafka streaming library; I already checked that. So I'm guessing it has something to do with the code that creates the DataFrames, and therefore with the implicits, if I understood your README correctly.

I imported your scala3encoders library, but that doesn't seem to do the trick. Do you have any advice or tips on why it's not working?
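One part of the code above that can be checked without Spark or Kafka is the parsing step inside `stream.map`. The sketch below pulls the same chained `split` calls into a plain function so they can be run on sample input. The function name `parseEmotes` and the sample lines are hypothetical; the real message format of the `twitch` topic is not shown in this issue, so this only illustrates how the expressions behave and is not a diagnosis of the missing output.

```scala
// Hypothetical helper: the same three splits as in the stream.map call,
// but returning an empty result instead of throwing when a piece is missing.
def parseEmotes(line: String): Seq[String] =
  val words = for
    // Original: split(".*Emote.*?,")(1), which throws if the line has no "Emote" marker
    afterEmote <- line.split(".*Emote.*?,").drop(1).headOption
    // Original: split(",.*,")(0)
    emoteField <- afterEmote.split(",.*,").headOption
  yield emoteField.split(" ").toSeq
  words.getOrElse(Seq.empty)

@main def checkParsing(): Unit =
  // Made-up sample lines; the actual Kafka record format is an assumption.
  val samples = Seq(
    "12:00,Emote,Kappa LUL,foo,bar",
    "a line without the marker"
  )
  samples.foreach(s => println(s"$s -> ${parseEmotes(s)}"))
```

If the function returns the expected words for real records copied from the topic, the parsing step can be ruled out and attention can move to the DataFrame and encoder code.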

vincenzobaz commented 2 years ago

Hello @loghorse, I must have missed the notification for your issue :disappointed:

Do you get any errors during compilation? What about runtime errors? Can you share the full project so that I can reproduce the problem?

vincenzobaz commented 1 year ago

Closing due to inactivity. Please reopen if you have additional information.