jpmml / pyspark2pmml

Python library for converting Apache Spark ML pipelines to PMML
GNU Affero General Public License v3.0
95 stars 25 forks source link

Advice on implementing complex string transformations #40

Closed tristers-at-square closed 1 year ago

tristers-at-square commented 1 year ago

Hello,

I am trying to turn my pyspark preprocessing into a PMML artifact. One of the input features is a string. I want to remove comma's from the text feature and also lowercase it. I can accomplish this using two steps:

From checking the JPMML documentation, both RegexTokenizer and SQLTransformer should be convertible into PMML format.

Here's a simple version of my code:

rdd = spark.sparkContext.parallelize([(1, 'Company, Inc'), (2, 'Other Company, Inc')])
dataframe = rdd.toDF()

tokenizer = RegexTokenizer(
    inputCol="_2",
    toLowercase=True,
    pattern='[,]'
)
tokenizer_output_column = tokenizer.getOutputCol()
sql_statement = f"SELECT array_join({tokenizer_output_column}, '', '') from __THIS__"
sql_transformer = SQLTransformer(statement=sql_statement)
pipeline = Pipeline(stages=[tokenizer, sql_transformer])
pipeline_model = pipeline.fit(dataframe)

pmml_builder = pyspark2pmml.PMMLBuilder(spark.sparkContext, dataframe, pipeline_model)
pmml_builder.buildFile("preprocessing.pmml") 

However, this results in the following error:

pyspark.sql.utils.AnalysisException: cannot resolve 'RegexTokenizer_0da640b74642__output' given input columns: [sql2pmml_1._1, sql2pmml_1._2]

It seems to not take into account that a new column is being added during the RegexTokenizer stage. The other possibility is that the _arrayjoin SQL function is not supported (but, if this were the case, I'd expect a different error message).

Any idea on what's going on? Thanks!

Using:

tristers-at-square commented 1 year ago

Really what I need is a Spark transformer that can replace characters in a string feature column, but it doesn't appear that SparkML has any such transformers. The only option would be to use a SQLTransformer but all the string replace functions in Spark SQL are not supported by pyspark2pmml.

vruusmann commented 1 year ago

I want to remove comma's from the text feature and also lowercase it.

In canonical PMML markup, this transformation would be:

<Apply function="lowercase">
  <Apply function="replace">
    <FieldRef field="my_string_input"/>
    <Constant dataType="string">,</Constant>
    <Constant dataType="string"></Constant>
  </Apply>
</Apply>

In other words, what you want to do is well supported by PMML, we just need to figure out how to express this in (Py)Spark ML, so that the JPMML-SparkML library would be able to get hold of it.

I can accomplish this using two steps:

  • Use a RegexTokenizer to split the string on the commas
  • Use a SQLTransformer to join the array of string tokens back together

Looks terribly inefficient, because you're creating a temporary array/vector-type column to hold tokenization results.

The JPMML-SparkML library supports RegexTokenizer as a means to prepare a "bag of words" for TF(-IDF) workflows. Even if this transformer were to interoperate with Spark SQL queries, it was not designed for that.

However, this results in the following error:

pyspark.sql.utils.AnalysisException: cannot resolve 'RegexTokenizer_0da640b74642__output' given input columns: [sql2pmml_1._1, sql2pmml_1._2]

Can you see from the PySpark error stack trace, which component (also, which line number) is responsible for raising this exception? Is it raised by (Py)Spark ML core, or JPMML-SparkML?

Really what I need is a Spark transformer that can replace characters in a string feature column. The only option would be to use a SQLTransformer.

But isn't using SQLTransformer for custom feature transformations the right way?

You can implement custom transformations as Java/Scala UDFs. But going this way, you would need to create a JPMML-SparkML library plugin, which recognizes this custom transformation class, and is able to generate PMML markup based on its instance state. Looks compilicated from the end user perspective.

If the custom transformation business logic can be expressed in terms of (Py)Spark SQL statement(s), then the JPMML-SparkML SQLTransformer converter should help you out with PMML markup generation. Much easier.

But all the string replace functions in Spark SQL are not supported by pyspark2pmml.

See the above PMML sample - we need support for SQL functions that map to PMML functions "lowercase" and "(regex-)replace".

Support for "lowercase": https://github.com/jpmml/jpmml-sparkml/blob/2.2.0/pmml-sparkml/src/main/java/org/jpmml/sparkml/ExpressionTranslator.java#L561-L563

Support for "replace": https://github.com/jpmml/jpmml-sparkml/blob/2.2.0/pmml-sparkml/src/main/java/org/jpmml/sparkml/ExpressionTranslator.java#L453-L461

vruusmann commented 1 year ago

The JPMML-SparkML project has pretty decent unit test coverage in the Spark SQL area, so you can get some code/syntax examples from there if necessary.

Support for "lowercase":

See also: https://github.com/jpmml/jpmml-sparkml/blob/2.2.0/pmml-sparkml/src/test/java/org/jpmml/sparkml/ExpressionTranslatorTest.java#L228-L255

Support for "replace":

See also: https://github.com/jpmml/jpmml-sparkml/blob/2.2.0/pmml-sparkml/src/test/java/org/jpmml/sparkml/ExpressionTranslatorTest.java#L211-L225

TLDR: Everything seems 100% supported. Just write a proper (Py)Spark SQL statement, utilizing the right (Py)Spark SQL built-in functions.

tristers-at-square commented 1 year ago

Thanks @vruusmann.

For some reason, I was getting the following error when using the Spark SQL replace function:

pyspark.sql.utils.IllegalArgumentException: Spark SQL function '...' (class org.apache.spark.sql.catalyst.expressions.StringReplace) is not supported

Which is why I initially tried to combine the RegexTokenizer with _arrayjoin instead.

However, I've gotten everything to work using the _regexpreplace Spark SQL function instead.

vruusmann commented 1 year ago

pyspark.sql.utils.IllegalArgumentException: Spark SQL function '...' (classorg.apache.spark.sql.catalyst.expressions.StringReplace) is not supported

It could be that PySpark is also performing some filtering on the SQL statement, before passing it forward to the core Apache Spark ML engine.

Three dots looks suspicious. Always, when in doubt, use a simpler/more explicit syntax!

However, I've gotten everything to work using the regexp_replace Spark SQL function instead.

So, I assume that the following SQL fragment had potential to solve this current issue: lower(regexp_replace(my_string_col, ",", ""))