Stratio / spark-rabbitmq

RabbitMQ Spark Streaming receiver
Apache License 2.0
208 stars 84 forks source link

Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver #127

Closed wssnail closed 5 years ago

wssnail commented 5 years ago

`

A simple example

val conf = new SparkConf().setMaster("local[3]").setAppName("SparkStreamingRabbiMQ")
val ssc = new StreamingContext(conf, Seconds(3))
val receiverStream = RabbitMQUtils.createStream(ssc, Map(
  "host" -> "192.101.11.166",
  "queueName" -> "test",
  "vHost" -> "/",
  "username" -> "mqadmin",
  "password" -> "mqadmin"
))
receiverStream.start()
receiverStream.foreachRDD(r => println(r.count()))
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

` java.io.IOException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472) at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453) at scala.Option.map(Option.scala:146) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370) at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85) at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64) at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:67) at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:45) at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:380) at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:50) at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:364) at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:74) at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:490) at com.esotericsoftware.kryo.Kryo.getSerializer(Kryo.java:505) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:363) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) at com.twitter.chill.WrappedArraySerializer.write(WrappedArraySerializer.scala:29) at com.twitter.chill.WrappedArraySerializer.write(WrappedArraySerializer.scala:23) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(ParallelCollectionRDD.scala:65) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:185) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) ... 40 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:54) ... 61 more Caused by: java.lang.TypeNotPresentException: Type com.rabbitmq.client.QueueingConsumer$Delivery not present at sun.reflect.generics.factory.CoreReflectionFactory.makeNamedType(CoreReflectionFactory.java:117) at sun.reflect.generics.visitor.Reifier.visitClassTypeSignature(Reifier.java:125) at sun.reflect.generics.tree.ClassTypeSignature.accept(ClassTypeSignature.java:49) at sun.reflect.generics.visitor.Reifier.reifyTypeArguments(Reifier.java:68) at sun.reflect.generics.visitor.Reifier.visitClassTypeSignature(Reifier.java:138) at sun.reflect.generics.tree.ClassTypeSignature.accept(ClassTypeSignature.java:49) at sun.reflect.generics.repository.FieldRepository.getGenericType(FieldRepository.java:85) at java.lang.reflect.Field.getGenericType(Field.java:247) at com.esotericsoftware.kryo.serializers.FieldSerializer.newCachedField(FieldSerializer.java:360) at com.esotericsoftware.kryo.serializers.FieldSerializer.createCachedFields(FieldSerializer.java:331) at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:261) at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:182) at com.esotericsoftware.kryo.serializers.FieldSerializer.(FieldSerializer.java:155) ... 65 more Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.QueueingConsumer$Delivery at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at sun.reflect.generics.factory.CoreReflectionFactory.makeNamedType(CoreReflectionFactory.java:114) ... 77 more 18/11/16 16:52:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 812.0, whose tasks have all completed, from pool 18/11/16 16:52:43 ERROR scheduler.TaskSchedulerImpl: Resource offer failed, task set TaskSet_812.0 was not serializable

wssnail commented 5 years ago

spark:2.2 rabbitmq:3.6.10 scala:2.11.8