dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No Data-loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver #38

Closed vroy007 closed 7 years ago

vroy007 commented 7 years ago

Hello~ @dibbhatt I used this framework to integrate Kafka and Spark Streaming, but I have some problems. My cluster is based on Spark 2.0.0, Kafka 0.10.1.0, and ZooKeeper 3.4.9. I fixed some syntax errors in "ProcessedOffsetManager.java" because there are some differences between Spark 1.6.0 and Spark 2.0.0.

```java
package consumer.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;

import scala.Tuple2;
import scala.reflect.ClassTag;

import com.google.common.collect.ImmutableMap;

public class ProcessedOffsetManager {

  // Writes the highest processed offset of each partition to ZooKeeper.
  private static void persistProcessedOffsets(Properties props, Map<Integer, Long> partitionOffsetMap) {
    ZkState state = new ZkState(props.getProperty(Config.ZOOKEEPER_CONSUMER_CONNECTION));
    for (Map.Entry<Integer, Long> po : partitionOffsetMap.entrySet()) {
      Map<Object, Object> data = (Map<Object, Object>) ImmutableMap
          .builder()
          .put("consumer", ImmutableMap.of("id", props.getProperty(Config.KAFKA_CONSUMER_ID)))
          .put("offset", po.getValue())
          .put("partition", po.getKey())
          .put("broker", ImmutableMap.of("host", "", "port", ""))
          .put("topic", props.getProperty(Config.KAFKA_TOPIC)).build();
      String path = processedPath(po.getKey(), props);
      try {
        state.writeJSON(path, data);
      } catch (Exception ex) {
        state.close();
        throw ex;
      }
    }
    state.close();
  }

  private static String processedPath(int partition, Properties props) {
    return props.getProperty(Config.ZOOKEEPER_CONSUMER_PATH)
    // ... (the rest of this method, and the signature of the next one, were cut off in the original paste)
  }

  // Signature restored from context; it was lost in the paste. This is the Spark 2.0 variant:
  // PairFlatMapFunction.call() returns an Iterator here, where the Spark 1.x original
  // (left commented out in the paste) returned an Iterable.
  public static JavaPairDStream<Integer, Iterable<Long>> getPartitionOffset(
      JavaDStream<MessageAndMetadata> unionStreams) {
    JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long>() {
          @Override
          public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
            MessageAndMetadata mmeta = null;
            List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
            while (entry.hasNext()) {
              mmeta = entry.next();
            }
            if (mmeta != null) {
              l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
            }
            return l.iterator();
          }
        });
    JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
    return partitonOffset;
  }

  // Spark 2.0.0: foreachRDD takes a VoidFunction instead of a Function<..., Void>.
  @SuppressWarnings("deprecation")
  public static void persists(JavaPairDStream<Integer, Iterable<Long>> partitonOffset, Properties props) {
    partitonOffset.foreachRDD(new VoidFunction<JavaPairRDD<Integer, Iterable<Long>>>() {
      @Override
      public void call(JavaPairRDD<Integer, Iterable<Long>> po) throws Exception {
        List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
        System.out.println("ProcessedOffsetManager persist list size = " + poList.size());
        Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
        for (Tuple2<Integer, Iterable<Long>> tuple : poList) {
          int partition = tuple._1();
          Long offset = getMaximum(tuple._2());
          partitionOffsetMap.put(partition, offset);
        }
        persistProcessedOffsets(props, partitionOffsetMap);
      }

      public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
        T max = null;
        for (T value : values) {
          if (max == null || max.compareTo(value) < 0) {
            max = value;
          }
        }
        return max;
      }
    });
    // (Spark 1.6.0 version, using Function<JavaPairRDD<...>, Void>, left commented out in the paste.)
  }

  public static DStream<Tuple2<Integer, Iterable<Long>>> getPartitionOffset(DStream<MessageAndMetadata> unionStreams) {
    ClassTag<MessageAndMetadata> messageMetaClassTag = ScalaUtil.getClassTag(MessageAndMetadata.class);
    JavaDStream<MessageAndMetadata> javaDStream =
        new JavaDStream<MessageAndMetadata>(unionStreams, messageMetaClassTag);
    // (Spark 1.6.0 version of this mapPartitionsToPair call, returning an Iterable, left commented out in the paste.)
    JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long>() {
          @Override
          public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
            MessageAndMetadata mmeta = null;
            List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
            while (entry.hasNext()) {
              mmeta = entry.next();
            }
            if (mmeta != null) {
              l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
            }
            return l.iterator();
          }
        });
    JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
    return partitonOffset.dstream();
  }

  @SuppressWarnings("deprecation")
  public static void persists(DStream<Tuple2<Integer, Iterable<Long>>> partitonOffset, Properties props) {
    ClassTag<Tuple2<Integer, Iterable<Long>>> tuple2ClassTag =
        ScalaUtil.<Integer, Iterable<Long>>getTuple2ClassTag();
    JavaDStream<Tuple2<Integer, Iterable<Long>>> jpartitonOffset =
        new JavaDStream<Tuple2<Integer, Iterable<Long>>>(partitonOffset, tuple2ClassTag);

    // Spark 2.0.0
    jpartitonOffset.foreachRDD(new VoidFunction<JavaRDD<Tuple2<Integer, Iterable<Long>>>>() {
      @Override
      public void call(JavaRDD<Tuple2<Integer, Iterable<Long>>> po) throws Exception {
        List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
        Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
        for (Tuple2<Integer, Iterable<Long>> tuple : poList) {
          int partition = tuple._1();
          Long offset = getMaximum(tuple._2());
          partitionOffsetMap.put(partition, offset);
        }
        persistProcessedOffsets(props, partitionOffsetMap);
      }

      public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
        T max = null;
        for (T value : values) {
          if (max == null || max.compareTo(value) < 0) {
            max = value;
          }
        }
        return max;
      }
    });
    // (Spark 1.6.0 version, using Function<JavaRDD<...>, Void>, left commented out in the paste.)
  }
}
```

I then built the project as a new jar and ran SampleConsumer.java with my settings. It runs, but Spark has many failed jobs caused by "java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver", so it receives no data.

Error Logs:

```
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
    ... 19 more
ERROR: org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 19.0 failed 4 times; aborting job
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 19.0 (TID 141, 10.100.3.90):
java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

dibbhatt commented 7 years ago

I haven't seen this one before. Have you been able to solve it, or are you still having the same issue?

To fix the ProcessedOffsetManager issue with Spark 2.0, there is a pull request you can refer to:

https://github.com/dibbhatt/kafka-spark-consumer/pull/32/files

I haven't merged it, as it would break Spark 1.6. But you can try these changes and let me know if they work for you.

I need to find a way to handle this for both Spark 2.0 and earlier versions of Spark.
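For anyone hitting the same compile errors, the incompatibility comes down to the Java functional interfaces changing between Spark 1.x and 2.x: `PairFlatMapFunction.call()` returns an `Iterable` in 1.x but an `Iterator` in 2.x (and `foreachRDD` moved from `Function<..., Void>` to `VoidFunction`). A minimal sketch of the 2.x shape, with the 1.x difference noted in comments; the class, field, and element types here are illustrative, not taken from the pull request:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

public class FlatMapSignatureExample {

  // Compiles against Spark 2.x: call() must return an Iterator.
  static PairFlatMapFunction<Iterator<String>, Integer, Long> countPerPartition =
      new PairFlatMapFunction<Iterator<String>, Integer, Long>() {
        @Override
        public Iterator<Tuple2<Integer, Long>> call(Iterator<String> records) {
          long count = 0;
          while (records.hasNext()) {
            records.next();
            count++;
          }
          List<Tuple2<Integer, Long>> out = new ArrayList<>();
          out.add(new Tuple2<>(0, count));
          return out.iterator();   // Spark 2.x: return the Iterator itself
        }
      };

  // Against Spark 1.x the same anonymous class would instead declare:
  //   public Iterable<Tuple2<Integer, Long>> call(Iterator<String> records) { ...; return out; }
  // which is exactly the difference the pasted ProcessedOffsetManager works around.
}
```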

vroy007 commented 7 years ago

Hi~ I made the same changes following that pull request, but I still have the same problem: the program can't deserialize the data model. In detail, my producer program is:

```java
String topicName = "TEST-TOPIC";
Properties props = new Properties();
props.put("bootstrap.servers", "x.x.x.x.88:9092, x.x.x.89:9092, x.x.x.90:9092");
props.put("producer.type", "sync");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);

for (int i = 0; i < 10; i++) {
  producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
}

System.out.println("Message sent successfully");
producer.close();
```

I have tested with ./kafka-console-consumer.sh, and it can consume the messages from the producer.

When I run SampleConsumer as a test, it can't receive the messages. In SampleConsumer, I used the following code to test:

```java
unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata>>() {
  @Override
  public void call(JavaRDD<MessageAndMetadata> rdd) throws Exception {
    List<MessageAndMetadata> rddList = rdd.collect();
    System.out.println(" Number of records in this batch " + rddList.size());
    for (MessageAndMetadata model : rddList) {
      System.out.println("model key = " + new String(model.getKey(), "UTF-8")
          + ", value = " + new String(model.getPayload(), "UTF-8"));
    }
  }
});
```

The result is always "Number of records in this batch 0", i.e. rddList.size() == 0. I wonder if compatibility problems are making it unable to deserialize the data.

Looking forward to your answer, thanks~

dibbhatt commented 7 years ago

By default this consumer starts from the latest Kafka offset the first time it runs. On any subsequent run, it consumes from the last processed offset. So if you publish, say, 100 messages to Kafka, the latest offset in Kafka is 101. When you start the consumer for the first time, it starts from offset 101. If your publisher is not running and Kafka is not receiving any messages, the consumer won't pull anything. I guess that is what is happening here.

You can start the consumer first and then start the producer. Since the consumer is waiting for new messages from the latest offset, any newly produced messages will be consumed.

vroy007 commented 7 years ago

Yeah, I'm sure that I start the consumer first. Once the consumer is running successfully, I start the producer to publish ten messages at a time. I have tried it several times, but the rddList's size is always zero.

When I run SampleConsumer from IntelliJ IDEA on my Spark cluster's master node, the logs show the error "java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver". The error causes the Spark jobs to fail. I guess that may be the reason why the DStream can't receive the messages.
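A frequent cause of this particular ClassNotFoundException is that the jar containing consumer.kafka.client.KafkaReceiver is on the driver classpath (inside the IDE) but is never shipped to the executors, so the receiver object cannot be deserialized on the worker side. Below is a hedged sketch of one way to rule this out when launching from an IDE; the master URL and jar path are hypothetical, and this may or may not be what ultimately resolved the issue in this thread:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ClasspathCheckSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setAppName("kafka-spark-consumer-test")
        .setMaster("spark://master-host:7077")   // hypothetical master URL
        // Ship the application/consumer jar to every executor so that
        // consumer.kafka.client.KafkaReceiver can be deserialized there.
        // (The spark-submit equivalent is --jars, or submitting a fat/uber jar.)
        .setJars(new String[] { "/path/to/kafka-spark-consumer-with-deps.jar" }); // hypothetical path

    // From here on, build the streaming context and the receiver stream
    // exactly as in SampleConsumer.
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));
  }
}
```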

dibbhatt commented 7 years ago

How many partitions does your topic have? Can you share the receiver properties you used? You can also share the executor log from when you do the spark-submit.

vroy007 commented 7 years ago

3 partitions

The consumer properties are the defaults. I will share the broker properties next Monday.

The full executor logs from the spark-submit are shown in my first comment on this issue. If they are not clear, I will share all the logs again.

thanks
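For reference, this consumer is configured through a java.util.Properties object. The sketch below shows the general shape of such a configuration; the property names are recalled from the project README of that era and the values are placeholders, so verify both against the version you actually build:

```java
import java.util.Properties;

public class ReceiverPropsSketch {
  // Property names as recalled from the project README of that era;
  // verify against the version of kafka-spark-consumer you actually build.
  static Properties receiverProps() {
    Properties props = new Properties();
    props.put("zookeeper.hosts", "zkhost1,zkhost2,zkhost3");    // hypothetical ZK hosts
    props.put("zookeeper.port", "2181");
    props.put("zookeeper.broker.path", "/brokers");
    props.put("kafka.topic", "TEST-TOPIC");
    props.put("kafka.consumer.id", "kafka-spark-test");          // hypothetical consumer id
    props.put("zookeeper.consumer.connection", "zkhost1:2181");  // where processed offsets are stored
    props.put("zookeeper.consumer.path", "/spark-kafka");        // hypothetical ZK path
    return props;
  }
}
```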

vroy007 commented 7 years ago

It works!!! Thank you so much.

dibbhatt commented 7 years ago

Good. What was the issue?

Srinivasan-pp commented 7 years ago

Hi,

I started Spark with the package dibbhatt:kafka-spark-consumer:1.0.6 and tried to execute scala> import consumer.kafka.ProcessedOffsetManager; it throws: error: object ProcessedOffsetManager is not a member of package consumer.kafka

Srinivasan-pp commented 7 years ago

According to the discussion above, it seems the same issue was faced and solved for Spark 2.0; I would just like to know whether the same fix has been handled for earlier versions as well.

dibbhatt commented 7 years ago

You can git clone the repo and build it against Spark 1.6. You may need to make some minor changes for it to work with Spark 1.6; they are very easy to do. Do let me know if you need any help.
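The "minor changes" are essentially the reverse of the Spark 2.0 fix discussed earlier: the Spark 1.6 build of this code uses the older foreachRDD form that takes a Function returning Void, as in the commented-out blocks in the first post. A small illustrative sketch of that 1.6-style callback (the class name and stream element type are made up for the example, not taken from the repo):

```java
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class Spark16CallbackSketch {
  // Spark 1.6 style: foreachRDD(Function<JavaRDD<T>, Void>) instead of VoidFunction<JavaRDD<T>>.
  static Function<JavaRDD<String>, Void> printBatchSize =
      new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
          List<String> batch = rdd.collect();
          System.out.println("Number of records in this batch " + batch.size());
          return null;   // Function<..., Void> must return null explicitly
        }
      };
}
```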

dibbhatt commented 6 years ago

Hi @vroy007, if you don't mind, can you tell me the name of your organization, and also whether this consumer is running in production there? I need it just for my reference.