kaiko-ai / typedspark

Column-wise type annotations for pyspark DataFrames
Apache License 2.0

No options to define nullability of fields #253

Closed: ram-seek closed this issue 9 months ago

ram-seek commented 10 months ago

Currently there is no way to set the nullability of a field, and when calling the get_structtype function to get the schema, the nullability is hardcoded as True.

https://github.com/kaiko-ai/typedspark/blob/a0c03e8cce8cac630d02516d6347209666f335c8/typedspark/_schema/structfield.py#L42

nanne-aben commented 10 months ago

Hi @ram-seek !

We've previously played around with making nullable a parameter in the schemas, but it turned out that in pyspark nullability is pretty dynamic. Consider the following example, where column b switches from nullable to non-nullable and back again.

[screenshot omitted: a DataFrame whose column b switches from nullable to non-nullable and back again across successive operations]
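The behaviour looks roughly like this (a minimal sketch, not the code from the original screenshot; the parquet path and literal values are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit

spark = SparkSession.builder.getOrCreate()

# b starts out nullable
df = spark.createDataFrame([(1, None), (2, 3)], "a: long, b: long")
print(df.schema["b"].nullable)  # True

# coalescing with a non-nullable literal makes b non-nullable
df = df.withColumn("b", coalesce("b", lit(0)))
print(df.schema["b"].nullable)  # False

# after a round trip through parquet, Spark marks the column as nullable again
df.write.mode("overwrite").parquet("/tmp/nullability_example")
print(spark.read.parquet("/tmp/nullability_example").schema["b"].nullable)  # True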

Given how counter-intuitively this behaves, it really didn't add much to have nullability as an option in the schema. I think nullability is mostly used under the hood by Spark for optimizations.

Does this answer your question? Or do you have a different scenario in mind in which you'd like to set the nullability?

ram-seek commented 10 months ago

@nanne-aben thanks for that. What I was trying to do was to use typedspark for schema definitions and also for function signatures.

The schemas I work with are complex and highly nested, and using typedspark to define them makes them more manageable. I could also reuse some of the classes in other nested structures, like so:

[screenshot omitted: a nested schema definition in which one schema class is reused inside several other nested structures]
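Something along these lines (a reconstruction rather than the original screenshot; the Person schema and its field names are illustrative):

from pyspark.sql.types import LongType, StringType
from typedspark import Column, Schema, StructType

class Address(Schema):
    street: Column[StringType]
    number: Column[LongType]

class Person(Schema):
    name: Column[StringType]
    # the same Address schema is reused in multiple nested structures
    home_address: Column[StructType[Address]]
    work_address: Column[StructType[Address]]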

Another place I was looking to use typedspark is to define functions and to validate the types of the arguments as well as the return types:

[screenshot omitted: a function whose argument and return types are annotated as DataSets, building the full street name by adding the street and number columns]
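Roughly like this (again a reconstruction, not the original screenshot; note that the body uses string addition, which the next comment corrects):

from pyspark.sql.functions import lit
from typedspark import DataSet

def with_full_street(df: DataSet[Address]) -> DataSet[Address]:
    # the argument and return types are declared as DataSet[Address]
    return df.withColumn(
        Address.street.str,
        Address.street + lit(" ") + Address.number,  # string addition, see the reply below
    )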

Please let me know if that makes sense or if you need more information.

nanne-aben commented 10 months ago

Yup, that's exactly what typedspark is for!

Although for the last code block, you should use concat() rather than addition. In Spark, you can't add StringType columns.

from pyspark.sql.functions import concat, lit

df.withColumn(
    Address.street.str,
    concat(
        Address.street,
        lit(" "),
        Address.number,
    ),
)

You could also consider using transform_to_schema() (doc):

from typedspark import transform_to_schema

transform_to_schema(
    df,
    Address,
    {
        Address.street: concat(
            Address.street,
            lit(" "),
            Address.number,
        )
    },
)

Lemme know if you have any other questions!

ram-seek commented 10 months ago

Sorry if I was not being clear; the sample code was just to illustrate the use of DataSets in the function signatures and the implementation of the function. The point I was trying to make was: for schema definitions and function signatures, it is essential to have the nullability of the columns defined in the schema.

nanne-aben commented 10 months ago

Do you mean that you want to be able to define which columns are nullable, so that you can be certain street and number are not null and hence you're not concatenating something with a null column?

If so, I see your point, but that's not really how Spark works. Suppose we do define nullability in the schema somehow:

from typing import Annotated
from pyspark.sql.types import LongType, StringType
from typedspark import Column, ColumnMeta, Schema, create_partially_filled_dataset

class Address(Schema):
    street: Annotated[Column[StringType], ColumnMeta(nullable=False)]
    number: Annotated[Column[LongType], ColumnMeta(nullable=False)]

df = create_partially_filled_dataset(spark, Address, {Address.street: ["high street"], Address.number: [1]})

We might think of four ways to enforce that.

First option: make sure that df.filter(Address.street.isNull()).count() == 0 (and likewise for Address.number). It's effective, but it will trigger a compute of the df (remember that Spark is evaluated lazily), which I don't want to do for every cast.

Second option: make sure that df.schema[0].nullable == False (and likewise for df.schema[1]). However, this nullable attribute can be True even when there are no null values at all. This happens a lot, for example when you load a Hive table (see the example in my first post in this issue).

Third option: overwrite the value such that df.schema[0].nullable = False. This doesn't enforce anything, though: you can set it to False on a column that is full of null values.

[screenshot omitted: setting nullable = False on a schema field of a DataFrame whose column still contains null values]
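For illustration (a sketch, not the original screenshot; it assumes create_partially_filled_dataset leaves unspecified columns as null):

# street is left unfilled, so it is full of nulls
df = create_partially_filled_dataset(spark, Address, {Address.number: [1, 2]})

# flipping the flag on the schema object succeeds, but nothing is enforced
df.schema[0].nullable = False
df.show()  # street still contains nulls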

Fourth option: create a new df after changing the schema:

df.schema[1].nullable = False  # mutate the schema object on the Python side
new_df = spark.createDataFrame(df.rdd, df.schema)  # rebuild the DataFrame with the modified schema

Unfortunately, it is super costly to do this, because it round-trips all the data through the RDD API.

Hence, none of the options are great. In my opinion, typedspark is not the best way to enforce nullability, because nullability is something you should check all the way at the end, when you're computing your entire df, not at every intermediate step.

I know DBT offers exactly this: a way to test for nulls in your data after you've computed the entire table. DBT also works well together with typedspark: DBT for the pipeline, typedspark for development in your IDE. We have some code that automatically transforms schemas between DBT and typedspark.

If DBT is too heavy of a solution, then the best option would be to add some assertions that check df.filter(Address.street.isNull()).count() == 0.
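For example, something like this (a sketch; the assert_not_null helper is mine, not part of typedspark):

from pyspark.sql import Column
from typedspark import DataSet

def assert_not_null(df: DataSet[Address], *columns: Column) -> None:
    # each count() triggers a compute, so run this once at the end of the
    # pipeline rather than at every intermediate step
    for column in columns:
        assert df.filter(column.isNull()).count() == 0

assert_not_null(df, Address.street, Address.number)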

Lemme know if that answers your question!