dibbhatt / kafka-spark-consumer

High-performance Kafka connector for Spark Streaming. Supports multi-topic fetch and Kafka security. Reliable offset management in ZooKeeper. No data loss. No dependency on HDFS or WAL. Built-in PID rate controller. Supports a message handler. Offset lag checker.
Apache License 2.0
635 stars, 318 forks

Kafka Apache Spark Streaming receives 0 messages #19

Closed: sidahmedbenkhaoua closed this issue 9 years ago

sidahmedbenkhaoua commented 9 years ago

I launched the code below:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.google.common.collect.Lists;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
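        // WARN rather than OFF keeps Spark/Akka INFO noise down while still
        // showing receiver errors (e.g. Kafka classes missing at runtime).
        Logger.getLogger("org").setLevel(Level.WARN);
        Logger.getLogger("akka").setLevel(Level.WARN);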
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount")
                .set("spark.master", "local[4]");
        // Create the context with a 2 second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(2000));

        // int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // String[] topics = args[2].split(",");
        // for (String topic : topics) {
        topicMap.put("page_visits", 1);
        // }

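        // createStream(jssc, zkQuorum "host:port", consumer group id,
        // topic -> number of receiver threads). A group with no committed
        // offsets starts at the latest offset (auto.offset.reset defaults to
        // "largest"), so only messages produced after start-up are counted.
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
                .createStream(jssc, "127.0.0.1:2181", "0", topicMap);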

        JavaDStream<String> lines = messages
                .map(new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        JavaDStream<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) {
                        return Lists.newArrayList(SPACE.split(x));
                    }
                });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

But no data is received; the output is only:

-------------------------------------------
Time: 1434192136000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192138000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192140000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192142000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192144000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192146000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192148000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192150000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192152000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192154000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192156000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192158000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192160000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192162000 ms
-------------------------------------------

Any solution, please?
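`wordCounts.print()` prints the `Time: ...` header for every batch, even when the batch has no records, so the output above means every batch was empty. As a rough sketch of what each batch would print once messages arrive, the same map, flatMap, mapToPair, reduceByKey chain can be mirrored over an in-memory list in plain Java, without Spark. The class `BatchWordCountSketch`, the helper `msg`, and the sample messages are hypothetical and only illustrate the per-batch logic; they are not part of Spark or of this consumer.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public final class BatchWordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Hypothetical helper: builds a (key, message) pair like the Kafka stream's Tuple2.
    static Map.Entry<String, String> msg(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Mirrors map(_2) -> flatMap(split) -> mapToPair((w, 1)) -> reduceByKey(+)
    // for a single batch held in memory. TreeMap keeps the output sorted.
    static Map<String, Integer> countWords(List<Map.Entry<String, String>> batch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, String> message : batch) {
            String line = message.getValue();            // like tuple2._2()
            for (String word : SPACE.split(line)) {       // like flatMap over words
                counts.merge(word, 1, Integer::sum);      // like reduceByKey(i1 + i2)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = Arrays.asList(
                msg("k1", "hello kafka"),
                msg("k2", "hello spark"));
        System.out.println(countWords(batch));                   // {hello=2, kafka=1, spark=1}
        System.out.println(countWords(Collections.emptyList()));  // {}
    }
}
```

An empty batch yields an empty map, which matches the header-only output in the question.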

dibbhatt commented 9 years ago

You are using KafkaUtils, which is not part of this low-level consumer. Please see the README for how to use this consumer.

sidahmedbenkhaoua commented 9 years ago

Issue resolved by adding the kafka-client jar to the classpath.
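The thread does not say which Kafka class was missing. One way to catch this kind of problem early is a startup check that tries to load the classes the receiver needs and reports any that are absent. This is a minimal sketch: `ClasspathCheck` is a hypothetical helper, and the class names listed (`kafka.consumer.ConsumerConfig` from the Kafka 0.8 high-level consumer, `org.apache.spark.streaming.kafka.KafkaUtils` from spark-streaming-kafka) are assumptions to adjust for your Kafka and Spark versions.

```java
import java.util.Arrays;
import java.util.List;

public final class ClasspathCheck {
    // Returns true if the named class can be loaded by this class's loader.
    // initialize=false avoids running the class's static initializers.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed class names; adjust to the jars your job actually depends on.
        List<String> required = Arrays.asList(
                "kafka.consumer.ConsumerConfig",
                "org.apache.spark.streaming.kafka.KafkaUtils");
        for (String name : required) {
            System.out.println(name + (isOnClasspath(name) ? " found" : " MISSING"));
        }
    }
}
```

Running a check like this before `jssc.start()` turns a silently empty stream into an explicit "MISSING" line.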