unionai-oss / pandera

A light-weight, flexible, and expressive statistical data testing library
https://www.union.ai/pandera
MIT License

Re-implement lazy-executable Pandas API for Pandera #451

Closed Hoeze closed 2 years ago

Hoeze commented 3 years ago

Usually, I write code like this:

df1 = pd.read_csv(...)
df2 = pd.read_csv(...)
df = df1.join(df2)
df.groupby(["a", "b"]).agg({
    "c": ["min", "max"],
    "d": ["mean"],
})

and then I spend a large amount of time annotating the column types.

It would save me a great deal of time if there were automatic schema inference, e.g. as in PySpark.

Describe the solution you'd like

As already done with DataFrameSchema.set_index() / reset_index(), we could mirror the Pandas API for type inference:

class DataFrameWrapper:
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def join(self, df: pd.DataFrame) -> "pa.DataFrameWrapper":
        ...

df = DataFrameWrapper(pd_df1)

df.join(pd_df2).schema()   # -> pa.DataFrameSchema
df.join(pd_df2).compute()  # -> pd.DataFrame

Basically the same thing that dask.dataframe does, but with full support for the entire Pandas API.
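For reference, this is roughly the dask behaviour being alluded to: operations are recorded lazily, the dtypes are known immediately, and only compute() runs the actual work (a minimal sketch with placeholder data):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0]})
ddf = dd.from_pandas(pdf, npartitions=1)

lazy = ddf.groupby("a").agg({"b": "mean"})
print(lazy.dtypes)     # dtypes are available without executing anything
print(lazy.compute())  # only compute() runs the actual aggregation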

Additionally, we could have decorators that wrap non-Pandera methods:

# schema decorator
def min_schema(fn):
    def wrapper(input):
        if isinstance(input, pa.SeriesSchema):
            # a schema passes straight through: min() preserves the dtype
            return input
        return fn(input)
    return wrapper

@min_schema
def min_fn(input: pd.Series) -> pd.Series:
    return input.min()

Describe alternatives you've considered

I tried to implement a class for every method, like this:

import abc
from typing import Callable, Union

import pandas as pd
import pandera as pdt  # assuming "pdt" aliases pandera

class DataFrameTransformer(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def schema(self, *args, **kwargs) -> pdt.DataFrameSchema:
        raise NotImplementedError

    @abc.abstractmethod
    def transform(self, *args, **kwargs) -> pd.DataFrame:
        raise NotImplementedError

class SeriesTransformer(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def schema(self, *args, **kwargs) -> pdt.SeriesSchema:
        raise NotImplementedError

    @abc.abstractmethod
    def transform(self, *args, **kwargs) -> pd.Series:
        raise NotImplementedError

class AggTransformer(SeriesTransformer, metaclass=abc.ABCMeta):
    """
    Base class of aggregation method.
    Provides an `agg_fn` property that can either return the name of a
    Pandas aggregation function (e.g. "min", "max", ...) or a callable
    aggregation function.
    """

    @property
    def agg_fn(self) -> Union[str, Callable]:
        """
        Provides an `agg_fn` property that can either return the name of a
        Pandas aggregation function (e.g. "min", "max", ...) or a callable
        aggregation function.
        """
        return self.transform

class MinTransformer(AggTransformer):
    def schema(self, input: pdt.SeriesSchema) -> pdt.SeriesSchema:
        return input

    def transform(self, input: pd.Series) -> pd.Series:
        return input.min()

    @property
    def agg_fn(self) -> str:
        return "min"

This does what I need, but I end up re-writing the entire Pandas API in an awkward transformer style.

jeffzi commented 3 years ago

Hi @Hoeze. If I rephrase your explanation, you want a schema whose data types would be continuously synced with a DataFrame.

Given the use of compute() in your example, I guess you mean "lazy" in the sense of Spark or Dask, where transformations do not compute their results right away. I'm not sure how it is related to the problem at hand. Could you clarify?

and then I need a large amount of time to annotate the column types.

Could you detail your motivations for having an always up-to-date schema? A concrete example would also help in understanding your intentions.

A pandera schema is not intended to reflect the current state of a DataFrame but rather to define what the DataFrame is expected to be like at given checkpoints (e.g. input/output of a transformation). If you only need continuous type inference, you can access column types via the .dtypes attribute on pandas.Series or pandas.DataFrame, similarly to PySpark and other dataframe-like libraries.

A reasonable alternative is to decompose your code into meaningful transformations that you validate via the pandera decorators. That said, I do agree it's painful to maintain variants of a schema. Here are some ways to alleviate the pain:

Finally, a pandera.DataFrameSchema is automatically added to a DataFrame after calling DataFrameSchema.validate(). You can also directly add a schema via the pandas extension: df.pandera.add_schema.

import pandas as pd
import pandera as pa 

df = pd.DataFrame(data=[[1, 2, 3]], columns=["a", "abc", "b"])
schema = pa.DataFrameSchema({"a": pa.Column(int, regex=True, required=False)})
df = df.pandera.add_schema(schema)
df.pandera.schema # access the schema

However, the schema will not be continuously inferred and could be out-of-sync after manipulating the DataFrame.
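For example (continuing the snippet above; the column name "c" is just for illustration), an in-place modification leaves the attached schema untouched:

df["c"] = 1.0        # add a column after the schema was attached
df.pandera.schema    # still the schema added above; it knows nothing about "c"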

I'd like to explore creative uses of type annotations to create in-line schemas but did not push the idea further. An in-line column addition would be DataFrame[MySchema, typing.Annotated[Series[int], "newcol"]] where "newcol" is the name of the new column. That style is an eyesore though...

cosmicBboy commented 3 years ago

Thanks for the post @Hoeze! In addition to echoing @jeffzi's questions re: the intent/use case behind the lazy compute() method, I do want to emphasize the point he made:

A pandera schema is not intended to reflect the current state of a DataFrame but rather to define what the DataFrame is expected to be like at given checkpoints

I.e. pandera is meant to be an explicit specification for a dataframe's contents, types, and statistical properties, both for human understandability of data pipelines and enforcement at runtime. So any schema inference functionality that looks like this:

import pandas as pd
import pandera as pa

df = pd.DataFrame({"a": [1,2,3]})
df2 = pd.DataFrame({"b": [4,5,6]})
schema = df.join(df2).pipe(pa.infer_schema)

is purely meant to bootstrap a schema definition to be written out in some format (yaml schema or python code) for further editing by a human, and then used in the code either directly (schema(df)) or in a decorator. This eases the tedium of hand-writing the schema from scratch.

print(schema.to_script())

Output:

from pandera import (
    DataFrameSchema,
    Column,
    Check,
    Index,
    MultiIndex,
    PandasDtype,
)

schema = DataFrameSchema(
    columns={
        "a": Column(
            pandas_dtype=PandasDtype.Int64,
            checks=[
                Check.greater_than_or_equal_to(min_value=1.0),
                Check.less_than_or_equal_to(max_value=3.0),
            ],
            nullable=False,
            allow_duplicates=True,
            coerce=False,
            required=True,
            regex=False,
        ),
        "b": Column(
            pandas_dtype=PandasDtype.Int64,
            checks=[
                Check.greater_than_or_equal_to(min_value=4.0),
                Check.less_than_or_equal_to(max_value=6.0),
            ],
            nullable=False,
            allow_duplicates=True,
            coerce=False,
            required=True,
            regex=False,
        ),
    },
    index=Index(
        pandas_dtype=PandasDtype.Int64,
        checks=[
            Check.greater_than_or_equal_to(min_value=0.0),
            Check.less_than_or_equal_to(max_value=2.0),
        ],
        nullable=False,
        coerce=False,
        name=None,
    ),
    coerce=True,
    strict=False,
    name=None,
)

That said, I think there's room for expressing additional schema transformations as you suggest with join, although I did want to point out that these operations would be on schemas and not on dataframes.
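To illustrate, a schema-level join could be sketched roughly like this (join_schemas is a hypothetical helper, not an existing pandera API; it simply merges column definitions and ignores join keys and suffixes):

import pandera as pa

def join_schemas(left: pa.DataFrameSchema, right: pa.DataFrameSchema) -> pa.DataFrameSchema:
    # naive schema-level "join": merge the right schema's columns into the left
    return left.add_columns(right.columns)

schema_a = pa.DataFrameSchema({"a": pa.Column(int)})
schema_b = pa.DataFrameSchema({"b": pa.Column(float)})
joined = join_schemas(schema_a, schema_b)  # has columns "a" and "b"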

Hoeze commented 3 years ago

Thanks for your input @jeffzi and @cosmicBboy. Let me try to clarify my issue a bit: Currently, I am writing a complex data pipeline where the output schema depends on some variables. Therefore, in order to know the output schema before actually running the whole pipeline, I manually write an additional schema() function for every transformation. This schema() function takes exactly the same arguments as the transformation method. However, instead of executing the transformation, it only returns a Pandera schema.
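To make that pattern concrete, here is a minimal sketch (the transformation add_ratio and its columns are made up for illustration):

import pandas as pd
import pandera as pa

def add_ratio(df: pd.DataFrame) -> pd.DataFrame:
    # the actual transformation
    return df.assign(ratio=df["c"] / df["d"])

def add_ratio_schema(schema: pa.DataFrameSchema) -> pa.DataFrameSchema:
    # the hand-written counterpart: takes the input schema and returns the
    # output schema without executing anything
    return schema.add_columns({"ratio": pa.Column(float)})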

Now I was thinking about how to simplify this process. One option would be to use dask.dataframe.from_pandas(), but:

Another possibility is to pass an empty Pandas dataframe into the transformation method and use its output as a schema. This is actually the most easily achievable option, and I'm currently trying to use this method where possible.
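A minimal sketch of the empty-dataframe approach (the transformation and column names are hypothetical):

import pandas as pd
import pandera as pa

def transform(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    out = df1.join(df2)
    out["e"] = out["c"] * out["d"]
    return out

# empty inputs that only carry the input dtypes
empty1 = pd.DataFrame({"c": pd.Series(dtype="int64")})
empty2 = pd.DataFrame({"d": pd.Series(dtype="float64")})

result = transform(empty1, empty2)  # empty, but with the output dtypes

# derive a pandera schema from the (empty) output
schema = pa.DataFrameSchema(
    {name: pa.Column(dtype) for name, dtype in result.dtypes.items()}
)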

The cleanest solution IMHO would be a DataFrame wrapper that only records the schema changes but does not execute anything, similar to how PySpark works.
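A rough sketch of what such a wrapper could look like, delegating the dtype bookkeeping to an empty, correctly-typed DataFrame (the class name SchemaTracker is made up and the method forwarding is heavily simplified):

import pandas as pd
import pandera as pa

class SchemaTracker:
    """Records schema changes by forwarding pandas calls to an empty frame."""

    def __init__(self, empty_df: pd.DataFrame):
        self._empty = empty_df

    def __getattr__(self, name):
        attr = getattr(self._empty, name)
        if not callable(attr):
            return attr

        def method(*args, **kwargs):
            result = attr(*args, **kwargs)
            # keep wrapping DataFrame results so calls can be chained
            if isinstance(result, pd.DataFrame):
                return SchemaTracker(result)
            return result

        return method

    def schema(self) -> pa.DataFrameSchema:
        return pa.DataFrameSchema(
            {name: pa.Column(dtype) for name, dtype in self._empty.dtypes.items()}
        )

# no real data is ever touched, only dtypes are tracked
tracker = SchemaTracker(pd.DataFrame({"a": pd.Series(dtype="int64")}))
print(tracker.assign(b=lambda df: df["a"] * 2.0).schema())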

cosmicBboy commented 3 years ago

Therefore, in order to know the output schema before actually running the whole pipeline, I manually write for every transformation an additional schema() function.

When you obtain the output schema, what do you use it for? Is it fed back into the pipeline to validate the data? Do you do this when you're developing a pipeline and want to bootstrap a schema to write out to its own python module to use in the pipeline itself? I'm still a little unclear on the context and on the problem this solution is trying to solve; I think a toy code example of what you're trying to achieve as a whole would help me understand.

Hoeze commented 3 years ago

When you obtain the output schema, what do you use it for? Is it fed back into the pipeline to validate the data? Do you do this when you're developing a pipeline and want to bootstrap a schema to write out to its own python module to use in the pipeline itself? I'm still a little unclear on the context and on the problem this solution is trying to solve; I think a toy code example of what you're trying to achieve as a whole would help me understand.

When I know the output schema of a previous transformation, I can use it to implement the next transformation. Also, to handle edge cases in the transformation I need to know the expected output schema, so I can return an empty dataframe from it. Finally, I need to know a fixed schema to prepare e.g. on-disk schemas or set up input dimensions in Keras.

I'm using Pandera to annotate the output schema by hand for now. I just wish there were an easier, more automatic way of doing this.

Here is a code sample that I would like to simplify using Pandera. This basically aggregates a dataframe with an index ("subtissue", "gene", "feature", "sample_id") to ("subtissue", "gene", "sample_id"):


class VEPGeneLevelVariantAggregator:

    def __init__(self, vep_tl_aggr: VEPTranscriptLevelVariantAggregator, gtex_tp: GTExTranscriptProportions = None):
        self.vep_tl_aggr = vep_tl_aggr
        if gtex_tp is None:
            self.gtex_tp = GTExTranscriptProportions()
        else:
            self.gtex_tp = gtex_tp

    @cached_property
    def schema(self) -> pd.DataFrame:
        """
        Returns an empty dataframe with the expected schema of the output
        """
        tl_schema = self.vep_tl_aggr.schema
        index = [
            "subtissue",
            "gene",
            "sample_id",
        ]
        columns = [
            "subtissue",
            "gene",
            "sample_id",
            *tl_schema.columns
        ]
        dtype = {
            "subtissue": str,
            "gene": str,
            "sample_id": str,
            **tl_schema.dtypes
        }

        retval = pd.DataFrame(columns=columns)
        retval = retval.astype(dtype)
        retval = retval.set_index(index)

        return retval

    def agg_gene_level(self, gene, subtissue) -> pd.DataFrame:
        if isinstance(subtissue, str):
            subtissue = [subtissue]

        transcript_level_batch = self.vep_tl_aggr[gene]
        if transcript_level_batch.empty:
            return self.schema

        max_transcript_df = self.gtex_tp.get_canonical_transcript(gene=gene, subtissue=subtissue)
        max_transcript_df = pd.DataFrame(dict(feature=max_transcript_df)).set_index("feature", append=True)

        gene_level_df = max_transcript_df.join(transcript_level_batch, how="inner")
        gene_level_df = gene_level_df.droplevel("feature")
        gene_level_df = gene_level_df.reorder_levels(["subtissue", "gene", "sample_id"])

        return gene_level_df

cosmicBboy commented 3 years ago

Okay, I think I'm starting to understand. So to re-state your use case, you want to be able to recover the schema of a pd.DataFrame/Series after several transformations (calls to pandas data transformation methods like join, groupby, agg, etc), even before actually executing those transformations on actual data for the purposes of:

When I know the output schema of a previous transformation, I can use it for implementing the next transformation. Also, to solve edge cases in the transformation I need to know the expected output schema, so I can return an empty dataframe from it. Finally, I need to know a fixed schema to prepare e.g. on-disk schemas or setup input dimensions in Keras.

base_schema = pa.DataFrameSchema({
    "a": pa.Column(int),
    "b": pa.Column(pd.CategoricalDtype(["x", "y"])),
})

df = base_schema(
    pd.DataFrame({
        "a": [1, 2, 3, 4, 5, 6],
        "b": list("xxxyyy"),
    })
)

df_transformed = df.groupby("b").agg({"a": ["min", "max"]}) 
# ✨✨some magic under the hood to infer new columns and types ✨✨

df_transformed.pandera.schema == pa.DataFrameSchema({
    # multiindex columns
    ("a", "min"): pa.Column(int),
    ("a", "max"): pa.Column(int),
})

print(df_transformed)
#    a
#   min max
# b
# x   1   3
# y   4   6

I'm guessing this would be a tool during development of your data processing pipeline to be able to write out your transformation code without running a potentially expensive set of computations (?)

My first thought is that this would be quite an undertaking, covering all of the pandas API to fulfill this use case, and I'm not sure yet whether this falls within the scope of pandera as a data testing library. There would also be many cases in which a schema can't be inferred because new column names would rely on specific values of the data that haven't been enumerated in the schema. I'll need to let these ideas marinate for a little bit, and also feel free to add more explanations/code sketches if I'm off the mark.
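One example of a case where the output columns depend on the data itself (plain pandas, made-up values):

import pandas as pd

df = pd.DataFrame({"key": ["x", "y"], "value": [1, 2]})
# the output columns ("x", "y") come from the *values* in "key",
# so no schema can be inferred from the input dtypes alone
print(df.pivot(columns="key", values="value"))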

Hoeze commented 3 years ago

Thanks @cosmicBboy, you perfectly summarized my issue.

In most cases, the Pandas API has a stable return type independent of the dataframe contents (e.g. min, max, join, groupby/agg, ...), so just executing the transformation with an empty dataframe should give the desired schema. For cases where the return type is not known beforehand, we could raise an exception and tell the user that they need to provide their own schema.

cosmicBboy commented 2 years ago

closing this issue, I think the functionality described in this issue will require a brand new open-source library for analyzing data transformations 😅