Stratio / spark-rabbitmq

RabbitMQ Spark Streaming receiver
Apache License 2.0

java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver #110

Closed: dataExplor closed this issue 7 years ago

dataExplor commented 7 years ago

Hi All,

I am trying to write a simple RabbitMQ receiver application in which Spark Streaming reads data from RabbitMQ and prints each message as a String. I would like to hear your suggestions on this error. Any help would be appreciated!

package enterprise.messaging.ism.consumer;

import java.util.*;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.rabbitmq.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;

import com.rabbitmq.client.QueueingConsumer.Delivery;

public final class ConsumerApplication extends SpringApplication {

private final Logger logger = LogManager.getLogger(getClass());

public ConsumerApplication(Object... sources) {
    super(sources);
}

@Override
protected void afterRefresh(ConfigurableApplicationContext context, ApplicationArguments args) {
    super.afterRefresh(context, args);
    logBeans(context);
    try {
        streamEnterpriseMessage(context);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the exception
        Thread.currentThread().interrupt();
        logger.error("Streaming was interrupted", e);
    }
}

private void logBeans(ApplicationContext context) {
    logger.debug("Application context beans:");
    List<String> beanNames = Arrays.asList(context.getBeanDefinitionNames());
    beanNames.sort((name1, name2) -> name1.compareTo(name2));
    beanNames.stream().forEach(beanName -> logger.debug(beanName));
}

public static void streamEnterpriseMessage(ApplicationContext context) throws InterruptedException {

    try {

        SparkConf conf = new SparkConf();
        conf.setAppName("Sample Rabbit MQ Receiver");
        conf.setMaster("spark://10.205.233.177:7077");

        JavaStreamingContext streamCtx = new JavaStreamingContext(conf, new Duration(1000));

        // Connection parameter values must be string literals; the bare
        // identifiers localhost and testQueue would not compile
        Map<String, String> rabbitMqConParams = new HashMap<String, String>();
        rabbitMqConParams.put("host", "localhost");
        rabbitMqConParams.put("queueName", "testQueue");

        Function<Delivery, String> messageHandler = new Function<Delivery, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Delivery message) {
                return new String(message.getBody());
            }
        };

        JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, String.class, rabbitMqConParams, messageHandler);
        //logger.info("Data Stream", receiverStream.toString());
        receiverStream.print();

        streamCtx.start();
        streamCtx.awaitTermination();
    } catch (Exception e) {
        //logger.error("Could not log sample enterprise message", e);
        e.printStackTrace();
    }

}

}

My POM dependencies -

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    <dependency>
        <groupId>com.stratio.receiver</groupId>
        <artifactId>spark-rabbitmq</artifactId>
        <version>LATEST</version>
    </dependency>
</dependencies>
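
For reference, one way to make sure the spark-rabbitmq classes actually reach the Spark executors is to package the application as an uber jar with the maven-shade-plugin, so the receiver and its transitive dependencies ship inside the application jar. A minimal sketch (the plugin version is illustrative, and the Spark dependencies would typically be marked <scope>provided</scope> so Spark itself is not bundled):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <!-- Illustrative version; use whatever your build standardizes on -->
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- Bind the shade goal to the package phase so that
                         `mvn package` produces a single jar containing
                         spark-rabbitmq and its transitive dependencies -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>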

I'm seeing the following error in the console -

Exception: org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1283)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1919)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)

Environment -

Java - 1.8 (Update 131)

Spark - 2.0.2

RabbitMQ - 3.6.10

TRBaldim commented 7 years ago

Could this be an issue with the --jars option in your spark-submit call?

Try adding --jars /path/to/your-jar.jar to your spark-submit command.
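
For example, a submit command along these lines (the jar paths are placeholders; the main class is the one from the code above) distributes the listed jars to the driver and every executor, so the receiver class can be deserialized there:

    spark-submit \
        --class enterprise.messaging.ism.consumer.ConsumerApplication \
        --master spark://10.205.233.177:7077 \
        --jars /path/to/spark-rabbitmq.jar,/path/to/amqp-client.jar \
        /path/to/consumer-application.jar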

dataExplor commented 7 years ago

@TRBaldim Thanks, it worked! A few of the jars needed by spark-rabbitmq were not available on the classpath.
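
For anyone hitting the same error later: an alternative to listing jars by hand is to let spark-submit resolve the receiver from Maven with --packages, which also pulls its transitive dependencies such as the RabbitMQ amqp-client (substitute a real released version for <version>; the paths are placeholders):

    spark-submit \
        --class enterprise.messaging.ism.consumer.ConsumerApplication \
        --master spark://10.205.233.177:7077 \
        --packages com.stratio.receiver:spark-rabbitmq:<version> \
        /path/to/consumer-application.jar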