SignifAi / Spark-PubSub

Google Cloud Pubsub connector for Spark Streaming
Apache License 2.0

Job crashes with py4j.Py4JException: Cannot obtain a new communication channel #5

Open natasha-aleksandrova opened 6 years ago

natasha-aleksandrova commented 6 years ago

I am trying to get this library working and not having any luck... I'd appreciate any help!

Here is the code:

from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils
from pyspark import SparkContext

SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

sc = SparkContext()
ssc = StreamingContext(sc, 1)  # 1-second batch interval
pubsubStream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
pubsubStream.flatMap(lambda x: x).pprint()
ssc.start()

Here are the logs from GCP:

18/03/11 05:00:38 INFO org.spark_project.jetty.util.log: Logging initialized @2825ms
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: Started @2951ms
18/03/11 05:00:39 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/03/11 05:00:39 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2
18/03/11 05:00:40 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at cluster-main-m/10.128.0.5:8032
18/03/11 05:00:43 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1519879216511_0007
18/03/11 05:00:52 WARN org.apache.spark.streaming.StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.

[Stage 0:>                                                         (0 + 0) / 50]
[Stage 0:>                                                         (0 + 1) / 50]
[Stage 0:=>                                                        (1 + 1) / 50]
[Stage 0:=====>                                                    (5 + 1) / 50]
[Stage 0:===========>                                             (10 + 1) / 50]
[Stage 0:=================>                                       (15 + 1) / 50]
[Stage 0:==========================>                              (23 + 1) / 50]
[Stage 0:===================================>                     (31 + 1) / 50]
[Stage 0:============================================>            (39 + 1) / 50]
[Stage 0:=======================================================> (49 + 1) / 50]
[Stage 1:================================================>        (17 + 1) / 20]

18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Slide time = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Storage level = Serialized 1x Replicated
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Checkpoint interval = null
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Remember interval = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Initialized and validated io.signifai.pubsub_spark.receiver.PubsubInputDStream@395a0746
18/03/11 05:01:00 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1520744460000 ms
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
    at com.sun.proxy.$Proxy27.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
    at com.sun.proxy.$Proxy27.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
    at com.sun.proxy.$Proxy27.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 WARN org.apache.spark.streaming.scheduler.ReceiverTracker: Not all of the receivers have deregistered, ArrayBuffer(0)
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
shulegaa commented 5 years ago

I've reproduced this using precisely the same program (save for my own subscription) on Google Cloud Platform (GCP) Dataproc v1.2. I've published hundreds of messages to the topic behind my own GCP Pub/Sub subscription and, just to be sure, subscribed back and pulled those same messages via the Python client running on my laptop. As expected, GCP Pub/Sub itself seems fine.
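For anyone who wants to run the same sanity check, here is a minimal sketch of that pull-back verification using the google-cloud-pubsub Python client (assuming client library 2.x; this is not my actual script, and the subscription path is the one from the report above):

from google.cloud import pubsub_v1

SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

subscriber = pubsub_v1.SubscriberClient()
with subscriber:
    # Synchronously pull a small batch to confirm published messages are arriving.
    response = subscriber.pull(
        request={"subscription": SUBSCRIPTION, "max_messages": 10}
    )
    ack_ids = []
    for received in response.received_messages:
        print(received.message.data)
        ack_ids.append(received.ack_id)

    # Acknowledge what was pulled so it is not redelivered.
    if ack_ids:
        subscriber.acknowledge(
            request={"subscription": SUBSCRIPTION, "ack_ids": ack_ids}
        )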

For me too, this receiver JAR with its PySpark binding egg would be ideal, and it builds cleanly.

So, does anyone know of any alternatives?

bmahe commented 5 years ago

Thank you both @natasha-aleksandrova and @shulegaa for the detailed report!

That exception does not make the cause apparent, so I will need a bit more information.

I just tried to reproduce this tonight but unfortunately was unable to. However, I only tried with Apache Spark 2.3.2 running standalone on my laptop, not on Dataproc, and both the Java and Python connectors worked for me.
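For comparison, here is a minimal sketch of what such a standalone local test driver might look like. It is not my actual test code; it reuses the snippet from the report above and adds ssc.awaitTermination(), which keeps the Python driver process alive after start() (if the driver script exits, the JVM can no longer call back into Python over py4j, which can produce this kind of callback error):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils

SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

sc = SparkContext(appName="pubsub-smoke-test")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Same arguments as in the original report.
stream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
stream.flatMap(lambda x: x).pprint()

ssc.start()
# Keep the Python driver process alive while the stream runs; without this
# the script returns immediately after start() and the driver may exit.
ssc.awaitTermination()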

Also note that I just pushed a commit upgrading the referenced versions, so you may want to pull those updates.

natasha-aleksandrova commented 5 years ago

@bmahe I am no longer working on the PySpark project and won't be able to provide additional info, sorry!