unionai-oss / pandera

A light-weight, flexible, and expressive statistical data testing library
https://www.union.ai/pandera
MIT License

Pyspark unique check doesn't return error #1344

Open mdenushev opened 1 year ago

mdenushev commented 1 year ago

Describe the bug: Trying to check the uniqueness of a field, but no errors are returned.


Code Sample, a copy-pastable example

import pandera.pyspark as pa
import pyspark.sql.types as T
from pandera.pyspark import DataFrameModel
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

class Sample(DataFrameModel):
    id: T.StringType() = pa.Field(unique=True)

spark_schema = T.StructType([T.StructField("id", T.StringType(), False)])

# id "1" appears twice, so the unique check should fail
data = [
    Row(id="1"),
    Row(id="1"),
    Row(id="2"),
]
sample_df = spark.createDataFrame(data, spark_schema)
res = Sample.to_schema().validate(sample_df)
print(res.pandera.errors)
# Output: {}

Expected behavior

A uniqueness error should be returned. The pandas uniqueness verification works fine.
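For comparison, a minimal pandas sketch (core pandera API rather than pandera.pyspark) showing the duplicate being flagged:

import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({"id": pa.Column(str, unique=True)})

try:
    schema.validate(pd.DataFrame({"id": ["1", "1", "2"]}))
except pa.errors.SchemaError as exc:
    print(exc)  # reports the duplicate values in column 'id'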


cosmicBboy commented 1 year ago

@NeerajMalhotra-QB @jaskaransinghsidana the unique=True core check was never implemented for pyspark, right? I don't see it here:

https://github.com/unionai-oss/pandera/blob/main/pandera/backends/pyspark/column.py#L38-L42

Would this be as simple as:

df.select(schema.name).distinct().count() == df.select(schema.name).count()

If not, we should probably raise a SchemaInitError to say that it isn't currently supported.
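A minimal sketch of that check (the helper name and its wiring into the backend are assumptions, not pandera's actual API):

from pyspark.sql import DataFrame

def check_unique(df: DataFrame, column: str) -> bool:
    # Compare the distinct count to the total count for the column;
    # equality means there are no duplicates.
    col_df = df.select(column)
    return col_df.distinct().count() == col_df.count()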

NeerajMalhotra-QB commented 1 year ago

Yeah, the implementation is simple. Users need to be careful about using it in production, though: by default it should be disabled and only run when absolutely needed.
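If the cost of two full counts is the concern, a single-pass variant is possible (a sketch, not the implemented approach):

import pyspark.sql.functions as F

def has_duplicates(df, column: str) -> bool:
    # One aggregation instead of two counts; limit(1) lets Spark
    # stop scanning once a single duplicated value is found.
    return (
        df.groupBy(column)
        .count()
        .filter(F.col("count") > 1)
        .limit(1)
        .count()
        > 0
    )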


cosmicBboy commented 1 year ago

By default it should be disabled and only run when absolutely needed.

Cool: unique=False is the default, so opting in with unique=True makes sense.

zippeurfou commented 6 months ago

It would be great to at least have this in the documentation; not having documentation or a warning for it breaks the user's trust. One thing to call out is that you can use the Config class to do it. Sample code:

import pandera.pyspark as pa
import pyspark.sql.types as T

from decimal import Decimal
from pyspark.sql import SparkSession
from pandera.pyspark import DataFrameModel
from unittest import TestCase

spark = SparkSession.builder.getOrCreate()

class TestPanderaSpark(TestCase):
    def test_unique(self):
        class PanderaSchema(DataFrameModel):
            id: T.IntegerType() = pa.Field(gt=5)
            product_name: T.StringType() = pa.Field(str_startswith="B")
            price: T.DecimalType(20, 5) = pa.Field()
            description: T.ArrayType(T.StringType()) = pa.Field()
            meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()

            class Config:
                """Config of pandera class"""

                unique = "id"

        # rows two and three share id=15, which the Config.unique check should flag
        data = [
            (
                6,
                "Bread",
                Decimal(44.4),
                ["description of product"],
                {"product_category": "dairy"},
            ),
            (
                15,
                "Butter",
                Decimal(99.0),
                ["more details here"],
                {"product_category": "bakery"},
            ),
            (
                15,
                "Buzz",
                Decimal(99.0),
                ["more details here"],
                {"product_category": "bakery"},
            ),
        ]

        spark_schema = T.StructType(
            [
                T.StructField("id", T.IntegerType(), False),
                T.StructField("product", T.StringType(), False),
                T.StructField("price", T.DecimalType(20, 5), False),
                T.StructField("description", T.ArrayType(T.StringType(), False), False),
                T.StructField("meta", T.MapType(T.StringType(), T.StringType(), False), False),
            ],
        )
        df = spark.createDataFrame(data, spark_schema)
        df_out = PanderaSchema.validate(check_obj=df)
        self.assertTrue(len(df_out.pandera.errors['DATA']['DUPLICATES'])==1)

cosmicBboy commented 6 months ago

Happy to review a PR to update the documentation and add a warning/error when field-level unique is specified, @zippeurfou.

It breaks trust with the user to not have documentation or a warning on this.

Would it make sense to raise a SchemaInitError here instead? While this isn't implemented, it seems like it should fail fast: we can basically raise an error so that the behavior is clearer.
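A sketch of that fail-fast guard (where exactly it would hook into the pyspark backend is an assumption):

from pandera.errors import SchemaInitError

def reject_field_level_unique(unique: bool) -> None:
    # Hypothetical guard: field-level unique=True is silently ignored
    # by the pyspark backend today, so fail at schema definition time.
    if unique:
        raise SchemaInitError(
            "unique=True is not supported for pyspark fields; "
            "use the DataFrameModel Config.unique option instead"
        )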

zippeurfou commented 6 months ago

Thanks @cosmicBboy, yes it makes sense. I will see if I can raise a PR for this.

cosmicBboy commented 6 months ago

Pinging this issue again in case anyone has the capacity to make a PR for it. Basically, the PR just needs to implement the solution described here, with unit tests.

filipeo2-mck commented 4 months ago

What about ensuring uniqueness over a composite primary key of a table, for example?

I understand that all three id_* columns below should be taken into account when applying the unique check:

        class PanderaSchema(DataFrameModel):
            id_1: T.IntegerType() = pa.Field(unique=True)  # composite primary key
            id_2: T.IntegerType() = pa.Field(unique=True)  # composite primary key
            id_3: T.IntegerType() = pa.Field(unique=True)  # composite primary key
            product_name: T.StringType() = pa.Field(str_startswith="B")
            price: T.DecimalType(20, 5) = pa.Field()
            description: T.ArrayType(T.StringType()) = pa.Field()
            meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()

The check couldn't be done at the column level only; it would have to happen at the DataFrameModel/schema level instead.

The logic of the unique check could be something like this:

import pyspark.sql.functions as F

df_non_unique = (
    df.groupBy(
        # pseudocode: the columns whose schema component is flagged unique
        [column for column in df.columns if <column is marked unique>]
    )
    .agg(F.count("*").alias("count"))
    .filter(F.col("count") > 1)
)

if df_non_unique.count() > 0:
    raise Exception("The fields with unique check are not jointly unique")

WDYT, @NeerajMalhotra-QB, @zippeurfou?

Edit: The above behavior can be achieved with the Config class and it behaves correctly:

class PanderaSchema(DataFrameModel):
    id: T.IntegerType() = pa.Field()
    product_name: T.StringType() = pa.Field()
    price: T.DecimalType(20, 5) = pa.Field()
    description: T.ArrayType(T.StringType()) = pa.Field()
    meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()

    class Config:
        unique = ["id", "product_name", "price"]

It's implemented here: [screenshot of the implementation]

filipeo2-mck commented 4 months ago

Related: #1285