When using a model that contains a `FieldRef`, I get a Spark serialization error with `pypmml-spark==1.5.2`.

This is with:

- PySpark 3.4.1
- Python 3.10.12

Example script with a minimal regression model that uses `FieldRef` to duplicate the output field:
```python
import importlib.resources

import pypmml_spark
from pyspark.sql import SparkSession

pmml = """
<PMML xmlns="https://www.dmg.org/PMML-4_2" version="4.2">
  <Header copyright="DMG.org"/>
  <DataDictionary numberOfFields="3">
    <DataField name="x" optype="continuous" dataType="double"/>
    <DataField name="y" optype="continuous" dataType="double"/>
    <DataField name="y2" optype="continuous" dataType="double"/>
  </DataDictionary>
  <RegressionModel modelName="Sample for linear regression" functionName="regression" algorithmName="linearRegression" targetFieldName="number_of_claims">
    <MiningSchema>
      <MiningField name="x"/>
      <MiningField name="y" usageType="target"/>
      <MiningField name="y2" usageType="target"/>
    </MiningSchema>
    <Output>
      <OutputField name="y" optype="continuous" dataType="double" feature="predictedValue"/>
      <OutputField name="y2" optype="continuous" dataType="double" feature="transformedValue">
        <FieldRef field="y"/>
      </OutputField>
    </Output>
    <RegressionTable intercept="0.0">
      <NumericPredictor name="x" exponent="1" coefficient="2."/>
    </RegressionTable>
  </RegressionModel>
</PMML>
"""

# help Spark find the pypmml JAR files
# (importlib.resources.files returns a real Path for an installed package;
# importlib.resources.path is deprecated and returns a context manager)
pypmml_jars_dir = importlib.resources.files('pypmml_spark') / 'jars'
jar_files = [str(x) for x in pypmml_jars_dir.glob('*.jar')]
jars = ','.join(jar_files)
spark = SparkSession.builder.config("spark.jars", jars).getOrCreate()

# load model
model = pypmml_spark.ScoreModel.fromString(pmml)

# create test DF and score
df = spark.createDataFrame([{'x': 1.}])
model.transform(df).show()
```
With `pypmml-spark==1.0.2` I get this output, all good:

```
+---+---+---+
|  x|  y| y2|
+---+---+---+
|1.0|2.0|2.0|
+---+---+---+

Process finished with exit code 0
```
However, with `pypmml-spark==1.5.2` I get this error:

```
Traceback (most recent call last):
  File "/mnt/c/Users/jrauch/Python/dap_pmml/src/unittest/test.py", line 44, in <module>
    model.transform(df).show()
  File "/home/jrauch/dap_pmml_wsl/lib/python3.10/site-packages/pyspark/ml/base.py", line 262, in transform
    return self._transform(dataset)
  File "/home/jrauch/dap_pmml_wsl/lib/python3.10/site-packages/pyspark/ml/wrapper.py", line 398, in _transform
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sparkSession)
  File "/home/jrauch/dap_pmml_wsl/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/jrauch/dap_pmml_wsl/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/home/jrauch/dap_pmml_wsl/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o27.transform.
: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2526)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$1(RDD.scala:853)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:852)
    at org.pmml4s.spark.ScoreModel.transform(ScoreModel.scala:62)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.NotSerializableException: org.pmml4s.data.NullVal$
Serialization stack:
    - object not serializable (class: org.pmml4s.data.NullVal$, value: )
    - field (class: org.pmml4s.transformations.FieldRef, name: replacement, type: interface org.pmml4s.data.DataVal)
    - object (class org.pmml4s.transformations.FieldRef, org.pmml4s.transformations.FieldRef@71d00e9a)
    - field (class: scala.Some, name: value, type: class java.lang.Object)
    - object (class scala.Some, Some(org.pmml4s.transformations.FieldRef@71d00e9a))
    - field (class: org.pmml4s.metadata.OutputField, name: expr, type: class scala.Option)
    - object (class org.pmml4s.metadata.OutputField, OutputField(name=y2, displayName=None, dataType=double, opType=continuous, feature=transformedValue, targetField=None, value=None, ruleFeature=consequent, algorithm=exclusiveRecommendation, rank=1, rankBasis=confidence, rankOrder=descending, isMultiValued=false, segmentId=None, isFinalResult=true, decisions=None, expr=Some(org.pmml4s.transformations.FieldRef@71d00e9a)))
    - element of array (index: 1)
    - array (class [Lorg.pmml4s.metadata.OutputField;, size 2)
    - field (class: org.pmml4s.metadata.Output, name: outputFields, type: class [Lorg.pmml4s.metadata.OutputField;)
    - object (class org.pmml4s.metadata.Output, org.pmml4s.metadata.Output@35cf7365)
    - field (class: scala.Some, name: value, type: class java.lang.Object)
    - object (class scala.Some, Some(org.pmml4s.metadata.Output@35cf7365))
    - field (class: org.pmml4s.model.RegressionModel, name: output, type: class scala.Option)
    - object (class org.pmml4s.model.RegressionModel, org.pmml4s.model.RegressionModel@84b1c74)
    - field (class: org.pmml4s.spark.ScoreModel, name: model, type: class org.pmml4s.model.Model)
    - object (class org.pmml4s.spark.ScoreModel, scoreModel_900fa909a6df)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.pmml4s.spark.ScoreModel, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/pmml4s/spark/ScoreModel.$anonfun$transform$2:(Lorg/pmml4s/spark/ScoreModel;Lscala/collection/Seq;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.pmml4s.spark.ScoreModel$$Lambda$1892/1581039073, org.pmml4s.spark.ScoreModel$$Lambda$1892/1581039073@63232177)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
    ... 21 more

Process finished with exit code 1
```
Note: when I remove the `FieldRef`, everything works fine with the newer version, too.