maxpumperla / elephas

Distributed Deep learning with Keras & Spark
http://maxpumperla.com/elephas/
MIT License
1.57k stars 310 forks source link

java.lang.NullPointerException while showing predicted output. #202

Open aman1931998 opened 2 years ago

aman1931998 commented 2 years ago

Dear @maxpumperla / other authors of this repo. A big thanks for developing this library, I have been successful in running it some datasets but I am facing issue with the current one. Please help me though it.

Dataset: https://www.kaggle.com/datasets/janiobachmann/bank-marketing-dataset

Dataset looks like this: df.show() image

Schema: df.printSchema() [deposit being the target variable] image

Dataset doesn't have any null values image

After Converting categorical columns. image

After Converting numerical columns via VectorAssembler -> StandardScaler. image

I thought of converting the Vector created into individual columns hence exploded the Vector column. image

Then I converted all the features into a single Vector to create 'features' column. image

But you can see that some are SparseVector and some are DenseVector. Normally, pyspark's functionality is not affected but since the issue I was facing was not getting resolved, hence I forcefully converted each SparseVector to DenseVector. image

With all this done, I converted the target variable ('deposit') via StringIndexer as well and took the features and labels columns to a separate df. image

Keras Model: image

Elephas estimator config: image

Training via elephas and output df (pred_test). image

Error Stack when running pred_test.collect() image

Full Error Stack

Py4JJavaError Traceback (most recent call last) Input In [2718], in <cell line: 1>() ----> 1 pred_test.collect()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:693, in DataFrame.collect(self) 683 """Returns all the records as a list of :class:Row. 684 685 .. versionadded:: 1.3.0 (...) 690 [Row(age=2, name='Alice'), Row(age=5, name='Bob')] 691 """ 692 with SCCallSiteSync(self._sc) as css: --> 693 sock_info = self._jdf.collectToPython() 694 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

File /opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py:1321, in JavaMember.call(self, *args) 1315 command = proto.CALL_COMMAND_NAME +\ 1316 self.command_header +\ 1317 args_command +\ 1318 proto.END_COMMAND_PART 1320 answer = self.gateway_client.send_command(command) -> 1321 return_value = get_return_value( 1322 answer, self.gateway_client, self.target_id, self.name) 1324 for temp_arg in temp_args: 1325 temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:111, in capture_sql_exception..deco(*a, kw) 109 def deco(*a, *kw): 110 try: --> 111 return f(a, kw) 112 except py4j.protocol.Py4JJavaError as e: 113 converted = convert_exception(e.java_exception)

File /opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". 332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o17843.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1258.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1258.0 (TID 985) (.internal executor driver): java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$scalaConverter$2(ScalaUDF.scala:164) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53) at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:434) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019) at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:269) Caused by: java.lang.NullPointerException

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.collect(RDD.scala:1029) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394) at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$scalaConverter$2(ScalaUDF.scala:164) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307) at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53) at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:434) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019) at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:269) Caused by: java.lang.NullPointerException

Please help me out in this. Thanks in advance.!

aman1931998 commented 2 years ago

Environment Details: Zeppelin 0.10.1 Spark 3.2.1 for hadoop 3.2

Python 3.8.10 PySpark 3.2.1 Elephas 3.1.0 tensorflow 2.8.0 keras 2.8.0

No. of Cores: 2 Memory: 8GB

shahryary commented 2 years ago

Same error for me, I worked a little bit more into model and I realize that the problem could be happen in pyspark serializers, here is the error:

22/06/02 15:56:24 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 599, in main
    eval_type = read_int(infile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:951)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.rdd.RDD$$anon$3.foreach(RDD.scala:950)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:670)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:424)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:259)
Caused by: java.lang.NullPointerException
22/06/02 15:56:24 ERROR PythonRunner: This may have been caused by a prior exception:
java.lang.NullPointerException
22/06/02 15:56:24 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 599, in main
    eval_type = read_int(infile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError

Probably updating pyspark( current version 3.2.0) into new one (3.2.1) could solve the problem.

Additionally, somehow in "transform" part the element are array (to see use: predcition.printSchema(), despite the other ML algorithms just giving none-array result, so probably in the array format we couldn't retrieve the data in prediction part

I'll work on it, will update here..

danielenricocahall commented 2 years ago

Hey were there any updates on this front?

albarabimakasa commented 2 years ago

hello every one, I have same problem when i want to show my prediction it's says that

Py4JJavaError: An error occurred while calling o1560.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 87) (849282bebc33 executor driver): java.lang.IllegalArgumentException: requirement failed: Index 4 out of bounds for vector of size 4

i don't know what even it's mean, so here my code i hope somebody help me.

from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics

adam = optimizers.Adam(learning_rate=0.1)
opt_conf = optimizers.serialize(adam

estimator = ElephasEstimator()
estimator.setFeaturesCol("scaled_features")            
estimator.setLabelCol("index_category")              
estimator.set_keras_model_config(model.to_json())      l
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(3)  # We just use one worker here. Feel free to adapt it.
estimator.set_epochs(5) 
estimator.set_batch_size(1280)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
fitted_pipeline = pipeline.fit(train_df)
prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("Fvec", "prediction")
pnl.show(100)
AlessandroMinervini commented 2 years ago

I have the same problem, the model fit and transform correctly but when i try to show the predictions values my script crashes with the same error...

Any solutions?

Thanks

danielenricocahall commented 2 years ago

Hello! It looks like they may be separate issues. Would be happy to look into it:

1) Please post the issues in the new fork: https://github.com/danielenricocahall/elephas 2) Please post a fully reproducible example (if possible).