BenFradet / spark-kafka-writer

Write your Spark data to Kafka seamlessly
https://benfradet.github.io/spark-kafka-writer
Apache License 2.0
176 stars 65 forks source link

Exception:Topic cannot be null only in cluster mode #35

Closed shukla2009 closed 7 years ago

shukla2009 commented 7 years ago

I am trying this library and found one issue: it is not able to find the output topic in cluster mode.

// Wires up the streaming job: opens a direct Kafka stream over `intopics` (String keys/values, StringDecoder for both)
// and, for every non-empty batch, joins the incoming records with two static tables and writes the result to topic `out`.
// NOTE: the paste collapsed several statements onto the first line; the code is left exactly as posted.
private def createJob(ssc: StreamingContext, kafkaParams: Map[String, String], intopics: Set[String],out:String) = { val stream = KafkaUtils .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, intopics) import spark.implicits._

`stream.
  foreachRDD(rdd => {
    // Empty batches are skipped entirely.
    if (!rdd.isEmpty()) {
      // Wrap the second tuple element of each record (presumably the Kafka message value) in Sales and convert to a DataFrame.
      val rawDF = rdd.map(row => Sales(row._2)).toDF()
      // Join with the static tables named by the first two entries of tablesToJoin, each on its own joinColumn.
      // Assumes tablesToJoin has at least two entries and that both names are present in staticTables (Option.get throws otherwise).
      val joinedRdd = rawDF.join(staticTables.get(tablesToJoin.head.name).get, tablesToJoin.head.joinColumn)
        .join(staticTables.get(tablesToJoin(1).name).get, tablesToJoin(1).joinColumn)
        .select(job.outColunms.head, job.outColunms.tail: _*).toJSON.rdd
      // Each JSON string becomes a ProducerRecord for topic `out`; this lambda is the only place `out` is used,
      // and it is where the thread reports "Topic cannot be null" in cluster mode.
      // NOTE(review): the maintainer's advice in this thread is to copy the topic into a local val first — not confirmed as the root cause.
      joinedRdd.writeToKafka(producerConfig, s => new ProducerRecord[String, String](out, s))
    }
  })

}`

It works well in standalone mode

BenFradet commented 7 years ago

Where does out come from?

shukla2009 commented 7 years ago

`out` comes from the function parameter. Let me paste the full code:

// Topic names handed to createSSC by main: outTopicName is the topic written to, intopicName the topic read from.
`private val outTopicName = "out" private val intopicName = "in"

// Entry point: obtains the StreamingContext via StreamingContext.getOrCreate(checkpointDir, ...), with createSSC as the creating function, then starts it and blocks until termination.
// The remainder of this whitespace-collapsed line defines the shared SparkSession `spark`, configured with the master URL and the Cassandra connection host and port.
def main(args: Array[String]) { val ssc = StreamingContext.getOrCreate(checkpointDir, () => createSSC(checkpointDir, intopicName,outTopicName)) ssc.start ssc.awaitTermination() } val spark = SparkSession .builder() .master(master) .config("spark.cassandra.connection.host", cassandraHost) .config("spark.cassandra.connection.port", cassandraPort) .getOrCreate()

// Kafka producer settings passed to writeToKafka: bootstrap servers read from config key "kafka.host", String serializers for keys and values, and "request.required.acks" set to "1".
val producerConfig = { val props = new Properties() props.put("bootstrap.servers", config.getString("kafka.host")) props.put("key.serializer", classOf[StringSerializer].getName) props.put("value.serializer", classOf[StringSerializer].getName) props.put("request.required.acks", "1") props }

// For each entry of tablesToJoin: loads the Cassandra table of that name from `keyspace`, keeps only the entry's columns, persists the result with MEMORY_AND_DISK_2, and collects everything into a Map keyed by table name.
val staticTables = tablesToJoin.map { table => { val rawData = spark.read.format("org.apache.spark.sql.cassandra") .options(Map("table" -> table.name, "keyspace" -> keyspace)) .load() val columns = table.columns val dataframe = rawData.select(columns.head, columns.tail: _*) .persist(StorageLevel.MEMORY_AND_DISK_2) (table.name, dataframe) } }.toMap

// Builds a StreamingContext on the shared SparkContext with a Seconds(batchDuration) batch duration, enables checkpointing to checkpointDir, registers the streaming job through createJob (input topic set and output topic `out`), and returns the context.
private def createSSC(checkpointDir: String, intopicName: String, out:String) = { val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration)) ssc.checkpoint(checkpointDir) createJob(ssc, kafkaParams, Set(intopicName),out) ssc }

// Wires up the streaming job: opens a direct Kafka stream over `intopics` (String keys/values, StringDecoder for both)
// and, for every non-empty batch, joins the incoming records with two static tables and writes the result to topic `out`.
// NOTE: the paste collapsed several statements onto the first line; the code is left exactly as posted.
private def createJob(ssc: StreamingContext, kafkaParams: Map[String, String], intopics: Set[String],out:String) = { val stream = KafkaUtils .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, intopics) import spark.implicits._

stream.
  foreachRDD(rdd => {
    // Empty batches are skipped entirely.
    if (!rdd.isEmpty()) {
      // Wrap the second tuple element of each record (presumably the Kafka message value) in Sales and convert to a DataFrame.
      val rawDF = rdd.map(row => Sales(row._2)).toDF()
      // Join with the static tables named by the first two entries of tablesToJoin, each on its own joinColumn.
      // staticTables is a Map, so .get(...).get throws if a table name is missing; tablesToJoin(1) assumes at least two entries.
      val joinedRdd = rawDF.join(staticTables.get(tablesToJoin.head.name).get, tablesToJoin.head.joinColumn)
        .join(staticTables.get(tablesToJoin(1).name).get, tablesToJoin(1).joinColumn)
        .select(job.outColunms.head, job.outColunms.tail: _*).toJSON.rdd
      // Each JSON string becomes a ProducerRecord for topic `out`; this lambda is the only place `out` is used,
      // and it is where the thread reports "Topic cannot be null" in cluster mode.
      // NOTE(review): the maintainer's advice in this thread is to copy the topic into a local val first — not confirmed as the root cause.
      joinedRdd.writeToKafka(producerConfig, s => new ProducerRecord[String, String](out, s))
    }
  })

}`

shukla2009 commented 7 years ago

If I hard-code the topic name, like joinedRdd.writeToKafka(producerConfig, s => new ProducerRecord[String, String]("outTopic", s)), it starts working.

Even if I add a println on the line before, println("OUTPUTTOPIC =>"+out), it prints the topic name.

My master URL looks like mesos://zk://xx.xx.xx.xx/mesos. Spark version is 2.0.2 and Kafka is 0.8.2.1.

BenFradet commented 7 years ago

I know I had a few issues when inheriting the topic name from a trait; maybe it's related. It was because the topic name didn't get shipped off with the lambda, but unfortunately I never found out why.

I would advise you to make a local copy of the topic name somewhere, either in the function or in the foreach.

BenFradet commented 7 years ago

Closing