MrPowers / ceja

PySpark phonetic and string matching algorithms
MIT License

java.net.SocketException: Datenübergabe unterbrochen (broken pipe) (Write failed) #9

Open Crefok opened 6 months ago

Crefok commented 6 months ago

Hey, if I run the following code

import ceja
import pyspark.sql.functions as F
data = [
    ("jellyfish", "smellyfish"),
    ("li", "lee"),
    ("luisa", "bruna"),
    (None, None)
]
df = spark.createDataFrame(data, ["word1", "word2"])
actual_df = df.withColumn("jaro_winkler_similarity", ceja.jaro_winkler_similarity(F.col("word1"), F.col("word2")))
actual_df.show()

I get a lot of exceptions from interrupted data transmissions.

java.net.SocketException: Datenübergabe unterbrochen (broken pipe) (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:732)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1524)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1515)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1515)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1627)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2588)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

I am running the code on a Hadoop cluster with Spark v3.2.0.3.2.7170.0-49 on YARN. The Python version is 3.9.19.

The code seems to run fine, but the noise in the logs is a bit annoying.

The output matches your example:

+---------+----------+-----------------------+
|    word1|     word2|jaro_winkler_similarity|
+---------+----------+-----------------------+
|jellyfish|smellyfish|              0.8962963|
|       li|       lee|              0.6111111|
|    luisa|     bruna|                    0.6|
|     null|      null|                   null|
+---------+----------+-----------------------+
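
For what it's worth, the similarity values in the table above can be sanity-checked without Spark. The sketch below is a minimal pure-Python Jaro-Winkler implementation written for illustration only; it is not ceja's code (as I understand it, ceja wraps the jellyfish package under the hood), but it reproduces the same numbers for these inputs:

```python
def jaro_similarity(s1: str, s2: str) -> float:
    """Classic Jaro similarity between two strings."""
    if s1 == s2:
        return 1.0
    len1, len2 = len(s1), len(s2)
    if len1 == 0 or len2 == 0:
        return 0.0
    # Characters match if equal and within this sliding window of each other.
    window = max(len1, len2) // 2 - 1
    matched1 = [False] * len1
    matched2 = [False] * len2
    matches = 0
    for i, c in enumerate(s1):
        for j in range(max(0, i - window), min(len2, i + window + 1)):
            if not matched2[j] and s2[j] == c:
                matched1[i] = matched2[j] = True
                matches += 1
                break
    if matches == 0:
        return 0.0
    # Count transpositions: matched characters that appear out of order.
    transpositions, k = 0, 0
    for i in range(len1):
        if matched1[i]:
            while not matched2[k]:
                k += 1
            if s1[i] != s2[k]:
                transpositions += 1
            k += 1
    transpositions //= 2
    return (matches / len1 + matches / len2
            + (matches - transpositions) / matches) / 3


def jaro_winkler_similarity(s1: str, s2: str,
                            p: float = 0.1,
                            boost_threshold: float = 0.7) -> float:
    """Jaro similarity plus Winkler's common-prefix boost (prefix capped at 4)."""
    jaro = jaro_similarity(s1, s2)
    if jaro <= boost_threshold:
        return jaro  # Winkler only boosts strings that are already similar
    prefix = 0
    for a, b in zip(s1, s2):
        if a != b or prefix == 4:
            break
        prefix += 1
    return jaro + prefix * p * (1 - jaro)


print(round(jaro_winkler_similarity("jellyfish", "smellyfish"), 7))  # 0.8962963
print(round(jaro_winkler_similarity("li", "lee"), 7))                # 0.6111111
print(round(jaro_winkler_similarity("luisa", "bruna"), 7))           # 0.6
```

Since the results match, the broken-pipe errors look unrelated to the similarity computation itself; they come from the accumulator-update path (`PythonAccumulatorV2.merge`) visible in the stack trace.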