unionai-oss / pandera

A light-weight, flexible, and expressive statistical data testing library
https://www.union.ai/pandera
MIT License

lazy validation on dask dataframes should collect all errors #719

Open cosmicBboy opened 2 years ago

cosmicBboy commented 2 years ago

Describe the bug

When doing lazy validation on dask dataframes, only the errors from the first partition that raises a SchemaErrors exception are reported; failures in the remaining partitions never surface.

Code Sample, a copy-pastable example

import pandera as pa
import dask.dataframe as dd
import pandas as pd

int_schema = pa.DataFrameSchema({"col": pa.Column(int, checks=pa.Check.gt(0))})
str_schema = pa.DataFrameSchema({"col": pa.Column(str)})

df = pd.DataFrame({"col": [-x for x in range(10)]})
ddf = dd.from_pandas(df, npartitions=5)

ddf = int_schema.validate(ddf, lazy=True)
val_df = ddf.compute()
print(val_df)

stdout:

pandera.errors.SchemaErrors: A total of 1 schema errors were found.

Error Counts
------------
- schema_component_check: 1

Schema Error Summary
--------------------
                                      failure_cases  n_failure_cases
schema_context column check
Column         col    greater_than(0)      [-8, -9]                2

Expected behavior

The semantics of lazy=True on dask dataframes should be that errors are collected across all partitions, so that every failure case is reported in the SchemaErrors object.

The solution should implement a dask validation function, passed into map_partitions here: https://github.com/pandera-dev/pandera/blob/master/pandera/schemas.py#L483-L501

so that the schema errors from all partitions are aggregated into a single SchemaErrors exception.
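For illustration, a rough sketch of that aggregation (the helper names below are made up for this example and are not pandera API, and the failure-case columns may differ between pandera versions):

import dask.dataframe as dd
import pandas as pd
import pandera as pa

# columns of the failure-case frame produced by lazy validation;
# these may differ slightly across pandera versions
FAILURE_CASE_COLUMNS = [
    "schema_context", "column", "check", "check_number", "failure_case", "index"
]


def _partition_failure_cases(partition: pd.DataFrame, schema: pa.DataFrameSchema) -> pd.DataFrame:
    """Validate a single partition lazily and return its failure cases (empty if clean)."""
    try:
        schema.validate(partition, lazy=True)
    except pa.errors.SchemaErrors as err:
        return err.failure_cases
    return pd.DataFrame(columns=FAILURE_CASE_COLUMNS)


def validate_collecting_errors(schema: pa.DataFrameSchema, ddf: dd.DataFrame) -> dd.DataFrame:
    """Raise once, with failure cases gathered from every partition."""
    failure_cases = ddf.map_partitions(
        _partition_failure_cases,
        schema,
        meta=pd.DataFrame(columns=FAILURE_CASE_COLUMNS),
    ).compute()
    if len(failure_cases):
        # the real fix would wrap these in a single SchemaErrors exception; raising
        # a plain exception here keeps the sketch independent of its constructor
        raise ValueError(f"schema errors found across partitions:\n{failure_cases}")
    return ddf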

Naveen-Annam-CMA commented 1 year ago

Hi. Great work on the package! We have run into a similar issue recently.

Currently, we use something like the code below as a workaround:

import dask.dataframe as dd
import pandas as pd
import pandera as pa


def validate_partition(partition, schema):
    """
    Runs lazy validation on a single dask partition and returns its failure cases.

    Args:
        partition (pd.DataFrame): one partition of the dask dataframe
        schema (pa.DataFrameSchema): schema to validate the partition against
    """
    report = pd.DataFrame()
    try:
        schema.validate(partition, lazy=True)
    except pa.errors.SchemaErrors as err:
        report = err.failure_cases
    return report


ddf = dd.from_pandas(df, npartitions=num_cores)
# map_partitions passes the partition as the first argument;
# the schema is forwarded as an extra positional argument
validated_ddf = ddf.map_partitions(validate_partition, schema)

report = validated_ddf.compute()

There are some issues with this approach when the schema includes checks that span two columns, or uniqueness checks on a column, because each partition is validated independently.

Are there potential workarounds that you are aware of or can suggest beyond what we have above?

cosmicBboy commented 1 year ago

Hi @Naveen-Annam-CMA, so if I understand correctly, validated_ddf is actually the failure cases collected across all partitions, not the validated data, correct?

Naveen-Annam-CMA commented 1 year ago

> Hi @Naveen-Annam-CMA, so if I understand correctly, validated_ddf is actually the failure cases collected across all partitions, not the validated data, correct?

Yes, that's correct: our focus is on collecting all the errors while still making use of dask. But as mentioned previously, there is a problem with this approach. Because each partition is validated independently, you end up with duplicate errors, incomplete validation for DataFrameSchema-wide checks, and uniqueness checks that reflect only the partition rather than the whole column. We had to separate out the checks that can run fully independently, i.e. the row-level checks (see the sketch below).
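A sketch of that kind of split (the schemas and checks below are purely illustrative, not taken from our codebase): row-level checks run per partition, while uniqueness and cross-column checks run once on the computed result.

import dask.dataframe as dd
import pandas as pd
import pandera as pa

# row-level checks are safe to run independently on each partition
row_level_schema = pa.DataFrameSchema({
    "id": pa.Column(int, checks=pa.Check.gt(0)),
    "name": pa.Column(str),
})

# checks that need the whole column (uniqueness) or multiple columns at once
# are only meaningful on the full dataframe
global_schema = pa.DataFrameSchema(
    {"id": pa.Column(int, unique=True)},
    checks=pa.Check(lambda df: df["start"] <= df["end"]),
)

ddf = dd.from_pandas(
    pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["a", "b", "c"],
        "start": [0, 0, 0],
        "end": [1, 1, 1],
    }),
    npartitions=2,
)

# per-partition validation with the row-level schema...
ddf = ddf.map_partitions(lambda part: row_level_schema.validate(part, lazy=True))

# ...then validate the global constraints once on the collected result
validated = global_schema.validate(ddf.compute(), lazy=True)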