MrPowers / ceja

PySpark phonetic and string matching algorithms
MIT License
35 stars 5 forks source link

Error while using Stemming #2

Open evangeliazve opened 3 years ago

evangeliazve commented 3 years ago

Hello,

I am facing issues when trying to apply stemming on text data in AWS with Pyspark. Here is the error message I'm getting: PythonException: An exception was thrown from a UDF: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):

How can I resolve this?

Thank you for your support.

Best, Evangelia

MrPowers commented 3 years ago

@evangeliazve - thanks for reporting this. I'm not sure how to fix the issue. Can you please send me your exact code and the full error stack trace, so I can try to replicate the issue on my machine? Thanks!

evangeliazve commented 3 years ago

Hello @MrPowers, thanks for your reply.

When I execute the following code everything goes fine : ! pip install ceja import ceja

actual_df = df_txts.withColumn("list_of_words_stem", ceja.porter_stem(col("list_of_words")))

However, even though the objet class is dataframe when I use the .show() fonction to show up the result table I obtain the following error message:

PythonException Traceback (most recent call last)

in ----> 1 actual_df.show() /databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical) 439 """ 440 if isinstance(truncate, bool) and truncate: --> 441 print(self._jdf.showString(n, 20, vertical)) 442 else: 443 print(self._jdf.showString(n, int(truncate), vertical)) /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args: /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 131 # Hide where the exception came from that shows a non-Pythonic 132 # JVM exception message. --> 133 raise_from(converted) 134 else: 135 raise /databricks/spark/python/pyspark/sql/utils.py in raise_from(e) PythonException: An exception was thrown from a UDF: 'TypeError: str argument expected'. Full traceback below: Traceback (most recent call last): File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-e524d562-8b36-4d72-8aab-f20b7e4b5527/lib/python3.7/site-packages/ceja/functions.py", line 27, in porter_stem return None if s == None else J.porter_stem(s) TypeError: str argument expected **I converted then to string array format** ![image](https://user-images.githubusercontent.com/19437590/109128321-ab993b80-774f-11eb-803f-09de80b8ebbc.png) **An tried to use it with the following code:** # TF cv = CountVectorizer(inputCol="list_of_words_stem", outputCol="raw_features", vocabSize=5000, minDF=10.0) cvmodel = cv.fit(df_txts) result_cv = cvmodel.transform(df_txts) # IDF idf = IDF(inputCol="raw_features", outputCol="features") idfModel = idf.fit(result_cv) result_tfidf = idfModel.transform(result_cv) **And I obtained the following message:** --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) in 1 # TF 2 cv = CountVectorizer(inputCol="list_of_words_stem", outputCol="raw_features", vocabSize=5000, minDF=10.0) ----> 3 cvmodel = cv.fit(df_txts) 4 result_cv = cvmodel.transform(df_txts) 5 # IDF /databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params) 127 return self.copy(params)._fit(dataset) 128 else: --> 129 return self._fit(dataset) 130 else: 131 raise ValueError("Params must be either a param map or a list/tuple of param maps, " /databricks/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset) 319 320 def _fit(self, dataset): --> 321 java_model = self._fit_java(dataset) 322 model = self._create_model(java_model) 323 return self._copyValues(model) /databricks/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset) 316 """ 317 self._transfer_params_to_java() --> 318 return self._java_obj.fit(dataset._jdf) 319 320 def _fit(self, dataset): /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args: /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 125 def deco(*a, **kw): 126 try: --> 127 return f(*a, **kw) 128 except py4j.protocol.Py4JJavaError as e: 129 converted = convert_exception(e.java_exception) /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( Py4JJavaError: An error occurred while calling o1117.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 35.0 failed 4 times, most recent failure: Lost task 2.3 in stage 35.0 (TID 151, 10.58.29.71, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2354) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2373) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398) at org.apache.spark.rdd.RDD.count(RDD.scala:1234) at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:234) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748) Could you please help me ? Best, Evangelia
shemekhe commented 3 years ago

I have the exact same issue with DataBricks on AWS.

I'm trying to use this library inside a UDF. I'll get An exception was thrown from a UDF: 'pyspark.serializers.SerializationError: everytime I try to use my udf.