jpmml / jpmml-sparkml

Java library and command-line application for converting Apache Spark ML pipelines to PMML
GNU Affero General Public License v3.0

PMML buildFile error when using KMeans clustering in pipeline #60

Closed rjphofmann closed 5 years ago

rjphofmann commented 5 years ago

I'm currently constructing a pipeline whose first two components are a VectorAssembler and a KMeans clustering estimator, followed by a second VectorAssembler and then a RandomForestClassifier. My guess is that PMMLBuilder doesn't like the vector column that the first VectorAssembler creates and passes along the pipeline. Any thoughts/recommendations?

Code:

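# Imports needed by this snippet (sc, all_data and MAX_CATS are defined elsewhere)
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark2pmml import PMMLBuilder

rf_feature = 'device_type'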
cluster_features = ['device_os','channel_id']

all_data = all_data.select(cluster_features + [rf_feature,'win']).dropna()
all_data.cache()

# KMeans
cluster_feature_vector = 'cluster_features'
transformed_feature = 'kmeans_clustering'

cluster_assembler = VectorAssembler(inputCols=cluster_features, outputCol=cluster_feature_vector)
kmeans = KMeans(featuresCol=cluster_feature_vector, predictionCol=transformed_feature).setK(8).setSeed(123)

assembler = VectorAssembler(inputCols=[transformed_feature,rf_feature],outputCol="features")
rf = RandomForestClassifier(labelCol="win", featuresCol="features", numTrees=10, maxBins=MAX_CATS)

trainingPipeline = Pipeline(stages=[cluster_assembler, kmeans, assembler, rf])
trainingModel = trainingPipeline.fit(all_data)

pmmlBuilder = PMMLBuilder(sc, all_data, trainingModel).putOption(None, sc._jvm.org.jpmml.sparkml.model.HasTreeOptions.OPTION_COMPACT, True)    
pmmlBuilder.buildFile("pipeline_w_" + transformed_feature + ".pmml")

Error:
Py4JJavaError: An error occurred while calling o369.buildFile.
: java.lang.UnsupportedOperationException
    at org.jpmml.converter.ObjectFeature.toContinuousFeature(ObjectFeature.java:32)
    at org.jpmml.sparkml.model.TreeModelUtil.encodeNode(TreeModelUtil.java:223)
    at org.jpmml.sparkml.model.TreeModelUtil.encodeNode(TreeModelUtil.java:295)
....

Currently using Spark 2.4 w/ the uber jar: jpmml-sparkml-executable-1.5.1.jar

vruusmann commented 5 years ago

Looks like the PMMLBuilder component assumes that KMeans is generating a string prediction column (that cannot be force-cast to a numeric column).

Should check with Apache Spark documentation, but I suspect that KMeans is generating an integer prediction column instead (where the column value is simply the cluster index).

Anyway, this pipeline should convert nicely if the model in the first step was some Classifier or Regressor.
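To make the point about the prediction column concrete, here is a minimal pure-Python sketch of what a KMeans-style prediction amounts to: each row is assigned the position of its nearest centroid, so the output is a plain integer. This is not Spark's implementation; the `nearest_cluster` helper, the centroids and the rows are all invented for illustration.

```python
# Illustrative sketch only, not Spark's KMeans implementation.
def nearest_cluster(point, centroids):
    """Return the integer index of the centroid closest to `point`."""
    def sq_dist(a, b):
        # Squared Euclidean distance between two equal-length tuples.
        return sum((x - y) ** 2 for x, y in zip(a, b))
    return min(range(len(centroids)), key=lambda i: sq_dist(point, centroids[i]))

# Hypothetical centroids and rows, invented for this example.
centroids = [(0.0, 0.0), (10.0, 10.0), (0.0, 10.0)]
rows = [(1.0, 1.0), (9.0, 11.0), (0.5, 8.0)]

predictions = [nearest_cluster(row, centroids) for row in rows]
print(predictions)  # [0, 1, 2]
```

The index is only a label: cluster 2 is not "greater than" cluster 1 in any meaningful sense, which is why a downstream step has to decide whether to treat the column as categorical or continuous.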

rjphofmann commented 5 years ago

So what exactly do you recommend I do? KMeans does indeed output a numeric (integer) column after running. I don't believe there's a way to force it to output a string column. Please advise! Thanks!

vruusmann commented 5 years ago

> So what exactly do you recommend I do?

Apply StringIndexer to explicitly turn the string column into an integer column?

rjphofmann commented 5 years ago

@vruusmann - I'm still having trouble, even when I add a StringIndexer step. It appears that the PMML converter cannot find the cluster column name in order to apply the StringIndexer, so I get the following error:

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-15-66290a206af7> in <module>()
     41 #train_pred.show()
     42 pmmlBuilder = PMMLBuilder(sc, all_data, trainingModel).putOption(None, sc._jvm.org.jpmml.sparkml.model.HasTreeOptions.OPTION_COMPACT, True)
---> 43 pmmlBuilder.buildFile("dspbre_pipeline_w_" + transformed_feature + ".pmml")

/home/shared/phofmann/.local/lib/python2.7/site-packages/pyspark2pmml/__init__.pyc in buildFile(self, path)
     25         def buildFile(self, path):
     26                 javaFile = self.sc._jvm.java.io.File(path)
---> 27                 javaFile = self.javaPmmlBuilder.buildFile(javaFile)
     28                 return javaFile.getAbsolutePath()
     29 

/mnt/airflow/code/spark-2.4.0-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/mnt/airflow/code/spark-2.4.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: u'kmeans_clustering'

New code:

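# Imports needed by this snippet (sc, all_data and MAX_CATS are defined elsewhere)
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark2pmml import PMMLBuilder

rf_feature = 'device_type'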
cluster_features = ['device_os','channel_id','player_size_bucket','playtime_bucket']

all_data = all_data.select(cluster_features + [rf_feature,'bid']).dropna()
all_data.cache()

# KMeans
cluster_feature_vector = 'cluster_features'
transformed_feature = 'kmeans_clustering'

cluster_assembler = VectorAssembler(inputCols=cluster_features, outputCol=cluster_feature_vector)
kmeans = KMeans(featuresCol=cluster_feature_vector, predictionCol=transformed_feature).setK(8).setSeed(123)

# Index output column from KMeans
transformed_feature_indexed = transformed_feature + '_indexed'
stringIndexer = StringIndexer(inputCol=transformed_feature, outputCol=transformed_feature_indexed, handleInvalid='keep')

assembler = VectorAssembler(inputCols=[transformed_feature_indexed,rf_feature],outputCol="features")
rf = RandomForestClassifier(labelCol="bid", featuresCol="features", numTrees=10, maxBins=MAX_CATS + 1)

trainingPipeline = Pipeline(stages=[cluster_assembler, kmeans, stringIndexer, assembler, rf])
trainingModel = trainingPipeline.fit(all_data)
#train_pred = trainingModel.transform(all_data)

#train_pred.show()
pmmlBuilder = PMMLBuilder(sc, all_data, trainingModel).putOption(None, sc._jvm.org.jpmml.sparkml.model.HasTreeOptions.OPTION_COMPACT, True)    
pmmlBuilder.buildFile("dspbre_pipeline_w_" + transformed_feature + ".pmml")

vruusmann commented 5 years ago

I fixed the original issue yesterday, so KMeans now produces an integer prediction column. It no longer makes sense to apply StringIndexer to it; perhaps OneHotEncoder would be more appropriate?
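As a rough illustration of the OneHotEncoder suggestion, the pure-Python sketch below shows what one-hot encoding does to an integer cluster index. It is not Spark code: `one_hot` is a hypothetical helper, and it returns dense lists where Spark would produce sparse vectors. The `drop_last` flag mirrors the `dropLast` parameter of Spark's encoder, which defaults to true.

```python
# Illustrative sketch only: a dense, pure-Python stand-in for one-hot encoding.
def one_hot(index, size, drop_last=True):
    """Encode a category index as a list of 0.0/1.0 values.

    With drop_last=True the final category is encoded as all zeros,
    so the vector has size - 1 entries.
    """
    if not 0 <= index < size:
        raise ValueError("index %d out of range for %d categories" % (index, size))
    width = size - 1 if drop_last else size
    return [1.0 if i == index else 0.0 for i in range(width)]

k = 8  # same number of clusters as setK(8) in the pipeline above
for cluster in (0, 3, 7):
    print(cluster, one_hot(cluster, k))
```

Encoded this way, each cluster becomes its own indicator feature, so the random forest no longer sees the cluster index as an ordered numeric value. Note that in Spark 2.x the estimator variant of the encoder is named OneHotEncoderEstimator; whether the converter accepts it in this particular pipeline is not confirmed in this thread.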