autodeployai / pmml4s

PMML scoring library for Scala
https://www.pmml4s.org/
Apache License 2.0

java.io.NotSerializableException: org.pmml4s.transformations.FieldColumnPair #1

Closed alitrack closed 5 years ago

alitrack commented 5 years ago

I tried to use a PMML model exported with JPMML-SparkML. When I run

import warnings
warnings.filterwarnings('ignore')

# Pull in the PMML4S Spark package when the session starts
from pyspark import SparkConf
conf = SparkConf() \
    .set("spark.jars.packages", "org.pmml4s:pmml4s-spark_2.11:0.9.0")

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config(conf=conf) \
    .getOrCreate()

# Load the PMML model and name the output column
from pypmml_spark import ScoreModel
model = ScoreModel.fromFile('DecisionTreeIris.pmml').setPredictionCol("prediction")

# Read the data, then replace dots in column names with underscores and lowercase them
df = spark.read.csv("iris.csv", header=True, inferSchema=True)
df = df.toDF(*(c.replace('.', '_').lower() for c in df.columns))

# This call raises the exception shown below
score_df = model.transform(df)

I get the following exception:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-53-82aed306ef91> in <module>
----> 1 score_df = model.transform(df)

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/pyspark/ml/base.py in transform(self, dataset, params)
    171                 return self.copy(params)._transform(dataset)
    172             else:
--> 173                 return self._transform(dataset)
    174         else:
    175             raise ValueError("Params must be a param map but got %s." % type(params))

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/pyspark/ml/wrapper.py in _transform(self, dataset)
    310     def _transform(self, dataset):
    311         self._transfer_params_to_java()
--> 312         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
    313 
    314 

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o475.transform.
: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
    at org.pmml4s.spark.ScoreModel.transform(ScoreModel.scala:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.pmml4s.transformations.FieldColumnPair
Serialization stack:
    - object not serializable (class: org.pmml4s.transformations.FieldColumnPair, value: org.pmml4s.transformations.FieldColumnPair@3e25a3a)
    - element of array (index: 0)
    - array (class [Lorg.pmml4s.transformations.FieldColumnPair;, size 1)
    - field (class: org.pmml4s.transformations.MapValues, name: fieldColumnPairs, type: class [Lorg.pmml4s.transformations.FieldColumnPair;)
    - object (class org.pmml4s.transformations.MapValues, org.pmml4s.transformations.MapValues@313fbaaf)
    - field (class: scala.Some, name: x, type: class java.lang.Object)
    - object (class scala.Some, Some(org.pmml4s.transformations.MapValues@313fbaaf))
    - field (class: org.pmml4s.metadata.OutputField, name: expr, type: class scala.Option)
    - object (class org.pmml4s.metadata.OutputField, OutputField(name=prediction, displayName=None, dataType=double, opType=nominal, feature=transformedValue, targetField=None, value=None, ruleFeature=consequent, algorithm=exclusiveRecommendation, rank=1, rankBasis=confidence, rankOrder=descending, isMultiValued=false, segmentId=None, isFinalResult=true, decisions=None, expr=Some(org.pmml4s.transformations.MapValues@313fbaaf)))
    - element of array (index: 1)
    - array (class [Lorg.pmml4s.metadata.OutputField;, size 5)
    - field (class: org.pmml4s.metadata.Output, name: outputFields, type: class [Lorg.pmml4s.metadata.OutputField;)
    - object (class org.pmml4s.metadata.Output, org.pmml4s.metadata.Output@49f822da)
    - field (class: scala.Some, name: x, type: class java.lang.Object)
    - object (class scala.Some, Some(org.pmml4s.metadata.Output@49f822da))
    - field (class: org.pmml4s.model.TreeModel, name: output, type: class scala.Option)
    - object (class org.pmml4s.model.TreeModel, org.pmml4s.model.TreeModel@7ddb40e2)
    - field (class: org.pmml4s.spark.ScoreModel, name: model, type: class org.pmml4s.model.Model)
    - object (class org.pmml4s.spark.ScoreModel, scoreModel_957436760f62)
    - field (class: org.pmml4s.spark.ScoreModel$$anonfun$2, name: $outer, type: class org.pmml4s.spark.ScoreModel)
    - object (class org.pmml4s.spark.ScoreModel$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 21 more
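
The serialization stack above reads from the innermost object outward: the closure holds the ScoreModel, which holds the model, and so on down to FieldColumnPair, the one class that is not serializable. As a rough analogy only (this uses Python's pickle, not Spark or pmml4s, and the class names are hypothetical stand-ins chosen to mirror the stack), serializing an object graph fails as a whole when a single nested member cannot be serialized:

```python
import pickle
import threading


class FieldColumnPair:
    """Hypothetical stand-in: holds a lock, which pickle cannot serialize."""

    def __init__(self):
        self._lock = threading.Lock()


class MapValues:
    """Hypothetical stand-in: keeps a list of FieldColumnPair objects."""

    def __init__(self):
        self.field_column_pairs = [FieldColumnPair()]


class ScoreModel:
    """Hypothetical stand-in: the outer object that gets serialized."""

    def __init__(self):
        self.expr = MapValues()


def try_serialize(obj):
    """Return None if pickling succeeds, else the error message."""
    try:
        pickle.dumps(obj)
        return None
    except (TypeError, pickle.PicklingError) as exc:
        return str(exc)


# The outer object looks harmless, but pickling walks the whole graph
# and fails on the deeply nested member.
error = try_serialize(ScoreModel())
print(error)
```

In this analogy the fix would be to make the innermost class serializable, or to keep it out of what gets shipped to the workers. Whether either applies to pmml4s is an assumption that the report does not confirm.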

Attached are the PMML file and iris.csv: data.zip

BTW, I tried the example and it works.

alitrack commented 5 years ago

wrong place, sorry.