kaiko-ai / typedspark

Column-wise type annotations for pyspark DataFrames
Apache License 2.0

Column names with hyphens - Possible in latest? #416

Open jklob2022 opened 4 months ago

jklob2022 commented 4 months ago

If it is possible with the current codebase, are there any examples of how to put a schema together with a field that has a hyphen in it?

Unfortunately due to AWS naming, when using an EventBus you get a column name of something like "detail-type"...

I'm not sure what the syntax would be in the current implementation, but ultimately I would need to be able to do something like the following (knowing that this won't work, because the syntax isn't valid Python):

# Event Bus Schema
class EventBusData(Schema):
    account: Column[StringType]
    detail: Column[TypedSparkStructType[MainDataStruct]]
    detail-type: Column[StringType]
    id: Column[StringType]
    region: Column[StringType]
    resources: Column[TypedSparkArrayType[StringType]]
    source: Column[StringType]
    time: Column[TimestampType]
    version: Column[StringType]
nanne-aben commented 4 months ago

Hi @jklob2022 ,

Yes, we can do something about that! :)

We actually already tackle a case like this when you load a DataSet in a notebook, through Catalogs() or load_table(), for example. Whenever there's a disallowed character in a column name, we replace it with an _ in both the schema and the DataFrame (and when that results in collisions, we resolve them).
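
Roughly, the idea is something like this (a simplified sketch, not the actual typedspark internals):

import re
from typing import Dict

from pyspark.sql import DataFrame


def _sanitize_column_names(df: DataFrame) -> DataFrame:
    """Simplified sketch: replace characters that aren't valid in Python
    identifiers with underscores, and de-duplicate any resulting collisions."""
    counts: Dict[str, int] = {}
    for original in df.columns:
        sanitized = re.sub(r"\W", "_", original)
        if sanitized in counts:
            counts[sanitized] += 1
            sanitized = f"{sanitized}_{counts[sanitized]}"  # e.g. "detail_type_1"
        else:
            counts[sanitized] = 0
        if sanitized != original:
            df = df.withColumnRenamed(original, sanitized)
    return df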

Your usecase is similar, but with the difference that you want to do this with an IDE-defined schema. I think the following would work for that:

from typing import Type, TypeVar, Tuple
from pyspark.sql import DataFrame
from typedspark import DataSet, register_schema_to_dataset, transform_to_schema
from typedspark._utils.load_table import _replace_illegal_column_names

T = TypeVar("T")

def to_dataset(df: DataFrame, schema: Type[T]) -> Tuple[DataSet[T], Type[T]]:
    """Converts a DataFrame to a DataSet and registers the Schema to the DataSet.
    Also replaces "illegal" characters in the DataFrame's column names (e.g. "test-result"
    -> "test_result"), so they're compatible with the Schema (after all, Python doesn't allow
    characters such as dashes in attribute names).
    """
    df = _replace_illegal_column_names(df)
    ds = transform_to_schema(df, schema)
    schema = register_schema_to_dataset(ds, schema)
    return ds, schema

You'd then define your schema as:

class EventBusData(Schema):
    account: Column[StringType]
    detail: Column[TypedSparkStructType[MainDataStruct]]
    detail_type: Column[StringType]
    id: Column[StringType]
    region: Column[StringType]
    resources: Column[TypedSparkArrayType[StringType]]
    source: Column[StringType]
    time: Column[TimestampType]
    version: Column[StringType]
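
Putting the two together, usage would look roughly like this (the S3 path and the existing SparkSession spark are made up for illustration):

raw_df = spark.read.json("s3://some-bucket/event-bus-dump/")  # contains a "detail-type" column

event_bus_data, event_bus_schema = to_dataset(raw_df, EventBusData)

# from here on you can use the hyphen-free attribute as usual
event_bus_data.select(event_bus_schema.detail_type).show()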

Would that solve your problem? At least short-term?

Long-term, we should add this function to typedspark. That would also resolve the obvious problem that the above function imports from a private module.

I'd have to think about the name though. I don't like to_dataset(); it's not that descriptive.

Also, I'd like to limit the number of functions in typedspark; I feel like we already have a lot of them. I'd rather have it as a (class)method somewhere, so users don't need to import a ton of different functions. Maybe we can do something like:

DataSet[EventBusData].from_dataframe(...)

I could probably implement that later this week.

What do you think?

jklob2022 commented 4 months ago

I think the hardest part is that if the schemas are already prescribed (on both outgoing and incoming), then I need the input and output to be "detail-type" instead of "detail_type".

I suppose I could use something like the snippet below for output in the short term. For input, I'd then either use the to_dataset function you've suggested beforehand, or forego checking the source/input schema altogether in favor of just checking the output (although I'd rather check both).

outgoing_df = DataSet[EventBusData](incoming_df.select(...))
outgoing_df = outgoing_df.withColumnRenamed("detail_type", "detail-type")

Is there a way to extend this so "detail-type" could be allowed? Perhaps the notation would have to change (I'm making this up below... I'm not sure about the original intention behind the name and alias args, and I'm not a Python wizard; I was just trying to follow how __new__ and __init__ are used on Column in the typedspark source):

class EventBusData(Schema):
    account: Column[StringType]
    detail: Column[TypedSparkStructType[MainDataStruct]]
    detail_type = Column(name='detail-type', dtype=StringType)
    id: Column[StringType]
    region: Column[StringType]
    resources: Column[TypedSparkArrayType[StringType]]
    source: Column[StringType]
    time: Column[TimestampType]
    version: Column[StringType]

I assume from a typing standpoint under the covers this would resolve to something like the following?

{
    'account': typedspark._core.column.Column[pyspark.sql.types.StringType],
    'detail': typedspark._core.column.Column[typedspark._core.datatypes.StructType[path.to.MainDataStruct]], 
    'detail-type': typedspark._core.column.Column[pyspark.sql.types.StringType],
    'id': typedspark._core.column.Column[pyspark.sql.types.StringType], 
    'region': typedspark._core.column.Column[pyspark.sql.types.StringType], 
    'resources': typedspark._core.column.Column[typedspark._core.datatypes.ArrayType[pyspark.sql.types.StringType]], 
    'source': typedspark._core.column.Column[pyspark.sql.types.StringType], 
    'time': typedspark._core.column.Column[pyspark.sql.types.TimestampType], 
    'version': typedspark._core.column.Column[pyspark.sql.types.StringType]
}

Thank you for the response -- I wasn't sure how active all the authors were on here and we've found this to be a useful package.

nanne-aben commented 4 months ago

we've found this to be a useful package.

Thanks! Always happy to hear that :)

I think the hardest part is that if the schemas are already prescribed (on outgoing and incoming) then I need the input and output to be "detail-type" instead of "detail_type".

Right, that makes sense.

We could do something like this:

from typing import Annotated
from pyspark.sql.types import StringType, TimestampType
from typedspark import (
    Column,
    Schema,
    StructType as TypedSparkStructType,
    ArrayType as TypedSparkArrayType,
)
from typedspark._core.column_meta import ColumnMeta

class EventBusData(Schema):
    account: Column[StringType]
    detail: Column[TypedSparkStructType[MainDataStruct]]
    detail_type: Annotated[
        Column[StringType],
        ColumnMeta(name="detail-type"),  # can we make the `name` parameter more specific?
    ]
    id: Column[StringType]
    region: Column[StringType]
    resources: Column[TypedSparkArrayType[StringType]]
    source: Column[StringType]
    time: Column[TimestampType]
    version: Column[StringType]

You'd then still load the DataSet using:

event_bus_data, _event_bus_data = DataSet[EventBusData].from_dataframe(...)

Which would rename detail-type to detail_type, so we can use it in typedspark.

And when we're ready to pass the DataSet onwards again, we'd rename the column back to detail-type using something like:

event_bus_data.to_dataframe()  # I'm open to a better name for this function
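
For what it's worth, a rough sketch of how both directions could work under the hood, assuming we read the ColumnMeta out of the Annotated hints (the helper names and the name parameter are illustrative, not actual typedspark internals):

from typing import Dict, Type, get_type_hints

from pyspark.sql import DataFrame


def _external_to_internal_names(schema: Type[Schema]) -> Dict[str, str]:
    """Map external column names (e.g. "detail-type") to schema attribute
    names (e.g. "detail_type"), based on the ColumnMeta annotations."""
    mapping = {}
    for attribute, hint in get_type_hints(schema, include_extras=True).items():
        external = attribute
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, ColumnMeta) and getattr(meta, "name", None):
                external = meta.name
        mapping[external] = attribute
    return mapping


def _rename_to_schema(df: DataFrame, schema: Type[Schema]) -> DataFrame:
    # e.g. "detail-type" -> "detail_type", so the DataFrame matches the Schema
    for external, internal in _external_to_internal_names(schema).items():
        if external != internal:
            df = df.withColumnRenamed(external, internal)
    return df


def _rename_to_external(df: DataFrame, schema: Type[Schema]) -> DataFrame:
    # e.g. "detail_type" -> "detail-type" again, before handing the DataFrame onwards
    for external, internal in _external_to_internal_names(schema).items():
        if external != internal:
            df = df.withColumnRenamed(internal, external)
    return df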

I think we could make this work. What I don't love though:

  1. If you'd ever want to change the name of detail-type, you'd have to change detail_type as well.
  2. You can come up with scenarios where to_dataframe() would try to rename a column to a name that already exists. And those problems would only show up at runtime.

These are not the biggest problems, but maybe we can come up with a neater solution?

jklob2022 commented 4 months ago

I'm looking at your comments here on what you don't love. I think 1 is tricky, but if you were going to change the name and it wasn't a straight passthrough (same schema on output as input), you'd have to declare two schemas anyway. I think this is okay.

For 2, I'm not super worried, as I don't think I'd see detail-type and detail_type both on the same input or output set. It could definitely be listed as a known limitation, though, so people know what to expect.

For transformations (1) I assume you'd have something like (assuming you want to keep columns with hyphens in the names):

from typing import Annotated

from pyspark.sql.types import DoubleType
from typedspark import Column, DataSet, Schema
from typedspark._core.column_meta import ColumnMeta

class InputSchema(Schema):
    length_box: Annotated[
        Column[DoubleType],
        ColumnMeta(name="length-box"),
    ]
    width_box: Annotated[
        Column[DoubleType],
        ColumnMeta(name="width-box"),
    ]

class OutputSchema(Schema):
    area_box: Annotated[
        Column[DoubleType],
        ColumnMeta(name="area-box"),
    ]

# ...

# Validation of input
input_df = DataSet[InputSchema].from_dataframe(...)

transform_df = (
    input_df.withColumn(
        OutputSchema.area_box.str,
        (InputSchema.length_box * InputSchema.width_box)
    )
)
# Validation of output occurs here, pre-"to_dataframe"
transform_df = DataSet[OutputSchema](
    transform_df.select(
        OutputSchema.area_box
    )
)

output_df = transform_df.to_dataframe()

For nomenclature:

I'm not sure if "name" is good or bad, because to be honest I don't know the original intention behind the current parameter. Perhaps something like "df_name" or "df_field"? I personally have no preference here; as long as it's documented, people will use it.

jklob2022 commented 4 months ago

I also feel I should ask the question even though I think the answer is yes-

Is from_dataframe structured streaming safe? Would to_dataframe be stream safe?

nanne-aben commented 4 months ago

Thanks for the feedback! Lemme check it with some of my colleagues tomorrow, I'd like to get some more opinions on this.

I also feel I should ask the question even though I think the answer is yes-

Is from_dataframe structured streaming safe? Would to_dataframe be stream safe?

Could you elaborate on what stream safe means? Does it have to do with Spark evaluating lazily? As in: are you asking whether to_dataframe() and from_dataframe() do not trigger an evaluation? In that case: yes, they should not trigger an evaluation.
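
For instance, a column rename is just a lazy projection, so it also applies to streaming DataFrames without starting a job. A minimal sketch (the path, format, and raw_schema are made up):

stream_df = (
    spark.readStream
    .format("json")
    .schema(raw_schema)  # some pre-defined pyspark StructType
    .load("s3://some-bucket/events/")
)

# no evaluation is triggered here, it's just another transformation in the plan
renamed = stream_df.withColumnRenamed("detail-type", "detail_type")

query = renamed.writeStream.format("console").start()  # evaluation only starts here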

jklob2022 commented 3 months ago

Yes, on not triggering an evaluation. Also, on not utilizing any methods (like aggregations/distinct/etc.) that can't be done outside forEachBatch -- if they do, we'd just want to know. Thank you

nanne-aben commented 3 months ago

In that case, yes, it should be completely stream safe.

After giving it some more thought, I do like what you've suggested. Maybe we can use external_name as a param name.

class InputSchema(Schema):
    length_box: Annotated[
        Column[DoubleType],
        ColumnMeta(external_name="length-box"),
    ]
    width_box: Annotated[
        Column[DoubleType],
        ColumnMeta(external_name="width-box"),
    ]

class OutputSchema(Schema):
    area_box: Annotated[
        Column[DoubleType],
        ColumnMeta(external_name="area-box"),
    ]

It will be a bit more work than I initially thought. We'll need to:

  1. Expand ColumnMeta (should be easy; a rough sketch follows this list)
  2. Implement DataSet[Schema].from_dataframe() (almost there)
  3. Implement DataSet.to_dataframe() (should be easy)
  4. Ideally, also update the notebook functions (e.g. Catalogs() and load_table()) such that column names with illegal characters are loaded with ColumnMeta(external_name=...)
  5. Update all the documentation accordingly
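
For nr 1, the ColumnMeta expansion could look roughly like this (a sketch, not the actual implementation; comment is what ColumnMeta carries today, external_name is the proposed addition):

from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnMeta:
    """Metadata attached to a Column through typing.Annotated."""

    comment: Optional[str] = None        # already supported today
    external_name: Optional[str] = None  # proposed: the column name as it appears in the DataFrame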

If you're willing to contribute to this, we could split it up between us. I can also do it alone, but it will take a bit longer.

jklob2022 commented 3 months ago

I'm not sure how much help I will be -- I will see if I can carve out some time next week to set up the dev environment as per https://typedspark.readthedocs.io/en/latest/contributing.html -- is this still up to date for contributing?

nanne-aben commented 3 months ago

I will see if I can carve some time out next week

That's great!!

the dev environment as per https://typedspark.readthedocs.io/en/latest/contributing.html

Yes, that one is still up-to-date! Although you might want to set it up with Python 3.12 (instead of 3.11, as currently specified in the docs). Not a huge difference though (both will be tested in the CI).

I'm not sure how much help I will be

I've already done most of nr 3 in the PR that's associated with this issue. The only thing we'd need to swap out is this line, for something that takes ColumnMeta's external_name. Nr 1 and nr 2 will logically follow from that. I think it will be relatively easy.

Nr 4 and nr 5 are kinda tricky, I can pick those up :)

nanne-aben commented 3 months ago

Hi @jklob2022 !

It's been very busy at work, so I haven't made much progress here. Just wanted to let you know this is still on my radar, it will come when I find the time :)

jklob2022 commented 3 months ago

Same here. Does this project have a Slack or anything like that?

nanne-aben commented 3 months ago

Hmm, not really. I've previously enabled the Discussions tab here on GitHub, but it isn't really active. Maybe a Slack is a good idea though.

I've set a Slack space up here.

nanne-aben commented 3 months ago

I've updated the PR to implement 1, 2, and 3. Will cover 4 and 5 sometime next week!

nanne-aben commented 3 months ago

... I hope we didn't do any double work, btw; I just realised that we did kinda say you might pick up 1 and 2. If we did do double work, my apologies!

Also, if this is something you could use right away, we could potentially already release it and do step 4 & 5 separately. Lemme know!

ovidiu-eremia commented 2 months ago

Hi @nanne-aben, could you please post another link for the Slack, as this one has expired:

I've set a Slack space up here.

Thank you!

nanne-aben commented 2 months ago

@ovidiu-eremia Of course, I've re-enabled the link!

@jklob2022 I'm still working on this ticket. Give me a heads-up if you'd like to have access to the functionality that's in the PR already; we could merge it. The remaining work is to make other parts of the package consistent and to create proper documentation.