dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No data loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

Not able to get streaming data #18

Closed sorabh89 closed 9 years ago

sorabh89 commented 9 years ago

Hi Dibbhatt,

I'm not able to figure out why I am not getting the content in my batch. I have added an SOP (System.out.println) message in the foreachRDD().call() method, but it seems this method is not getting executed at all. Following is my piece of code:

props.put("consumer.forcefromstart", "false"); props.put("consumer.fetchsizebytes", "524288"); props.put("consumer.fillfreqms", "2000");

    SparkConf _sparkConf = new SparkConf();
    _sparkConf.setAppName("KafkaReceiver");
    _sparkConf.setMaster("local[4]");
    _sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false");

            JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf,
                    new Duration(100000));
            //Specify number of Receivers you need.
            int numberOfReceivers = 4;

            JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.DISK_ONLY());

            unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = -5999013346771943994L;

                public Void call(JavaRDD<MessageAndMetadata> rdd,
                        Time time) throws Exception {
                    System.out.println(" methiod call rdd.collect()");
                    rdd.collect();
                    return null;
                }
            });

            jsc.start();
            jsc.awaitTermination();

I am new to Spark, so I am not very comfortable with it yet. Please let me know when this call() method gets executed, and whether I need a Spark cluster to execute this.

Many thanks,

dibbhatt commented 9 years ago

Sorry for replying late. Do you still have this issue? Have you specified the Kafka-related properties correctly for the Receivers to consume from the Kafka cluster? Can you share the properties you specified in your driver code?
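
For reference, a minimal sketch of the connection properties this consumer needs in addition to the tuning ones already set (property names as in the project README of that era; the hosts, port, topic, and consumer id below are placeholders):

    // Placeholders -- replace with your own ZooKeeper/Kafka details.
    Properties props = new Properties();
    props.put("zookeeper.hosts", "zkhost1,zkhost2"); // ZooKeeper ensemble backing the Kafka cluster
    props.put("zookeeper.port", "2181");
    props.put("kafka.topic", "mytopic");             // topic to consume
    props.put("kafka.consumer.id", "my-consumer");   // id under which offsets are tracked in ZK
    // Tuning properties from the original snippet:
    props.put("consumer.forcefromstart", "false");
    props.put("consumer.fetchsizebytes", "524288");
    props.put("consumer.fillfreqms", "2000");

Without the ZooKeeper and topic properties the Receivers have nothing to connect to, which would also explain an empty batch.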

sorabh89 commented 9 years ago

Thanks for your reply,

I have resolved this issue by setting the master to local[*]; a note on why that works is below.
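
For anyone who hits the same symptom: each Receiver permanently occupies one core, so local[4] with 4 Receivers left no core free to process the received batches, and foreachRDD never fired. A minimal sketch of the change:

    // local[*] uses all available cores; with N receivers you need
    // strictly more than N cores, or no batch processing can run.
    _sparkConf.setMaster("local[*]");
    // Alternatively, keep local[N] with N > numberOfReceivers.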

But I am now facing another issue, related to 'akka.version'. I know that it is caused by the 'reference.conf' file, but I am not able to find a solution for it. And one more issue, actually not related to this particular project but to Spark SQL.

I am getting the following error:

Exception in thread "main" scala.MatchError: class com.test.dto.ReceivedEvent$Payload (of class java.lang.Class) at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:189) at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:188)

due to the following line of code:

    JavaSchemaRDD schemaRDD = sqlContext.applySchema(p, ReceivedEvent.class);

Following is an overview of the class:

    public class ReceivedEvent implements Serializable {

        String event;
        Long timestamp;
        String eventType;
        Payload payload; // nested custom type
    }
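
One workaround I am looking at, assuming the nested Payload field is what breaks the reflection-based schema inference (Spark 1.x's applySchema only matches simple bean property types such as primitives and String, so a custom nested class falls through the match with exactly this MatchError), is to flatten the bean to simple types:

    // Hypothetical flattened bean: every property is a simple type,
    // so getSchema() can match each getter's return type.
    public class ReceivedEventFlat implements Serializable {

        private String event;
        private Long timestamp;
        private String eventType;
        private String payloadJson; // Payload serialized to a String

        // applySchema discovers columns through bean getters.
        public String getEvent() { return event; }
        public void setEvent(String event) { this.event = event; }
        public Long getTimestamp() { return timestamp; }
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getPayloadJson() { return payloadJson; }
        public void setPayloadJson(String payloadJson) { this.payloadJson = payloadJson; }
    }

    // Then:
    // JavaSchemaRDD schemaRDD = sqlContext.applySchema(p, ReceivedEventFlat.class);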

Request your assistance.

Thanks,

sorabh89 commented 9 years ago

Hi dibbhatt,

Following is the stack trace for the error:

Exception in thread "main" java.lang.ExceptionInInitializerError Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version' at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159) at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164) at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:206) at akka.actor.ActorSystem$Settings.(ActorSystem.scala:168) at akka.actor.ActorSystemImpl.(ActorSystem.scala:504) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1770) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1761) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:222) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.(SparkContext.scala:240) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:571) at org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:74) at org.apache.spark.streaming.api.java.JavaStreamingContext.(JavaStreamingContext.scala:130)

If you find a solution for this then it will be of great help.

Thanks,

dibbhatt commented 9 years ago

Not sure if this is an issue with the Spark Kafka Consumer, so closing this issue. Just FYI, not sure if this will help you: http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-spark-job-using-maven-td5615.html
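
For future readers: the usual cause of this akka.version error is a fat jar that keeps only one reference.conf instead of merging all of them, so Akka's own settings get dropped. With maven-shade-plugin, an AppendingTransformer for reference.conf is the commonly cited fix (a sketch; adapt to your own pom.xml):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <transformers>
              <!-- Concatenate every reference.conf so 'akka.version'
                   from Akka's own copy survives in the shaded jar. -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>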