fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

How to validate schema with nested PySpark ArrayType/StructType using Pandera? #316

Closed · WilliamCVan closed this 2 years ago

WilliamCVan commented 2 years ago

I'd like to do schema validation on a PySpark dataframe with an existing schema:

from pyspark.sql.types import *

# nested data structure
structureData = [
    ([("James","","Smith")],"36636","M",3100),
    ([("Michael","Rose","")],"40288","M",4300),
    ([("Robert","","Williams")],"42114","M",1400),
    ([("Maria","Anne","Jones")],"39192","F",5500),
    ([("Jen","Mary","Brown")],"","F",-1)
  ]

# nested name fields
structureSchema = StructType([
        StructField('name', ArrayType(StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ]))),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

How would I go about specifying the schema in Pandera to match the above Pyspark schema?

import pandera as pa
from pandera import Column, DataFrameSchema

base_schema = pa.DataFrameSchema({
    "name": ??? <-- HOW TO SPECIFY MATCHING SCHEMA?
    "id": pa.Column(str),
    "gender": pa.Column(str),
    "salary": pa.Column(int),
})
goodwanghan commented 2 years ago

Hello,

I think this question may not be related to Fugue; Pandera has its own representation of schemas.

But on the other hand, let's assume this is how you constructed the Spark dataframe:

from pyspark.sql.types import *
from pyspark.sql import SparkSession

structureData = [
    ([("James","","Smith")],"36636","M",3100),
    ([("Michael","Rose","")],"40288","M",4300),
    ([("Robert","","Williams")],"42114","M",1400),
    ([("Maria","Anne","Jones")],"39192","F",5500),
    ([("Jen","Mary","Brown")],"","F",-1)
  ]

structureSchema = StructType([
        StructField('name', ArrayType(StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ]))),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(structureData,structureSchema)

Then you may be able to use Fugue to simplify the schema verification:

from fugue_spark._utils.convert import to_schema

assert to_schema(sdf) == "name:[{firstname:str,middlename:str,lastname:str}],id:str,gender:str,salary:int"

Furthermore, considering how much code you have to write to construct structureSchema in Spark, you can do this instead (with Fugue):

from fugue_spark._utils.convert import to_spark_schema

structureSchema = to_spark_schema("name:[{firstname:str,middlename:str,lastname:str}],id:str,gender:str,salary:int")
sdf = spark.createDataFrame(structureData,structureSchema)

I understand this doesn't directly solve your problem, but it may help you simplify the code.

cosmicBboy commented 2 years ago

hi @WilliamCVan, pandera currently only supports validation of pyspark.pandas dataframes, not OG pyspark dataframes. Unless you're open to using pyspark.pandas dataframes, the comment ^^ by @goodwanghan is probably the way to go.

In any case, assuming that you're trying to validate a pandera-supported dataframe, the equivalent pandera schema would rely on something like a dataclass or NamedTuple to model the nested data structure:

# WARNING: this code is untested
from typing import NamedTuple

import pandera as pa
from pandera import Column, DataFrameSchema

class Name(NamedTuple):
    first_name: str
    middle_name: str
    last_name: str

def name_check(name_components):
    # return True if element "name" is a valid `Name` tuple
    try:
        Name(*name_components)
        return True
    except Exception:
        return False

base_schema = pa.DataFrameSchema({
    "name": pa.Column(object, pa.Check(name_check, element_wise=True)),
    "id": pa.Column(str),
    "gender": pa.Column(str),
    "salary": pa.Column(int),
})
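
For reference, a minimal untested sketch of the pyspark.pandas route mentioned above, reusing sdf and base_schema from the earlier snippets (requires Spark 3.2+ and pandera's pyspark.pandas support):

# convert the native Spark DataFrame into a pandas-on-Spark DataFrame;
# pandera can validate it with the same DataFrameSchema
psdf = sdf.to_pandas_on_spark()
validated = base_schema.validate(psdf)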
kvnkho commented 2 years ago

Considering Pandera doesn't have support for native PySpark DataFrames (at least for now), you can use Fugue together with Pandera, as sketched below, to validate each partition of the DataFrame as a pandas DataFrame. Fugue handles the data type conversions for you.
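
A minimal sketch of that pattern, reusing df_spark and base_schema from the snippets above (untested; validate_partition is just an illustrative name):

import pandas as pd
from fugue import transform

# Fugue hands each Spark partition to this function as a pandas
# DataFrame, so pandera runs locally on every partition
def validate_partition(df: pd.DataFrame) -> pd.DataFrame:
    return base_schema.validate(df)

# schema "*" keeps the output schema identical to the input;
# passing the SparkSession as the engine runs this distributedly
validated_sdf = transform(df_spark, validate_partition, schema="*", engine=spark)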

WilliamCVan commented 2 years ago

@goodwanghan the to_spark_schema approach works well, but the assert statement is throwing an error related to PyArrow.

# array nested data structure
structureData = [
    ([("James","","Smith")],"36636","M",3100),
    ([("Michael","Rose","")],"40288","M",4300),
    ([("Robert","","Williams")],"42114","M",1400),
    ([("Maria","Anne","Jones")],"39192","F",5500),
    ([("Jen","Mary","Brown")],"","F",-1)
  ]

# nested name fields
structureSchema = StructType([
        StructField('name', ArrayType(StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ]))),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df_spark = spark.createDataFrame(data=structureData,schema=structureSchema)
df_spark.printSchema()

assert to_schema(df_spark) == "name:[{firstname:str,middlename:str,lastname:str}],id:str,gender:str,salary:int"
  File "C:\Users\Admin\AppData\Local\pypoetry\Cache\virtualenvs\apachessh-c07ysUvF-py3.8\lib\site-packages\fugue_spark\_utils\convert.py", line 34, in to_schema
    return to_schema(obj.schema)
  File "C:\Users\Admin\AppData\Local\pypoetry\Cache\virtualenvs\apachessh-c07ysUvF-py3.8\lib\site-packages\fugue_spark\_utils\convert.py", line 32, in to_schema
    return Schema(_to_arrow_schema(obj))
  File "C:\Users\Admin\AppData\Local\pypoetry\Cache\virtualenvs\apachessh-c07ysUvF-py3.8\lib\site-packages\fugue_spark\_utils\convert.py", line 131, in _to_arrow_schema
    fields = [
  File "C:\Users\Admin\AppData\Local\pypoetry\Cache\virtualenvs\apachessh-c07ysUvF-py3.8\lib\site-packages\fugue_spark\_utils\convert.py", line 133, in <listcomp>
    pa.field(field.name, _to_arrow_type(field.dataType), nullable=True)
  File "C:\Users\Admin\AppData\Local\pypoetry\Cache\virtualenvs\apachessh-c07ysUvF-py3.8\lib\site-packages\fugue_spark\_utils\convert.py", line 127, in _to_arrow_type
    return to_arrow_type(dt)
  File "C:\spark-3.2.1-bin-hadoop3.2\python\pyspark\sql\pandas\types.py", line 60, in to_arrow_type
    raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
TypeError: Unsupported type in conversion to Arrow: ArrayType(StructType(List(StructField(firstname,StringType,true),StructField(middlename,StringType,true),StructField(lastname,StringType,true))),true)

Question: How would I specify that nullable=False in the schema string?

@cosmicBboy I tried your code on a StructType and did the validation in Fugue, which works well. The only issue is when I convert the schema and data to match @goodwanghan's example above using ArrayType(StructType()); I then get the same error from PyArrow.

# BELOW WORKS WELL
from typing import NamedTuple

import pandas as pd
import pandera as pa
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df_spark = spark.createDataFrame(data=structureData,schema=structureSchema)
df_spark.printSchema()

class Name(NamedTuple):
    first_name: str
    middle_name: str
    last_name: str

def name_check(name_components):
    # return True if element "name" is a valid `Name` tuple
    try:
        Name(*name_components)
        return True
    except Exception:
        return False

base_schema = pa.DataFrameSchema({
    "name": pa.Column(object, pa.Check(name_check, element_wise=True)),
    "id": pa.Column(str),
    "gender": pa.Column(str),
    "salary": pa.Column(pa.Int32),
})

# schema: *
def df_validation(df: pd.DataFrame) -> pd.DataFrame:
    validated_df = base_schema(df)
    return validated_df

with FugueWorkflow(SparkExecutionEngine(spark_session=spark)) as dag:
    print("validation #2")
    df_2 = dag.df(df_spark).transform(df_validation)
    df_2.show(3)
_0 _State.RUNNING -> _State.FAILED  Unsupported type in conversion to Arrow: ArrayType(StructType(List(StructField(firstname,StringType,true),StructField(middlename,StringType,true),StructField(lastname,StringType,true))),true)
Traceback (most recent call last):
  File "C:\Users\Admin\AppData\Local\pypoetry\Cache\virtualenvs\apachessh-c07ysUvF-py3.8\lib\site-packages\fugue\workflow\workflow.py", line 1518, in run
    raise ex.with_traceback(ctb)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2021.3.2\plugins\python-ce\helpers\pydev\pydevd.py", line 1483, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2021.3.2\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "C:/1_repos/ApacheSSH/Data_Validation/pyspark_schema_nested_1B.py", line 72, in <module>
    df_2 = dag.df(df_spark).transform(df_validation)
  File "C:\spark-3.2.1-bin-hadoop3.2\python\pyspark\sql\pandas\types.py", line 60, in to_arrow_type
    raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
TypeError: Unsupported type in conversion to Arrow: ArrayType(StructType(List(StructField(firstname,StringType,true),StructField(middlename,StringType,true),StructField(lastname,StringType,true))),true)
WilliamCVan commented 2 years ago

Package versions I'm running:

[[package]]
name = "pyarrow"
version = "7.0.0"
description = "Python library for Apache Arrow"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "pyspark"
version = "3.2.1"
description = "Apache Spark Python API"
category = "main"
optional = false
python-versions = ">=3.6"
[[package]]
name = "pandera"
version = "0.10.1"
description = "A light-weight and flexible data validation and testing tool for dataframes."
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "fugue"
version = "0.6.5"
description = "An abstraction layer for distributed computation"
category = "dev"
optional = false
python-versions = ">=3.6"
goodwanghan commented 2 years ago

Ah, sorry, I think the root cause of the error is that native Spark doesn't handle nested schemas well. Fugue added logic to make this work smoothly, but that logic is only in 0.6.6.dev2+.

We are going to release 0.6.6 very soon. For now, you can use 0.6.6.dev3 to quickly unblock yourself.
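
For example:

pip install fugue==0.6.6.dev3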

WilliamCVan commented 2 years ago

@goodwanghan I see, thanks for the heads up. For the schema string you came up with, is there a way to specify nullable=False on individual fields?

"name:[{firstname:str,middlename:str,lastname:str}],id:str,gender:str,salary:int"

I couldn't find an example snippet when I checked the documentation: https://fugue-tutorials.readthedocs.io/tutorials/advanced/schema_dataframes.html

goodwanghan commented 2 years ago

@cosmicBboy @WilliamCVan the code Niels provided is not quite right for this particular case: with element_wise=True, the check receives each row's entire "name" array (a list of name tuples), so it needs to loop over the elements. I modified it to make it work as expected:

from typing import NamedTuple

import pandera as pa
from pandera import Column, DataFrameSchema

class Name(NamedTuple):
    first_name: str
    middle_name: str
    last_name: str

def name_check(elements):
    # return True only if every element of the "name" array
    # is a valid `Name` tuple
    try:
        for name_components in elements:
            Name(*name_components)
        return True
    except Exception:
        return False

base_schema = pa.DataFrameSchema({
    "name": pa.Column(object, pa.Check(name_check, element_wise=True)),
    "id": pa.Column(str),
    "gender": pa.Column(str),
    "salary": pa.Column(pa.dtypes.Int32),
})

@WilliamCVan you can use this very simple wrapper:

from typing import Any
from pandera import DataFrameSchema
from fugue import out_transform
from pyspark.sql import SparkSession, DataFrame as SparkDataFrame

def pandera_validate(df: Any, pa_schema: DataFrameSchema, partition: Any = None) -> Any:
    engine, engine_conf = None, None
    if isinstance(df, SparkDataFrame):
        engine = SparkSession.builder.getOrCreate()
        engine_conf = {"fugue.spark.use_pandas_udf": True}  # much faster with this config
    out_transform(df, pa_schema.validate, partition=partition, engine=engine, engine_conf=engine_conf)
    return df

So that you can validate Spark/pandas dataframes using pandera elegantly:

# using spark to validate distributedly because the input is a Spark df
pandera_validate(spark_df, base_schema) 

# validate locally
pandera_validate(spark_df.toPandas(), base_schema)
pandera_validate(pandas_df, base_schema)

And you can also leverage Fugue's partitioning mechanism to do per-partition validation:

pandera_validate(spark_df, base_schema, partition={"by":["key1","key2"]})

Regarding non-nullable fields, Fugue currently assumes everything in a schema is nullable. I have been thinking about supporting non-nullable fields but have not prioritized it. If we add support, the syntax should look like this:

"name:[{firstname:str!,middlename:str!,lastname:str}],id:str,gender:str,salary:int"

With !, firstname and middlename must not be null.

WilliamCVan commented 2 years ago

@goodwanghan thanks for the help and code snippets; I am able to validate the nested Spark dataframe like I wanted. It works well after I upgraded Fugue to 0.6.6.dev3.