crflynn / pbspark

protobuf pyspark conversion
MIT License

RecursionError when inferring recursive protobuf definitions. #9

Closed abhishekrb19 closed 2 years ago

abhishekrb19 commented 2 years ago

Hello,

Thanks for developing the pbspark library. This seems quite useful for converting protobuf on the wire to dataframes. I have some recursive proto definitions of the form (a simplified example):

message RecMessage {
    string         id        = 1;
    RecMessage     rec_msg   = 2;
}

A minimal code example that shows the issue:

from pyspark.sql import SparkSession
from pbspark import MessageConverter
from example.example_pb2 import RecMessage

spark = SparkSession.builder.getOrCreate()

# serialize a single message into a one-column DataFrame of bytes
r = RecMessage(id="recmsg1")
data = [{"value": r.SerializeToString()}]
df = spark.createDataFrame(data)

# decode the protobuf bytes back into a struct column and flatten it
mc = MessageConverter()
mc_pb = mc.from_protobuf(df.value, RecMessage)
df_decoded = df.select(mc_pb.alias("value"))
df_flattened = df_decoded.select("value.*")
df_flattened.show()

The above code fails with a RecursionError in from_protobuf while trying to infer the Spark schema from the recursive protobuf definition:

----> 7 mc_pb = mc.from_protobuf(df.value, RecMessage)
/usr/local/lib/python3.7/dist-packages/pbspark/_proto.py in get_spark_schema(self, descriptor, options)
    202                     )
    203                 else:
--> 204                     spark_type = self.get_spark_schema(field.message_type)
    205             else:
    206                 spark_type = _CPPTYPE_TO_SPARK_TYPE_MAP[field.cpp_type]()

... last 1 frames repeated, from the frame below ...

/usr/local/lib/python3.7/dist-packages/pbspark/_proto.py in get_spark_schema(self, descriptor, options)
    202                     )
    203                 else:
--> 204                     spark_type = self.get_spark_schema(field.message_type)
    205             else:
    206                 spark_type = _CPPTYPE_TO_SPARK_TYPE_MAP[field.cpp_type]()

RecursionError: maximum recursion depth exceeded while calling a Python object

Wondering how to get around this @crflynn? Thanks!

crflynn commented 2 years ago

My initial thought is that self-referencing definitions just won't work here. The protobuf definition allows arbitrary nesting depth but Spark schemas don't, so I don't see how we could reasonably infer a schema from the definition alone.

abhishekrb19 commented 2 years ago

@crflynn, yes, that was my initial thought as well. Would it be reasonable for pbspark to terminate the recursive schema inference at a configurable depth (defaulting to something reasonable, say 2 or 3)? Alternatively, could it provide an option to skip the recursive fields altogether? That may seem like a hack, but it would work for my use case at least, since the actual protobuf data is guaranteed not to be recursive.
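
To make the first option concrete, the capped schema for RecMessage would just be a StructType built to a fixed depth; a minimal sketch with plain pyspark types (the helper name and the cap of 3 are only illustrative, not anything pbspark provides):

from pyspark.sql.types import StructType, StructField, StringType

def rec_message_schema(depth):
    # schema for RecMessage, truncating rec_msg after `depth` extra levels
    fields = [StructField("id", StringType(), True)]
    if depth > 0:
        fields.append(StructField("rec_msg", rec_message_schema(depth - 1), True))
    return StructType(fields)

schema = rec_message_schema(3)  # tolerates up to 3 nested rec_msg levels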

crflynn commented 2 years ago

It's been a while, but I think a custom serializer and deserializer for that message type would do the trick. You'd have to specify a parsing function and the return type schema for that particular message (which effectively fixes your desired depth). There is an example in the readme.
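
Untested and from memory, but something along these lines; register_serializer and register_deserializer are the hooks used in the readme's custom serde example, so double-check the exact signatures against the version you have installed:

from pyspark.sql.types import StructType, StructField, StringType
from pbspark import MessageConverter
from example.example_pb2 import RecMessage

mc = MessageConverter()

# pick a fixed depth up front; this schema keeps the top-level id plus one
# level of rec_msg (itself reduced to just its id)
rec_schema = StructType([
    StructField("id", StringType(), True),
    StructField("rec_msg", StructType([StructField("id", StringType(), True)]), True),
])

def serialize_rec(message):
    # RecMessage -> dict matching rec_schema, dropping anything nested deeper
    out = {"id": message.id}
    if message.HasField("rec_msg"):
        out["rec_msg"] = {"id": message.rec_msg.id}
    return out

def deserialize_rec(data):
    # Row/dict matching rec_schema -> RecMessage (only needed for to_protobuf)
    msg = RecMessage(id=data["id"])
    if data["rec_msg"] is not None:
        msg.rec_msg.id = data["rec_msg"]["id"]
    return msg

# registration calls as shown in the readme's custom serde example; verify
# against your installed pbspark version
mc.register_serializer(RecMessage, serialize_rec, rec_schema)
mc.register_deserializer(RecMessage, deserialize_rec)

The serializer is what decides how deep the resulting struct goes; the deserializer only matters if you also encode back with to_protobuf.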

You can also refer to the timestamp functions that handle datetime serde here: https://github.com/crflynn/pbspark/blob/f2da85dda30d585d00869ab68fae5e8c460981dd/pbspark/_timestamp.py#L20
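
Those are essentially just a pair of plain functions converting between google.protobuf.Timestamp and datetime, roughly of this shape (paraphrased, not the exact source):

from datetime import datetime, timezone
from google.protobuf.timestamp_pb2 import Timestamp

def timestamp_serializer(ts):
    # protobuf Timestamp -> python datetime, which Spark stores as TimestampType
    return ts.ToDatetime().replace(tzinfo=timezone.utc)

def timestamp_deserializer(dt):
    # python datetime -> protobuf Timestamp
    ts = Timestamp()
    ts.FromDatetime(dt)
    return ts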

crflynn commented 2 years ago

I took another look at this problem and added an example and a test. You can take a look at them here:
https://github.com/crflynn/pbspark/blob/fd75e9d46706bc9b48e2bc0e058ad0e32d95604f/tests/fixtures.py#L14
https://github.com/crflynn/pbspark/blob/fd75e9d46706bc9b48e2bc0e058ad0e32d95604f/tests/test_proto.py#L221

crflynn commented 2 years ago

Were you able to resolve this?

I am closing and labeling this wontfix for now.