dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Pandera output type should allow dask.DataFrame & pyspark.sql.DataFrame #21017

Open sam-goodwin opened 5 months ago

sam-goodwin commented 5 months ago

What's the use case?

I use pandera schemas to explicitly model the outputs of my asset tables, but the pandera DagsterType requires the data frame to be a pandas DataFrame. My data frame is a partitioned Dask DataFrame; it may also be a Spark DataFrame.

Ideas of implementation

Widen the accepted types: update typing_type to pd.DataFrame | dd.DataFrame so that Dask data frames are allowed:

https://github.com/dagster-io/dagster/blob/cc42532a67707cafef107fe44c56dd472775943c/python_modules/libraries/dagster-pandera/dagster_pandera/__init__.py#L104-L112

Update _pandera_schema_to_type_check_fn to ensure it's compatible with dask data frames.

https://github.com/dagster-io/dagster/blob/cc42532a67707cafef107fe44c56dd472775943c/python_modules/libraries/dagster-pandera/dagster_pandera/__init__.py#L132

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sam-goodwin commented 5 months ago

This also needs to be updated:

https://github.com/dagster-io/dagster/blob/cc42532a67707cafef107fe44c56dd472775943c/python_modules/libraries/dagster-pandera/dagster_pandera/__init__.py#L46
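The monkey patch later in this thread overrides a module-level tuple named VALID_DATAFRAME_CLASSES, so a tuple of accepted classes is presumably what the type check tests against. As a rough sketch only (the helper names and the candidate list below are hypothetical and not part of dagster_pandera), such a tuple could be built from whichever optional backends are installed, so that pandas-only users are not forced to install Dask or PySpark:

```python
import importlib

# Candidate (module, attribute) pairs. Which backends a real fix should
# accept is an assumption taken from the issue title.
DATAFRAME_CANDIDATES = (
    ("pandas", "DataFrame"),
    ("dask.dataframe", "DataFrame"),
    ("pyspark.sql", "DataFrame"),
)


def available_dataframe_classes(candidates=DATAFRAME_CANDIDATES):
    """Return a tuple of the candidate classes whose library can be imported."""
    classes = []
    for module_name, attr_name in candidates:
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            continue  # optional backend not installed; skip it
        cls = getattr(module, attr_name, None)
        if isinstance(cls, type):
            classes.append(cls)
    return tuple(classes)


def is_supported_dataframe(value, valid_classes):
    """isinstance accepts a tuple, so one check covers every backend."""
    return isinstance(value, valid_classes)
```

With an empty tuple the check simply returns False, so a missing backend degrades to "not supported" rather than raising.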

sam-goodwin commented 5 months ago

Here's a monkey patch that seems to work:

import dagster_pandera
import dask.dataframe as dd
import pandas as pd
import pandera as pa
from dagster import DagsterType, MetadataValue
from dagster_pandera import (
    _extract_name_from_pandera_schema,
    _pandera_schema_to_table_schema,
    _pandera_schema_to_type_check_fn,
)

dagster_pandera.VALID_DATAFRAME_CLASSES = (pd.DataFrame, dd.DataFrame)

# works around dagster_pandera's lack of support for dask https://github.com/dagster-io/dagster/issues/21017
def custom_pandera_schema_to_dagster_type(
    schema: pa.DataFrameSchema | type[pa.SchemaModel] | type[pa.DataFrameModel],
) -> DagsterType:
    name = _extract_name_from_pandera_schema(schema)
    norm_schema = schema.to_schema() if isinstance(schema, type) and issubclass(schema, pa.SchemaModel) else schema
    tschema = _pandera_schema_to_table_schema(norm_schema)
    type_check_fn = _pandera_schema_to_type_check_fn(norm_schema, tschema)

    return DagsterType(
        type_check_fn=type_check_fn,
        name=name,
        description=norm_schema.description,
        metadata={
            "schema": MetadataValue.table_schema(tschema),
        },
        typing_type=pd.DataFrame | dd.DataFrame,
    )

dagster_pandera.pandera_schema_to_dagster_type = custom_pandera_schema_to_dagster_type
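
One caveat with this kind of patch: rebinding a module attribute only affects code that looks the name up through the module afterwards. Anything that ran `from dagster_pandera import pandera_schema_to_dagster_type` before the patch keeps the original function. The sketch below shows the effect with a hypothetical stand-in module (`lib` and `make_type` are illustrative names, not Dagster APIs):

```python
import types

# Hypothetical stand-in for a library module such as dagster_pandera.
lib = types.ModuleType("lib")


def original():
    return "original"


lib.make_type = original

# Equivalent to `from lib import make_type` executed before the patch.
early_ref = lib.make_type


def patched():
    return "patched"


# The monkey patch: rebind the attribute on the module object.
lib.make_type = patched

# Lookups through the module see the patch; the early reference does not.
via_module = lib.make_type()
via_early_ref = early_ref()
```

In practice this means the patch should be applied before any other module imports the function by name.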

sam-goodwin commented 5 months ago

My use case also requires the original pandera schema to remain on the DagsterType so the IO manager can use it to generate a pyarrow schema. So I also added a custom class to tunnel that information through to the IO manager.

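from collections.abc import Mapping
from typing import Any

from dagster import DagsterType

# Schema, TypeCheckFn, and RawMetadataValue are not defined in this snippet;
# they are assumed to be type aliases available in the surrounding module.

# needed to piggyback the pandera_schema through to our IO manager
# the pandera_schema is needed because we use it to convert to PyArrow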
class PanderaDagsterType(DagsterType):
    def __init__(
        self,
        pandera_schema: Schema,
        type_check_fn: TypeCheckFn,
        name: str,
        description: str | None,
        metadata: Mapping[str, RawMetadataValue] | None,
        typing_type: Any = None,
    ):
        super().__init__(
            type_check_fn=type_check_fn,
            name=name,
            description=description,
            metadata=metadata,
            typing_type=typing_type,
        )
        self.pandera_schema = pandera_schema
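
For readers wondering how the schema comes back out on the IO manager side, here is a minimal sketch of the retrieval pattern. It assumes the IO manager's context exposes the type as `dagster_type`; `BaseType`, `SchemaCarryingType`, and `FakeOutputContext` are hypothetical stand-ins so the example runs without Dagster installed, and `schema_from_context` is an illustrative helper, not a Dagster API.

```python
class BaseType:
    """Hypothetical stand-in for dagster.DagsterType."""

    def __init__(self, name):
        self.name = name


class SchemaCarryingType(BaseType):
    """Mirrors the PanderaDagsterType idea: keep the schema as an attribute."""

    def __init__(self, name, pandera_schema):
        super().__init__(name)
        self.pandera_schema = pandera_schema


class FakeOutputContext:
    """Hypothetical stand-in for the context an IO manager receives."""

    def __init__(self, dagster_type):
        self.dagster_type = dagster_type


def schema_from_context(context):
    # Ordinary types carry no schema, so fall back to None instead of raising.
    return getattr(context.dagster_type, "pandera_schema", None)


# Any object can play the schema here; a real one would be a pandera schema.
example_schema = {"columns": ["id", "value"]}
with_schema = FakeOutputContext(SchemaCarryingType("MyTable", example_schema))
without_schema = FakeOutputContext(BaseType("Plain"))
```

The `getattr` fallback lets one IO manager handle both schema-carrying types and plain ones.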