unionai-oss / pandera

A light-weight, flexible, and expressive statistical data testing library
https://www.union.ai/pandera
MIT License

check-type decorators do not work in parallel #1411

Open dcnadler opened 9 months ago

dcnadler commented 9 months ago

When a function is wrapped by a pandera type-checking decorator, it cannot be used in parallel execution: it either raises an error or fails silently.

I believe this is because of a known issue with wrapt-based decorators.
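A minimal sketch of the underlying wrapt limitation, independent of pandera (the passthrough/identity names are illustrative): pickling any function wrapped with wrapt.decorator fails because the resulting object proxy deliberately raises on __reduce_ex__, which is the same error shown in the reproduction below.

import pickle

import wrapt

@wrapt.decorator
def passthrough(wrapped, instance, args, kwargs):
    # A do-nothing wrapt decorator is enough to trigger the problem
    return wrapped(*args, **kwargs)

@passthrough
def identity(x):
    return x

pickle.dumps(identity)
# NotImplementedError: object proxy must define __reduce_ex__()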

We would like to use the check_io decorator in our existing project, but this bug blocks us since we run most of our functions in parallel.

The issue on wrapt is over three years old, so it may be worth rewriting the decorators without wrapt?

Code Sample, a copy-pastable example

import pandas as pd
import pandera as pa
from pandarallel import pandarallel
from pandera.typing import DataFrame

pandarallel.initialize()

class InputSchema(pa.DataFrameModel):
    A: int
    B: int

class OutSchema(pa.DataFrameModel):
    A: str
    B: str

@pa.check_types()
def fraction(df: DataFrame[InputSchema]) -> DataFrame[OutSchema]:
    df = df.drop(columns="C")
    return df / df.sum()

@pa.check_io(df=InputSchema.to_schema(), out=OutSchema.to_schema())
def percent(df: pd.DataFrame) -> pd.DataFrame:
    df = df.drop(columns="C")
    return (df / df.sum()) * 100

df = pd.DataFrame(
    {
        "A": [1, 2, 3],
        "B": [4, 6, 5],
        "C": ["a", "a", "b"],
    }
)

out1 = df.groupby("C", axis=0, group_keys=False).parallel_apply(fraction)
# NotImplementedError: object proxy must define __reduce_ex__()
out2 = df.groupby("C", axis=0, group_keys=False).parallel_apply(percent)
# NotImplementedError: object proxy must define __reduce_ex__()
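
The root cause can be reproduced without pandarallel at all: the decorated function itself cannot be pickled, which is what parallel backends need to do when shipping work to worker processes.

import pickle

pickle.dumps(fraction)
# NotImplementedError: object proxy must define __reduce_ex__()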

When using joblib to run the function in parallel, the behavior is similar to the wrapt issue, with one difference: in that issue, multi-process mode gave a serialization error like the pandarallel example above, but in my test it doesn't throw an error at all. It just seems to ignore the decorator and fails silently.

from joblib import Parallel, delayed

groups = df.groupby("C", axis=0, group_keys=False)
with Parallel(n_jobs=2, prefer="threads") as parallel:
    result = parallel(delayed(fraction)(grp) for idx, grp in groups)
    # Works correctly, throws a SchemaError
out3 = pd.concat(result)
groups = df.groupby("C", axis=0, group_keys=False)
with Parallel(n_jobs=2, prefer="processes") as parallel:
    result = parallel(delayed(fraction)(grp) for idx, grp in groups)
out4 = pd.concat(result)
# This does not seem to check the output type and throws no error

Expected behavior

In the example with pandarallel, both groupby-apply operations should throw a SchemaError when type-checking the output dataframe (this can be confirmed by replacing .parallel_apply with .apply). Expected output: SchemaError: error in check_types decorator of function 'fraction': expected series 'A' to have type str

For joblib, the same SchemaError should be thrown whether using multi-thread or multi-process mode.
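
For reference, the sequential equivalent that does raise the expected error:

# Replacing .parallel_apply with .apply runs the check_types decorator as expected
out = df.groupby("C", axis=0, group_keys=False).apply(fraction)
# SchemaError: error in check_types decorator of function 'fraction':
#     expected series 'A' to have type str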


cosmicBboy commented 9 months ago

Hi @dcnadler, there have been other issues with the wrapt dependency in the past. I don't have much context at the moment on what can be done with wrapt to fix this.

Perhaps a longer-term solution would be to refactor these decorators so that they work with plain Python or functools.wraps (I used wrapt early on in pandera's development mainly because I found it convenient).
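An untested sketch of what such a refactor could look like (validation logic elided; check_types_plain is a hypothetical name): a plain functools.wraps wrapper is a real function bound to the module-level name of the wrapped function, so pickle can serialize it by reference.

import functools
import pickle

def check_types_plain(func):
    # Hypothetical plain-Python replacement for the wrapt-based decorator
    @functools.wraps(func)  # copies __name__/__qualname__ so pickle finds the wrapper by name
    def wrapper(*args, **kwargs):
        # ... validate inputs against the annotated schemas here ...
        out = func(*args, **kwargs)
        # ... validate the output here ...
        return out
    return wrapper

@check_types_plain
def fraction(df):
    return df / df.sum()

pickle.dumps(fraction)  # works: the wrapper is pickled by reference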

I'd support a PR to make this change (with updated tests) if you're open to making one!

limx0 commented 6 months ago

Just adding a +1 here: we also hit this using pandera with dask/distributed, in case anyone else finds their way to this comment.
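An untested sketch of how this would presumably surface with dask/distributed (cluster setup is illustrative): submitting the wrapt-wrapped function requires serializing it for the workers, so the same proxy error is expected.

from dask.distributed import Client

client = Client()  # illustrative local cluster
# Sending the check_types-decorated `fraction` to a worker pickles it first,
# so the same "object proxy must define __reduce_ex__()" error is expected.
future = client.submit(fraction, df[df["C"] == "a"])
future.result()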