kaiko-ai / typedspark

Column-wise type annotations for pyspark DataFrames
Apache License 2.0

Ability to add custom functions to the schema class #448

Closed: ram-seek closed this issue 2 months ago

ram-seek commented 3 months ago

Hi, we are trying to add custom metadata to columns and use it in some functions.

One workaround we currently have is shown below: we create a custom dataclass with the additional attributes we need, use it to annotate the columns in the schema definition, and then use a metaclass to add the custom function.


from dataclasses import dataclass
from typing import Annotated, Optional, get_origin, get_type_hints

import pyspark.sql.types as T

from typedspark import Column, ColumnMeta, MetaSchema

@dataclass
class PartitionedColumnMeta(ColumnMeta):
    is_partition_column: Optional[bool] = None

class PartitionedSchema(MetaSchema):
    def get_partitions(cls) -> list[str]:
        return [
            column_name
            # get_type_hints() yields (column name, column type) pairs
            for column_name, column_type in get_type_hints(
                cls, include_extras=True
            ).items()
            if get_origin(column_type) is Annotated
            # Annotated stores its metadata as a tuple; getattr() guards
            # against columns annotated with plain ColumnMeta
            and getattr(column_type.__metadata__[0], "is_partition_column", False)
        ]

class PartitionedSubSchema(metaclass=PartitionedSchema):
    """Schema for the lineage table."""
    description: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the description of the action",
            is_partition_column=True,
        ),
    ]
    app: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the name of the app",
            is_partition_column=True,
        ),
    ]
    appVersion: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the app version ",
            is_partition_column=True,
        ),
    ]
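
With this in place, get_partitions() can be called directly on the schema class. A quick usage sketch (the expected result follows from the annotations above):

PartitionedSubSchema.get_partitions()
# expected: ['description', 'app', 'appVersion'], since all three
# columns are annotated with is_partition_column=True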

Instead of that, I thought it would be straightforward to add the custom functions to the schema during class creation by modifying the __new__ method in MetaSchema, as below:

    def __new__(cls, name: str, bases: Any, dct: Dict[str, Any]):
        cls._attributes = dir(cls)
        # in order to add custom functions to the schema, register every
        # staticmethod, classmethod, and plain function found in the class body
        for attribute_name, attribute in dct.items():
            if isinstance(attribute, (staticmethod, classmethod)) or inspect.isfunction(attribute):
                cls._attributes.append(attribute_name)
nanne-aben commented 2 months ago

Hi @ram-seek !

I actually quite like the approach you took with subclassing the ColumnMeta and Schema classes to add custom functions. I think it's an elegant solution. It also allows for auto-complete on those custom functions.

Why would you prefer to adapt the __new__() function instead?

Another route we could explore... The metadata you add in ColumnMeta can be accessed through

{field.name: field.metadata for field in PartitionedSchema.get_structtype().fields}

Right now, it only supports getting the comment field (that's defined in the original ColumnMeta), but I could easily expand it such that it also picks up fields that are added in subclasses. Then we could have something like:

from dataclasses import dataclass
from typing import Annotated, Optional

import pyspark.sql.types as T

from typedspark import Column, ColumnMeta, Schema

@dataclass
class PartitionedColumnMeta(ColumnMeta):
    is_partition_column: Optional[bool] = None

class PartitionedSchema(Schema):
    """Schema for the lineage table."""

    description: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the description of the action",
            is_partition_column=True,
        ),
    ]
    app: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the name of the app",
            is_partition_column=True,
        ),
    ]
    appVersion: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the app version ",
            is_partition_column=True,
        ),
    ]

{field.name: field.metadata for field in PartitionedSchema.get_structtype().fields}

Which would return

{'description': {'comment': 'the description of the action',
  'is_partition_column': True},
 'app': {'comment': 'the name of the app', 'is_partition_column': True},
 'appVersion': {'comment': 'the app version ', 'is_partition_column': True}}

We could further improve that by adding a class method to Schema, e.g. Schema.get_metadata(), such that:

PartitionedSchema.get_metadata()
{'description': {'comment': 'the description of the action',
  'is_partition_column': True},
 'app': {'comment': 'the name of the app', 'is_partition_column': True},
 'appVersion': {'comment': 'the app version ', 'is_partition_column': True}}
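
For illustration, here's a minimal sketch of what get_metadata() could do internally, written as a standalone function built on the dict comprehension above (hypothetical; the actual implementation may differ):

# Hypothetical sketch: build a {column name: metadata dict} mapping from
# the underlying StructType, mirroring the dict comprehension above.
def get_metadata(schema) -> dict[str, dict]:
    return {
        field.name: dict(field.metadata)
        for field in schema.get_structtype().fields
    }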

You'd still have to write get_partitions() (potentially outside of the schema?), but the above might abstract some things away.
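
For example, a sketch of that outside-of-the-schema option, assuming get_metadata() returns the mapping shown above:

# Sketch of a standalone helper; assumes get_metadata() returns the
# {column name: metadata dict} mapping shown above.
def get_partitions(schema) -> list[str]:
    return [
        name
        for name, meta in schema.get_metadata().items()
        if meta.get("is_partition_column")
    ]

get_partitions(PartitionedSchema)  # ['description', 'app', 'appVersion']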

What do you think?

ram-seek commented 2 months ago

@nanne-aben, thanks a lot for taking a look at this. That sounds really good, but I would still like the convenience of having PartitionedSchema.get_partitions() rather than the method sitting outside of the class. This is a great start, though.

Thinking a bit more about potential issues: will the names of the attached methods clash with typedspark's internal methods?

nanne-aben commented 2 months ago

I've implemented Schema.get_metadata() here.

To define PartitionedSchema.get_partitions(), I think your suggestion of subclassing the metaclass actually works quite well. So after the above PR is merged, that would be:

from dataclasses import dataclass
from typing import Annotated, Optional

from typedspark import Column, ColumnMeta, MetaSchema

import pyspark.sql.types as T

@dataclass
class PartitionedColumnMeta(ColumnMeta):
    is_partition_column: Optional[bool] = None

class PartitionedMetaSchema(MetaSchema):
    def get_partitions(cls) -> list[str]:
        res = []
        for k, v in cls.get_metadata().items():
            if "is_partition_column" in v and v["is_partition_column"]:
                res.append(k)
        return res

class PartitionedSubSchema(metaclass=PartitionedMetaSchema):
    """Schema for the lineage table."""

    description: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the description of the action",
            is_partition_column=True,
        ),
    ]
    app: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the name of the app",
            is_partition_column=True,
        ),
    ]
    appVersion: Annotated[
        Column[T.StringType],
        PartitionedColumnMeta(
            comment="the app version ",
            is_partition_column=True,
        ),
    ]

PartitionedSubSchema.get_partitions()
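
Which should return:

['description', 'app', 'appVersion']

since all three columns are annotated with is_partition_column=True.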
nanne-aben commented 2 months ago

The latest version of typedspark (1.5.0) should have the above functionality now! :)

ram-seek commented 2 months ago

Thanks a lot. Yup, that works for me.