qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

pyspark foreach/foreachPartition send http request failed #82

Closed tmylt closed 4 years ago

tmylt commented 4 years ago

I use urllib.request to send HTTP requests inside foreach/foreachPartition. PySpark throws the following error:

objc[74094]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
20/07/20 19:05:58 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:525)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:643)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2133)


This happens when I call rdd.foreach(send_http), where rdd = sc.parallelize(["http://192.168.1.1:5000/index.html"]) and send_http is defined as follows:

import urllib.request

def send_http(url):
    # Build a GET request for the URL and open it on the executor
    req = urllib.request.Request(url)
    resp = urllib.request.urlopen(req)

Can anyone tell me what the problem is? Thanks.
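For reference, here is a minimal, self-contained sketch of the kind of per-record HTTP call being attempted, runnable without Spark. The local http.server and the 127.0.0.1 URL are assumptions added purely so the snippet can be exercised end to end; they are not part of the original report, and the send_http shape mirrors the function above:

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

def send_http(url):
    # Same shape as the function in the report: issue a GET and return the status
    req = urllib.request.Request(url)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status

class _Handler(BaseHTTPRequestHandler):
    # Hypothetical stand-in server so the request has something to hit
    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, *args):
        pass  # suppress per-request logging

if __name__ == "__main__":
    # Port 0 asks the OS for any free port
    server = HTTPServer(("127.0.0.1", 0), _Handler)
    t = threading.Thread(target=server.serve_forever, daemon=True)
    t.start()
    url = "http://127.0.0.1:%d/index.html" % server.server_port
    print(send_http(url))  # → 200
    server.shutdown()
```

In a real job this function would be shipped to executors via rdd.foreach(send_http); with foreachPartition, the usual pattern is to open one connection (or session) per partition rather than per record.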