Stratio / spark-rabbitmq

RabbitMQ Spark Streaming receiver
Apache License 2.0

Streaming job fails due to connection shutdown #116

Open atfire opened 7 years ago

atfire commented 7 years ago

We have a Spark RabbitMQ streaming job that occasionally fails with the following exception:

java.lang.Exception: An error happen while getting next delivery: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.org$apache$spark$streaming$rabbitmq$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:96)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:73)
Caused by: com.rabbitmq.client.ShutdownSignalException: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
    at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:201)
    at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:218)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$4.apply(RabbitMQInputDStream.scala:91)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$4.apply(RabbitMQInputDStream.scala:91)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.org$apache$spark$streaming$rabbitmq$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:91)
    ... 1 more

I see a similar issue, https://github.com/Stratio/spark-rabbitmq/issues/104, but we've checked and the clocks on our servers are in sync.

Can we enable auto recovery for spark connections or is it already enabled? What else can be done to prevent this?
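
On the "what else can be done" part: one generic mitigation, independent of this library, is to retry the failing operation with a backoff rather than let a single shutdown end the job. Below is a minimal sketch in plain Java. The names `RetryWithBackoff` and `retry`, and the simulated failure in `main`, are hypothetical and only for illustration; they are not part of spark-rabbitmq or the RabbitMQ client.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetryWithBackoff {

    /**
     * Calls the action until it succeeds or maxAttempts is reached.
     * The wait between attempts starts at initialDelayMs and doubles each time.
     * If every attempt fails, the last exception is rethrown.
     */
    public static <T> T retry(Callable<T> action, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before the next attempt
                    delay *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical stand-in for "open a connection": fails twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated connection shutdown");
            }
            return "connected";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints "connected after 3 attempts"
    }
}
```

In a real job the lambda would wrap whatever call opens the connection or fetches the next delivery. Spark's own task retry may already cover part of this, so treat the helper as an illustration of the idea, not a drop-in fix.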

nelsou commented 7 years ago

RabbitMQ sucks with Spark Streaming.

atfire commented 7 years ago

I think the connections keep closing due to inactivity. Is there a way to pass a heartbeat interval to the connection factory?
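
For reference, the RabbitMQ Java client's `ConnectionFactory` has setters for both the heartbeat and automatic recovery. The sketch below shows them on a factory you create yourself. Nothing in this thread confirms whether, or how, spark-rabbitmq forwards such settings to the factory it builds internally. The class name `ResilientFactory`, the `build` helper, and the numeric values are hypothetical choices for illustration.

```java
import com.rabbitmq.client.ConnectionFactory;

public class ResilientFactory {

    // Hypothetical helper: builds a factory with heartbeat and recovery turned on.
    public static ConnectionFactory build(String host) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);

        // Requested heartbeat timeout in seconds (30 is an arbitrary example value).
        // Heartbeat frames keep an otherwise idle connection from looking dead.
        factory.setRequestedHeartbeat(30);

        // Ask the client to re-establish the connection after a failure.
        factory.setAutomaticRecoveryEnabled(true);

        // Wait between recovery attempts, in milliseconds (arbitrary example value).
        factory.setNetworkRecoveryInterval(5000);

        return factory;
    }
}
```

A caveat: the client documents automatic recovery as a response to network-level failures such as I/O errors or missed heartbeats. The trace above reports a clean shutdown with reply-code=200, so recovery alone may not cover that case. It needs the `amqp-client` library on the classpath to compile.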

atfire commented 7 years ago

@nelsou can you tell us which issues you ran into with RabbitMQ + Spark Streaming?

nelsou commented 7 years ago

too many ...

haixuan8192 commented 6 years ago

org.apache.spark.SparkException: Error creating channel and connection: connection is already closed due to connection error; cause: java.io.EOFException
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:211)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getConsumer(RabbitMQRDD.scala:243)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.<init>(RabbitMQRDD.scala:166)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD.compute(RabbitMQRDD.scala:143)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The program runs normally. This error appears occasionally, but it does not affect the run.