crflynn / pbspark

protobuf pyspark conversion
MIT License

Usage question - whole data frame as input to proto encoding #14

Closed BrendanJM closed 2 years ago

BrendanJM commented 2 years ago

Thanks for sharing this repo - I see a lot of potential here! I'm running into some issues with DataFrame aliasing and am wondering what the recommended workaround is, since this case does not seem to work with the readme instructions. Let's say I have already loaded a DataFrame and intend to pass all of its columns as fields to be encoded:

df = spark.sql("select * from my_db.my_table").alias("some_field")
df.printSchema()  # note you can't see the alias in the schema below, but we can confirm it exists in the examples further down

>>> 
root
 |-- subfield_1: double (nullable = true)
 |-- subfield_2: double (nullable = true)
 | ... etc

It seems as though this doesn't work as cleanly as your readme example when I try to encode:

# Try dot notation (as in readme example): 
df.select(mc.to_protobuf(df.some_field, SimpleMessage))

>>> AttributeError: 'DataFrame' object has no attribute 'some_field'

# Try passing column name in column notation (note you can confirm the DF-alias in the available columns here): 
df.select(mc.to_protobuf(f.col('some_field'), SimpleMessage))

>>> org.apache.spark.sql.AnalysisException: cannot resolve '`some_field `' given input columns: [some_field.subfield_1, some_field.subfield_2, ...]

# Try using star notation:
df.select(mc.to_protobuf(f.col('some_field.*'), SimpleMessage))

>>> org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'pythonudf';

# Try selecting (with and without star):
df.select(mc.to_protobuf(df.select('some_field'), SimpleMessage))
df.select(mc.to_protobuf(df.select('some_field.*'), SimpleMessage))

>>> TypeError: Invalid argument, not a string or column: DataFrame[subfield_1: double...]

# Try bracket selection (with and without star, note the subfields don't show alias in this approach):
df.select(mc.to_protobuf(df['some_field'], SimpleMessage))
df.select(mc.to_protobuf(df['some_field.*'], SimpleMessage))

>>> org.apache.spark.sql.AnalysisException: Cannot resolve column name "some_field" among (subfield_1, subfield_2, ...)

The real-world use case here is a larger DataFrame built by joining several aliased DataFrames, e.g.

df.alias('some_field').join(df2.alias('some_other_field'))

I would then encode the result into two proto columns (e.g. 'some_field' and 'some_other_field'). The schemas are quite complex, so it isn't practical for me to write out the entire schema and assign each column individually.

Any recommendations on a way forward?

BrendanJM commented 2 years ago

FWIW this is on Spark 2.4.

crflynn commented 2 years ago

Thanks for the report. I think I have a workaround that might work, but in figuring it out I uncovered a few other bugs that I want to fix.

First of all, in the example and in the round-trip test we just convert from message to struct and then struct to message. In your case you need to first "unflatten" the message by packing it into a struct and then do the conversion, which might look something like this:

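from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType

# Wrap each collected row in a single "value" field and convert it to nested dicts.
data = [Row(value=row).asDict(recursive=True) for row in df.collect()]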
df_unflattened = spark.createDataFrame(
    data=data,
    schema=StructType(
        [
            StructField(
                name="value",
                dataType=mc.get_spark_schema(SimpleMessage),
                nullable=True,
            )
        ]
    ),
)
df_proto = df_unflattened.select(
    mc.to_protobuf(df_unflattened.value, SimpleMessage).alias("value")
)

A couple of things here: This might not work because of the other bugs I found. Also, I'm not sure how we would convert each row into a dictionary without using Row.asDict and collect. Calling collect is expensive, but the only part of the spark API that I could find which packs to a dictionary with column names as keys is on the Row API.

I thought about including a helper function to do this, but seeing as calling collect is not ideal I'd like to consider another approach before I do that.

EDIT: please try on 0.4.0

BrendanJM commented 2 years ago

Thanks for taking the time to respond! If a dict of the fields is what you need, it seems like you could access the schema using df.schema and convert it to a dictionary, which would avoid a call to collect.

crflynn commented 2 years ago

Well you would still need the data itself to be packed into a dict whose structure matches that of the protobuf message, and for that I'm not sure how to do it without collecting.

BrendanJM commented 2 years ago

I've been trying to give this a go, but the collect operation is a bit problematic. At the moment with the size and shape of my data I'm not sure I'll be able to successfully collect at any reasonable scale. I'm wondering if there is still some possibility to keep this distributed.

It may be possible to do this in a UDF: a UDF that accesses a struct column (or several struct columns) receives each struct as a Row-like object that converts readily to a dict of the data, and that would keep the work distributed without needing to collect.

crflynn commented 2 years ago

There is a pyspark function called struct that might work. It looks like this:

from pyspark.sql.functions import struct

df_unflattened = df.withColumn("value", struct([df[x] for x in df.columns])).select("value")
df_proto = df_unflattened.select(
    mc.to_protobuf(df_unflattened.value, SimpleMessage).alias("value")
)

This seems like it might work without needing to collect, assuming the dataframe df has a schema which when unflattened matches the one that would be derived from the message.
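
For the joined case from the original question, one way to apply the same struct idea is to pack each alias's columns into its own struct column using SQL expressions. This is only a sketch and has not been run against pbspark: struct_expr and pack_aliases are hypothetical helper names, and it assumes the field lists are taken from each DataFrame's columns before the join (e.g. df.columns and df2.columns).

```python
from typing import Dict, Sequence


def quote(name: str) -> str:
    """Backtick-quote an identifier for a Spark SQL expression."""
    return "`" + name.replace("`", "``") + "`"


def struct_expr(alias: str, fields: Sequence[str]) -> str:
    """Build a SQL expression that packs `alias`'s columns into one struct.

    Hypothetical helper: the resulting struct column is named after the alias,
    and its fields keep the original column names.
    """
    cols = ", ".join(f"{quote(alias)}.{quote(field)}" for field in fields)
    return f"struct({cols}) AS {quote(alias)}"


def pack_aliases(joined_df, fields_by_alias: Dict[str, Sequence[str]]):
    """Select one struct column per alias from an already-joined DataFrame.

    `joined_df` is assumed to be the result of joining DataFrames that were
    aliased with the keys of `fields_by_alias`.
    """
    exprs = [struct_expr(alias, fields) for alias, fields in fields_by_alias.items()]
    return joined_df.selectExpr(*exprs)
```

With joined = df.alias('some_field').join(df2.alias('some_other_field')), calling pack_aliases(joined, {'some_field': df.columns, 'some_other_field': df2.columns}) should give one struct column per alias. Each of those could then be passed to mc.to_protobuf with its own message type, provided the struct's schema matches the one derived from the message.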

crflynn commented 2 years ago

I've updated the roundtrip test here to include flattening and unflattening. I'm using the struct func and it seems to work as we'd expect without having to call collect.

https://github.com/crflynn/pbspark/blob/d345eca26b2a3e95412497e23581e08bd6e87581/tests/test_proto.py#L178

crflynn commented 2 years ago

Hey @BrendanJM. I recently added some helper functions which you might find useful here as well.

Assuming the schema of my_db.my_table in your original table matches that of your message, you ought to be able to do something like this, which would result in a DataFrame with a single column, value, containing the encoded protobuf:

from pbspark import df_to_protobuf

df = spark.sql("select * from my_db.my_table")
df_encoded = df_to_protobuf(df, SimpleMessage, expanded=True)

I'm going to close this as I think there are a few possible solutions here. Feel free to reopen if you are still having an issue.