jpmml / jpmml-sparkml

Java library and command-line application for converting Apache Spark ML pipelines to PMML
GNU Affero General Public License v3.0

Support transformed labels #35

Open sctincman opened 6 years ago

sctincman commented 6 years ago

Running Spark 2.1.2, using jpmml-sparkml 1.2.7.

While attempting to run the following PySpark code to convert a simple pipeline containing a RandomForestClassifier model with either toPMMLByteArray or toPMML, I'm receiving a NullPointerException.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import *

def updateFlightsSchema(dataSet):
    return ( dataSet.withColumn("DepDelay_Double",  dataSet["DepDelay"].cast("Double"))
                    .withColumn("DepDelay",         dataSet["DepDelay"].cast("Double"))
                    .withColumn("ArrDelay",         dataSet["ArrDelay"].cast("Double"))
                    .withColumn("Month",            dataSet["Month"].cast("Double"))
                    .withColumn("DayofMonth",       dataSet["DayofMonth"].cast("Double"))
                    .withColumn("CRSDepTime",       dataSet["CRSDepTime"].cast("Double"))
                    .withColumn("Distance",         dataSet["Distance"].cast("Double"))
                    .withColumn("AirTime",          dataSet["AirTime"].cast("Double"))
            )

data2007 = updateFlightsSchema(sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("mode", "DROPMALFORMED").load("2007_short.csv"))

removeCancelled = SQLTransformer(statement="select * from __THIS__ where Cancelled = \"0\" AND Diverted = \"0\"")
data2007 = removeCancelled.transform(data2007)

binarizer = Binarizer(threshold=15.0, inputCol="DepDelay_Double", outputCol="DepDelay_Bin")
featuresAssembler = VectorAssembler(inputCols=["Month", "CRSDepTime", "Distance"], outputCol="features")
rfc3 = RandomForestClassifier(labelCol="DepDelay_Bin", featuresCol="features", numTrees=3, maxDepth=5, seed=10305)

pipelineRF3 = Pipeline(stages=[binarizer, featuresAssembler, rfc3])

model3 = pipelineRF3.fit(data2007)

from py4j.java_gateway import JavaClass
from pyspark.ml.common import _py2java

javaDF = _py2java(sc, data2007)
javaSchema = javaDF.schema.__call__()

jvm = sc._gateway.jvm

javaConverter = sc._gateway.jvm.org.jpmml.sparkml.ConverterUtil
if(not isinstance(javaConverter, JavaClass)):
    raise RuntimeError("JPMML-SparkML not found on classpath")

pmml = jvm.org.jpmml.sparkml.ConverterUtil.toPMMLByteArray(javaSchema, model3._to_java())

This fails with:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.jpmml.sparkml.ConverterUtil.toPMMLByteArray.
: java.lang.NullPointerException
    at org.jpmml.converter.CategoricalLabel.<init>(CategoricalLabel.java:35)
    at org.jpmml.sparkml.ModelConverter.encodeSchema(ModelConverter.java:82)
    at org.jpmml.sparkml.ModelConverter.registerModel(ModelConverter.java:162)
    at org.jpmml.sparkml.ConverterUtil.toPMML(ConverterUtil.java:86)
    at org.jpmml.sparkml.ConverterUtil.toPMMLByteArray(ConverterUtil.java:142)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Following #22, I attempted to use the various Indexers on the feature and label columns to hint that these are categorical, but this resulted in the same error. Further, when I print the final tree, I do not see categorical feature declarations.

The dataset used and the tree output are attached: 2007_short.zip rfc.txt

vruusmann commented 6 years ago

The JPMML-SparkML library assumes that the label column of classification models is a "native" categorical label (in PMML, corresponds to a DataDictionary/DataField element), not a "transformed" categorical label (corresponds to a TransformationDictionary/DerivedField element).

I've been taking it for granted, and forgot to actually implement this "native" vs "transformed" check around ModelConverter.java:82.

It's possible to make your example work by applying the Binarizer transformation to the dataset outside of the pipeline, and then treating its output column "DepDelay_Bin" as a "native" categorical label:

binarizer = Binarizer(threshold=15.0, inputCol="DepDelay_Double", outputCol="DepDelay_Bin")
data2007 = binarizer.transform(data2007) # THIS!

stringIndexer = StringIndexer(inputCol="DepDelay_Bin", outputCol="DepDelay_Bin_Label") # THIS!
featuresAssembler = VectorAssembler(inputCols=["Month", "CRSDepTime", "Distance"], outputCol="features")
rfc3 = RandomForestClassifier(labelCol="DepDelay_Bin_Label", featuresCol="features", numTrees=3, maxDepth=5, seed=10305)

pipelineRF3 = Pipeline(stages=[stringIndexer, featuresAssembler, rfc3]) # THIS: start the pipeline with StringIndexer not Binarizer

model3 = pipelineRF3.fit(data2007)

from jpmml_sparkml import toPMMLBytes
pmmlBytes = toPMMLBytes(sc, data2007, model3)
print(pmmlBytes.decode("UTF-8"))

vruusmann commented 6 years ago

Technically, it shouldn't be much work to make JPMML-SparkML work with "transformed" labels, so keeping this issue open to track progress towards this functionality.
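
For reference, a minimal sketch of the "transformed" label case that this issue tracks, reusing the column names from the original report (an illustration of the currently unsupported pattern, not working code): the Binarizer stays inside the pipeline and its output column is used directly as the label.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Binarizer, VectorAssembler

# "DepDelay_Bin" is produced by a pipeline stage (a "transformed" label),
# rather than being a column of the input DataFrame (a "native" label)
binarizer = Binarizer(threshold=15.0, inputCol="DepDelay_Double", outputCol="DepDelay_Bin")
featuresAssembler = VectorAssembler(inputCols=["Month", "CRSDepTime", "Distance"], outputCol="features")
rfc = RandomForestClassifier(labelCol="DepDelay_Bin", featuresCol="features", numTrees=3, maxDepth=5)

pipeline = Pipeline(stages=[binarizer, featuresAssembler, rfc])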

alex-krash commented 5 years ago

Looks like it can be closed for current version:

Binarizer binarizer = new Binarizer()
        .setInputCol("Sepal_Length")
        .setOutputCol("Sepal_Length_Binar_")
        .setThreshold(5.0);

StringIndexer labelIndexer = new StringIndexer()
        .setInputCol("Species")
        .setOutputCol("Species_Bin");

VectorAssembler vectorAssembler = new VectorAssembler()
        .setInputCols(new String[]{"Sepal_Length_Binar_", "Sepal_Width", "Petal_Length", "Petal_Width"})
        .setOutputCol("features");

RandomForestClassifier classifier = new RandomForestClassifier()
        .setLabelCol("Species_Bin");

Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{binarizer, labelIndexer, vectorAssembler, classifier});
PipelineModel model = pipeline.fit(dataset);

PMMLBuilder builder = new PMMLBuilder(schema, model);
PMML pmml = builder.build();
JAXBUtil.marshalPMML(pmml, new StreamResult(System.out));

vruusmann commented 5 years ago

Looks like it can be closed for current version

Nope, I'd like to be able to use Sepal_Length_Binar_ as the label column here.

borisborowsky commented 5 years ago

Can someone help me with this error: AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'? I get it when I try to execute PMMLBuilder().

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='f1')

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6])
             .addGrid(dt.maxBins, [570, 570])
             .build())

stages += [dt]
pipeline = Pipeline(stages=stages)

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

cvModel = cv.fit(dataSet)
train_dataset = cvModel.transform(dataSet)

train_dataset.show()
print(evaluator.evaluate(train_dataset))

pmmlBuilder = PMMLBuilder(spark, dataSet, cvModel) \
    .putOption(dt, "compact", True)

pmmlBuilder.buildFile("DecisionTreeIris.pmml")

I cannot find any fix for this. What am I doing wrong?

vruusmann commented 5 years ago

AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java' error

This is clearly a low-level PySpark error, which has got nothing to do with PySpark2PMML or JPMML-SparkML.

Maybe your PySpark and Apache Spark versions are out of sync.
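
If it helps, a quick way to compare the two versions (a hypothetical check, assuming spark is the active SparkSession):

import pyspark

print(pyspark.__version__)  # version of the PySpark Python package
print(spark.version)        # version of the Apache Spark runtime behind the SparkSession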

borisborowsky commented 5 years ago

@vruusmann Thank you. My PySpark and Apache Spark versions are up to date. The problem was that you must pass the pipeline's best model; in my case, cvModel.bestModel did the trick.
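
A minimal sketch of that fix, assuming the same spark, dataSet, dt and cvModel objects as in the snippet above, and that PMMLBuilder comes from the PySpark2PMML package:

from pyspark2pmml import PMMLBuilder

bestModel = cvModel.bestModel  # unwrap the best PipelineModel from the CrossValidatorModel

pmmlBuilder = PMMLBuilder(spark, dataSet, bestModel) \
    .putOption(dt, "compact", True)
pmmlBuilder.buildFile("DecisionTreeIris.pmml")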

borisborowsky commented 5 years ago

@vruusmann Sorry for the off-topic question, I will delete it, but now I run into another issue. When I try to call buildFile on the pmmlBuilder object, it fails with:

py4j.protocol.Py4JJavaError: An error occurred while calling o57101.buildFile.
: java.lang.IllegalArgumentException: Expected 3 target categories, got 2 target categories
pyspark.sql.utils.IllegalArgumentException: 'Expected 3 target categories, got 2 target categories'

I cannot understand why. Do you have a clue?