canimus / cuallee

Possibly the fastest DataFrame-agnostic quality check library in town.
https://canimus.github.io/cuallee/
Apache License 2.0

No support for `pyspark.sql.connect.dataframe.DataFrame` in custom functions #314

Closed: marrov closed this issue 2 weeks ago

marrov commented 2 weeks ago

Describe the bug

I have been using cuallee's pyspark API for some time, but I came across an issue that is, admittedly, not cuallee's fault but limits applicability in my use case.

The gist of it is that if you are running databricks-connect to access data in a Databricks environment through the pyspark API, the type of the resulting pyspark DataFrames is not the same as when using regular Spark:
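A minimal sketch of the difference (the session setup here is my assumption, not from this issue):

    from pyspark.sql import SparkSession
    from databricks.connect import DatabricksSession

    # Classic Spark session
    spark = SparkSession.builder.getOrCreate()
    type(spark.range(1))
    # <class 'pyspark.sql.dataframe.DataFrame'>

    # Databricks Connect session (Spark Connect under the hood)
    spark = DatabricksSession.builder.getOrCreate()
    type(spark.range(1))
    # <class 'pyspark.sql.connect.dataframe.DataFrame'>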

This causes the following error when a custom function is run:

    def _execute(dataframe: DataFrame, key: str):
        try:
            assert isinstance(
                rule.value, Callable
            ), "Please provide a Callable/Function for validation"
            computed_frame = rule.value(dataframe)
            assert isinstance(
                computed_frame, DataFrame
            ), "Custom function does not return a PySpark DataFrame"
            assert (
                len(computed_frame.columns) >= 1
            ), "Custom function should retun at least one column"
            computed_column = last(computed_frame.columns)
            return computed_frame.select(
                F.sum(F.col(f"`{computed_column}`").cast("integer")).alias(key)
            )

        except Exception as err:
>           raise CustomComputeException(str(err))
E           cuallee.CustomComputeException: Custom function does not return a PySpark DataFrame

Note that this is actually not an issue with the built-in checks; I have not checked why. A simple solution would be to check against a tuple of types:

    from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame

    def _execute(dataframe: DataFrame, key: str):
        try:
            assert isinstance(
                rule.value, Callable
            ), "Please provide a Callable/Function for validation"
            computed_frame = rule.value(dataframe)
            assert isinstance(
                computed_frame, (DataFrame, ConnectDataFrame)
            ), "Custom function does not return a PySpark DataFrame"
            assert (
                len(computed_frame.columns) >= 1
            ), "Custom function should retun at least one column"
            computed_column = last(computed_frame.columns)
            return computed_frame.select(
                F.sum(F.col(f"`{computed_column}`").cast("integer")).alias(key)
            )

I have made a fork of the main branch and added this change [here](https://github.com/marrov/cuallee/tree/bug/connect-dataframe), as I really need this feature, but I will not open a PR until it is clear that this is functionality the devs are OK with supporting.

To Reproduce

Steps to reproduce the behavior (a sketch follows the steps below):

  1. Load a pyspark dataframe using databricks connect
  2. Validate it against a custom function
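A rough sketch of these steps (the `is_custom` usage and the session setup are my assumptions about the API; adapt as needed):

    import pyspark.sql.functions as F
    from databricks.connect import DatabricksSession
    from cuallee import Check, CheckLevel

    # 1. Load a pyspark dataframe using databricks connect:
    #    this yields a pyspark.sql.connect.dataframe.DataFrame
    spark = DatabricksSession.builder.getOrCreate()
    df = spark.range(10)

    # 2. Validate it against a custom function returning a boolean column
    check = Check(CheckLevel.ERROR, "connect_repro")
    check.is_custom("id", lambda d: d.select((F.col("id") >= 0).alias("ok")))
    check.validate(df)  # raises CustomComputeException before the fix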

Expected behavior

The validation should not fail for a `pyspark.sql.connect.dataframe.DataFrame`.


canimus commented 2 weeks ago

Thanks @marrov for finding this shortcoming, and also for your engagement with this project. Appreciate it. A tuple with both dataframe classes feels like a good solution. To simplify the validation and harness the function's return, I can think of an alternative approach that uses `str` instead of the class. The comparison would then not require importing `ConnectDataFrame` and could instead operate with `type`, `str`, and potentially `re`. We are working on the new cuallee v1.0, which brings a refactor, less code, an increased maintainability score, fresh docs, videos, and machine-learning-enabled checks. An example of what I mentioned above can be found in the working branch feature-dtype, which removes all the type inference from the main __init__.py file.

As an example, the compute engine selection is now substantially less complex to understand:

    self.dtype = first(re.match(r".*'(.*)'", str(type(dataframe))).groups())
    match self.dtype:
        case self.dtype if "pyspark" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.pyspark_validation")
        case self.dtype if "pandas" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.pandas_validation")
        case self.dtype if "snowpark" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.snowpark_validation")
        case self.dtype if "polars" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.polars_validation")
        case self.dtype if "duckdb" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.duckdb_validation")
        case self.dtype if "bigquery" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.bigquery_validation")
        case self.dtype if "daft" in self.dtype:
            self.compute_engine = importlib.import_module("cuallee.daft_validation")
        case _:
            raise NotImplementedError(f"{self.dtype} is not yet implemented in cuallee")

What do you think?

canimus commented 2 weeks ago

Fixed in #315

marrov commented 2 weeks ago

Thank you for an amazing data quality library! It is great that you fixed this so quickly; I'll remove my fork and close this issue. I actually did not think of solving `isinstance` checks with `str` and `re`, but that seems like a great lightweight approach!