jpmml / jpmml-sparkml

Java library and command-line application for converting Apache Spark ML pipelines to PMML
GNU Affero General Public License v3.0

can't build pmml file with cannot resolve field error #58

Closed yairdata closed 5 years ago

yairdata commented 5 years ago

I succeeded in creating the PMMLBuilder with:

    from pyspark2pmml import PMMLBuilder

    pmmlBuilder = PMMLBuilder(spark.sparkContext, train, model) \
        .putOption(None, spark.sparkContext._jvm.org.jpmml.sparkml.model.HasTreeOptions.OPTION_COMPACT, True) \
        .verify(initial_training_data.sample(False, 0.9))

BTW, this happened just after changing StandardScaler to MaxAbsScaler. My pipeline is:

    dateImputer = SQLTransformer(
        statement="SELECT {},{} FROM __THIS__".format(
            ", ".join(map(str, fieldList_first)),
            " , ".join(["(CASE WHEN {} IS NULL THEN '01-JAN-70 12.00.00.000000 AM' ELSE {} END) as {}".format(x, x, x + "calced") for x in date_columns])))

    stages += [dateImputer]

    fieldDropper = SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(map(str, fieldList))))

    stages += [fieldDropper]

    catImputer = SQLTransformer(
        statement="SELECT {},{} FROM __THIS__".format(
            ", ".join(map(str, fieldList_cont)),
            " , ".join(["(CASE WHEN ({} IS NULL or {} = '') THEN 'EMPTY' ELSE {} END) as {}".format(x, x, x, x + "calced") for x in categoricalCols])))

    stages += [catImputer]

    # can be replaced with Imputer if using the mean value
    numImputer = SQLTransformer(
        statement="SELECT {},{} FROM __THIS__".format(
            gpg_alert + "," + labelCol + "," + ", ".join(map(str, fieldList_categ_calced)),
            " , ".join(["(CASE WHEN {} IS NULL THEN 0 ELSE {} END) as {}".format(x, x, x + "calced") for x in numeric_columns + added_flds])))

    stages += [numImputer]

    statusTransformer = SQLTransformer(
        statement="SELECT {},(CASE WHEN STATUS == 'Y' THEN 1 ELSE 0 END) AS STATUScalced FROM __THIS__".format(
            some_alert + "," + ", ".join(map(str, fieldList_calced))))

    stages += [statusTransformer]

    for categoricalCol in categoricalCols:
        stringIndexer = StringIndexer(inputCol=categoricalCol + "calced", outputCol=categoricalCol + "classVec", handleInvalid="keep")
        stages += [stringIndexer]

    HashedInputs = [c + "classVec" for c in categoricalCols] + [d + "calced" for d in continuousCols if d not in date_columns]

    assembler = VectorAssembler(inputCols=HashedInputs, outputCol="features")

    stages += [assembler]

    fieldsSelector = SQLTransformer(
        statement="SELECT features,{} AS label,{} AS some_alert,{} FROM __THIS__".format(
            labelCol + "calced", some_alert, ", ".join(map(str, HashedInputs))))

    stages += [fieldsSelector]

    # scalerTransformer = StandardScaler(inputCol='features',
    #                                    outputCol='scaledFeatures',
    #                                    withStd=True, withMean=True)
    scalerTransformer = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

    stages += [scalerTransformer]

    fieldTransformer = SQLTransformer(
        statement="SELECT scaledFeatures AS features,label,somelabel FROM __THIS__")

    stages += [fieldTransformer]

    gbt = GBTClassifier(featuresCol="features", maxBins=5, maxDepth=3, maxIter=3)

    stages += [gbt]

    pipeline = Pipeline(stages=stages)

The error message on:

  pmmlBuilder.buildFile("gbt.pmml")

is:


    : org.apache.spark.sql.AnalysisException: cannot resolve '`PARTY_COUNTRY_CD`' given input columns:
    [sql2pmml_2.probability, sql2pmml_2.label, sql2pmml_2.gpglabel, sql2pmml_2.features,
    sql2pmml_2.rawPrediction, sql2pmml_2.prediction];
    .....
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
vruusmann commented 5 years ago

This AnalysisException is coming from Apache Spark ML code, not JPMML-SparkML code; see the locations of the stack trace elements, which all point to org.apache.spark.sql.

Apparently, your programmatically generated SQL statement is incomplete/wrong.
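
A quick way to sanity-check the generated statements (a sketch only, reusing the `stages` list from the snippet above) is to print each one before assembling the pipeline and confirm that every referenced column actually exists at that stage:

    from pyspark.ml.feature import SQLTransformer

    # Print each generated SQL statement so that column references such as
    # PARTY_COUNTRY_CD can be checked against the columns available at that point.
    for stage in stages:
        if isinstance(stage, SQLTransformer):
            print(stage.getStatement())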

yairdata commented 5 years ago

But this error is triggered by the PMML file build; see the full stack trace. The PipelineModel creation and transformation themselves succeeded; only the creation of the PMML file fails. It's as if it expects the original fields to appear, but looks at the output fields instead of the input fields ...

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
    at org.jpmml.sparkml.DatasetUtil.createAnalyzedLogicalPlan(DatasetUtil.java:53)
    at org.jpmml.sparkml.feature.SQLTransformerConverter.encodeFeatures(SQLTransformerConverter.java:62)
    at org.jpmml.sparkml.feature.SQLTransformerConverter.registerFeatures(SQLTransformerConverter.java:141)
    at org.jpmml.sparkml.PMMLBuilder.build(PMMLBuilder.java:110)
    at org.jpmml.sparkml.PMMLBuilder.buildFile(PMMLBuilder.java:263)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
vruusmann commented 5 years ago

But this error is triggered by the PMML file build; see the full stack trace.

The JPMML-SparkML library calls a standard Apache Spark ML method, which then fails.

only the creation of the PMML file fails.

Apparently, you're supplying incorrect/incomplete data to the pmml builder component.

TLDR: The original report does not contain any actionable info. Provide a reproducible example for me, or work on it yourself.

yairdata commented 5 years ago

I'll try validating it, but in principle, does the PMMLBuilder expect to have all input fields present at the end of the transformations? I select just part of the fields in an SQLTransformer, as a column pruner, but it still looks for the input field.

vruusmann commented 5 years ago

does the PMMLBuilder expect to have all input fields present at the end of the transformations?

The pmml builder component needs to know the complete schema of the dataset that enters the pipeline (not a partial schema at some pipeline stage, or coming out of the pipeline).
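
In practice (a minimal sketch, reusing the names from the snippets above), that means fitting the pipeline on the original training DataFrame and handing that same, untransformed DataFrame to PMMLBuilder:

    from pyspark2pmml import PMMLBuilder

    # Fit on the original training DataFrame; its schema covers every column
    # that the pipeline stages consume.
    model = pipeline.fit(train)

    # Pass the same untransformed DataFrame to the builder, not the output of
    # some intermediate pipeline stage.
    pmmlBuilder = PMMLBuilder(spark.sparkContext, train, model)
    pmmlBuilder.buildFile("gbt.pmml")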

I select just part of the fields in an SQLTransformer

That's the source of the problem: you're giving the pmml builder component incomplete information, and then expecting it to be able to do its job correctly?

yairdata commented 5 years ago

I think you misunderstood me. I am giving the PMML builder the whole DataFrame (and thus its schema). In the pipeline stages I do feature engineering (adding extra fields) and dropping fields, and at the end I am left with just features and label, the must-have fields for the GBTClassifier algorithm. This, I think, is a standard Spark ML pipeline. So I am asking whether all input fields should be kept until the end of the pipeline for the PMML builder to trace the path, or whether it is OK to drop fields in transformation stages in the middle of the pipeline.

vruusmann commented 5 years ago

is it OK to drop fields in transformation stages in the middle of the pipeline?

What's the point of manually selecting fields? Everything is lazily computed and fetched anyway.

Build the simplest pipeline possible, and make sure that the pmml builder component can understand and convert it correctly.
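
For example, a stripped-down pipeline along these lines is a good starting point before adding any column pruning (a sketch only; "color", "amount" and "label" are hypothetical column names, not taken from the pipeline above):

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark2pmml import PMMLBuilder

    # One indexed categorical column and one numeric column, assembled into a
    # feature vector and fed to a GBT classifier (hypothetical column names).
    indexer = StringIndexer(inputCol="color", outputCol="colorIndex", handleInvalid="keep")
    assembler = VectorAssembler(inputCols=["colorIndex", "amount"], outputCol="features")
    gbt = GBTClassifier(featuresCol="features", labelCol="label", maxDepth=3, maxIter=3)

    pipeline = Pipeline(stages=[indexer, assembler, gbt])
    model = pipeline.fit(train)

    PMMLBuilder(spark.sparkContext, train, model).buildFile("gbt-minimal.pmml")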

yairdata commented 5 years ago

OK, thanks. I have overcome this issue, but I still face an issue with SQLTransformer. I verified that the ML pipeline completed OK, but pmmlBuilder.buildFile("file.pmml")

gives:

Py4JJavaError:

    An error occurred while calling o33946.buildFile.
    : java.lang.IllegalArgumentException: unix_timestamp(OPEN_DATE#316046, dd-MMM-yy hh.mm.ss.SSSSSS, Some(Asia/Jerusalem))
    at org.jpmml.sparkml.ExpressionTranslator.translateInternal(ExpressionTranslator.java:316)
    at org.jpmml.sparkml.ExpressionTranslator.translateInternal(ExpressionTranslator.java:166)
    at org.jpmml.sparkml.ExpressionTranslator.translate(ExpressionTranslator.java:72)
    at org.jpmml.sparkml.ExpressionTranslator.translate(ExpressionTranslator.java:67)
    at org.jpmml.sparkml.feature.SQLTransformerConverter.encodeFeatures(SQLTransformerConverter.java:110)
    at org.jpmml.sparkml.feature.SQLTransformerConverter.registerFeatures(SQLTransformerConverter.java:141)
    at org.jpmml.sparkml.PMMLBuilder.build(PMMLBuilder.java:110)
    at org.jpmml.sparkml.PMMLBuilder.buildFile(PMMLBuilder.java:263)
    at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

The relevant transform step is:

    date_feature_generator = SQLTransformer(
        statement="SELECT {}, (unix_timestamp(OPEN_DATE, 'dd-MMM-yy hh.mm.ss.SSSSSS') - "
                  "unix_timestamp(JOIN_DATE, 'dd-MMM-yy hh.mm.ss.SSSSSS')) "
                  "AS JOIN_DELTA "
                  "FROM __THIS__".format(", ".join(map(str, fieldList))))

    stages += [date_feature_generator]

Is it an error in my code, or is there an issue with using SQL functions in the pipeline?

vruusmann commented 5 years ago

java.lang.IllegalArgumentException: unix_timestamp(OPEN_DATE#316046, dd-MMM-yy hh.mm.ss.SSSSSS, Some(Asia/Jerusalem))

The current SQL-to-PMML translator implementation only knows about built-in Apache Spark SQL operators and functions (aka expressions). For a list of supported expression types, see this method: https://github.com/jpmml/jpmml-sparkml/blob/master/src/main/java/org/jpmml/sparkml/ExpressionTranslator.java#L84

The unix_timestamp() function is not supported at the moment. But adding support for it shouldn't be too difficult, because PMML has rich date/time/datetime type capabilities.

yairdata commented 5 years ago

OK, so what are the current options for calculating the delta between two dates?

vruusmann commented 5 years ago

so what are the current options for calculating the delta between two dates?

Implement support for the unix_timestamp() function, and then perform a regular subtraction between two terms:

unix_timestamp(end) - unix_timestamp(begin)
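
If such support were added, the SQLTransformer statement could express the delta directly as that subtraction (a hypothetical sketch; this does not convert with the current ExpressionTranslator):

    from pyspark.ml.feature import SQLTransformer

    # Hypothetical: converts to PMML only once unix_timestamp() support is
    # added to the JPMML-SparkML ExpressionTranslator.
    date_delta = SQLTransformer(
        statement="SELECT *, (unix_timestamp(OPEN_DATE, 'dd-MMM-yy hh.mm.ss.SSSSSS') - "
                  "unix_timestamp(JOIN_DATE, 'dd-MMM-yy hh.mm.ss.SSSSSS')) AS JOIN_DELTA "
                  "FROM __THIS__")
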
yairdata commented 5 years ago

You mean change the code of sparkml/blob/master/src/main/java/org/jpmml/sparkml/ExpressionTranslator.java? Or can I use a UDF in the SQL expression? I doubt I can check in changes to the code because of security restrictions in my company. Can you mark it as a future enhancement?