dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No data loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

AbstractMethodError with Spark 1.6.0 and Kafka 0.10.2 #52

Closed: MLNW closed this issue 6 years ago

MLNW commented 6 years ago

I'm trying to use this library with older versions of Spark (1.6.0-cdh5.11.1) and Kafka (0.10.2-kafka-2.2.0), but when persisting the offsets after the application logic has run I get the error mentioned in the title.

It seems to me that it is a mismatch between Scala versions. It's not easy for me to switch to Scala 2.11, so I guess my question is: is there a way to make your library work with my versions?

Below is the observed exception and the important bits of my pom file:

java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 INFO storage.DiskBlockManager: Shutdown hook called
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka_2.10</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>dibbhatt</groupId>
            <artifactId>kafka-spark-consumer</artifactId>
            <version>1.0.12</version>
        </dependency>
dibbhatt commented 6 years ago

Which version of the consumer are you running? The error occurs because your Spark version (1.6.0) and the version in the consumer's pom don't match. You can git clone the code and update the consumer's pom to match your versions. With Spark 1.6 you may hit a couple of compilation issues, but they are easy to solve.

Here are the steps you can try.

  1. git clone the latest code.

  2. Modify pom.xml to match your Kafka and Spark versions (including the Scala version): set the Spark version to 1.6.0 and the Kafka version to 0.10.2.0, and use the Scala 2.10 artifacts org.apache.spark:spark-core_2.10, org.apache.spark:spark-streaming_2.10, and org.apache.kafka:kafka_2.10.

  3. Spark 1.6 and 2.0 have some incompatible changes, so you need to remove one listener callback from consumer.kafka.ReceiverStreamListener.java. Remove this import:

    import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;

     and remove this callback:

    @Override
    public void onStreamingStarted(StreamingListenerStreamingStarted arg0) {
    }

  4. Spark 1.6 and 2.0 are also incompatible in the return type of PairFlatMapFunction. In consumer.kafka.PartitionOffsetPair.java, change the return type of the call method from public Iterator<...> call(Iterator<...> it) to public Iterable<...> call(Iterator<...> it), and change the return statement from

    return kafkaPartitionToOffsetList.iterator();

     to

    return kafkaPartitionToOffsetList;

   (see the sketch after this comment for the assembled method).

That's it. Build the consumer and you should be all set to use it with Spark 1.6 and Kafka 0.10.2. Let me know if you face any issues.

Dibyendu
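Putting step 4 together, here is a minimal sketch of what the Spark 1.6-compatible call method in PartitionOffsetPair might look like after the edit. The class name, generic types, and the simplified input element type are assumptions for illustration; the real class iterates the fetched Kafka messages and emits (partition, offset) pairs.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.function.PairFlatMapFunction;

    import scala.Tuple2;

    // Sketch only: in Spark 1.x, PairFlatMapFunction.call returns an
    // Iterable; in Spark 2.x it returns an Iterator. A jar compiled
    // against one signature and run on the other triggers
    // java.lang.AbstractMethodError, which is exactly the error in
    // this issue. The input element type is simplified here.
    public class PartitionOffsetPairSketch
        implements PairFlatMapFunction<Iterator<Tuple2<Integer, Long>>, Integer, Long> {

      @Override
      public Iterable<Tuple2<Integer, Long>> call(Iterator<Tuple2<Integer, Long>> it) {
        List<Tuple2<Integer, Long>> kafkaPartitionToOffsetList =
            new ArrayList<Tuple2<Integer, Long>>();
        while (it.hasNext()) {
          kafkaPartitionToOffsetList.add(it.next());
        }
        // Spark 1.x variant: return the Iterable itself, not
        // kafkaPartitionToOffsetList.iterator() as in the Spark 2.x code.
        return kafkaPartitionToOffsetList;
      }
    }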
dibbhatt commented 6 years ago

Another option is to use consumer version 1.0.9, which works with Spark 1.6:

    <dependency>
        <groupId>dibbhatt</groupId>
        <artifactId>kafka-spark-consumer</artifactId>
        <version>1.0.9</version>
    </dependency>
dibbhatt commented 6 years ago

Here is the v1.0.9 README: https://github.com/dibbhatt/kafka-spark-consumer/tree/117f98ccf02ad4f6e5a8b8918b5db097e7d3a3d4

MLNW commented 6 years ago

Thank you for your quick response!

I used your first approach and modified the latest code to use my versions of Kafka, Spark and Scala. Seems to work.

I will do some more extensive testing during this week. If I find anything else I'll let you know.

Cheers!

dibbhatt commented 6 years ago

Perfect. Do let me know if you see any issues or need any help on tuning the various knobs.

LinMingQiang commented 5 years ago

When the Spark job was submitted, the system loaded the default CDH jar (spark-assembly-1.6.0-cdh5.14.4-hadoop2.6.0-cdh5.14.4.jar). The Kafka version in that jar is not 0.10; it is 0.9.0.
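One way to confirm which Kafka jar the job actually loaded is to resolve a Kafka class via reflection and print its code source. This is a diagnostic sketch, not something from this thread; kafka.consumer.SimpleConsumer is a class present in the Kafka 0.8 through 0.10 low-level API that this consumer uses.

    // Diagnostic sketch: prints where the Kafka classes were loaded from,
    // to confirm whether the CDH assembly's Kafka 0.9 is shadowing the
    // application's Kafka 0.10 jars.
    public class KafkaClasspathCheck {
      public static void main(String[] args) throws ClassNotFoundException {
        Class<?> c = Class.forName("kafka.consumer.SimpleConsumer");
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
      }
    }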

dibbhatt commented 5 years ago

Hi @LinMingQiang, in your application pom, which versions of the jars have you specified?

LinMingQiang commented 5 years ago

Spark 1.6.0, Kafka 0.10.0.

dibbhatt commented 5 years ago

What's the issue you see? Is the streaming job not running?