object wordstream {
  /**
   * Direct Kafka word-count stream: subscribes to the given Kafka topics and
   * prints a per-batch word count every 2 seconds.
   *
   * Expected arguments: <brokers> <groupId> <topics>
   *   brokers — comma-separated list of Kafka bootstrap servers
   *   groupId — Kafka consumer group name
   *   topics  — comma-separated list of topics to consume from
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      // Usage text names every required positional argument; the original
      // message had the placeholders stripped and used stripMargin without
      // any `|` margins, so it printed raw leading whitespace.
      System.err.println(
        """Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
          |  <brokers> is a list of one or more Kafka brokers
          |  <groupId> is a consumer group name to consume from topics
          |  <topics> is a list of one or more kafka topics to consume from
          |""".stripMargin)
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()

    // Take exactly the first three arguments: a strict Array(...) pattern on
    // the full `args` would throw a MatchError if extra arguments are passed.
    val Array(brokers, groupId, topics) = args.take(3)

    // Create the streaming context with a 2 second batch interval.
    // NOTE: the unused SparkSession that was also built here has been removed —
    // it started its own SparkContext, and creating a second one through
    // StreamingContext then fails ("Only one SparkContext may be running").
    // Use local[2] (not "local"): Spark Streaming needs more than one thread
    // when running locally.
    val sparkConf = new SparkConf().setAppName("wordstream").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the direct kafka stream with brokers and topics.
    // Trim topic names so "a, b" on the command line still works.
    val topicsSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print.
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation and block until the stream is stopped.
    ssc.start()
    ssc.awaitTermination()
Hi,
Whenever I write this code in Eclipse, I get the error shown below. I am using Scala 2.11, and my Maven POM has the following entries:
Below is my code:
package com.spark.stream
// scalastyle:off println
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
// val ssc = new StreamingContext(sparkConf, Seconds(2)) val ssc = new StreamingContext(sparkConf, Seconds(2))
}
}
// scalastyle:on println
Below is a printout of the error I am getting: