Open michaelandrepearce opened 1 year ago
Can you elaborate on how this would work? Would you expect the output column to be StringType() with json string?
I tried implementing this and it's doesn't quite work because of how protobuf is implemented in the schema registry. The confluent package doesn't support this either, presumably for the same reason: https://github.com/confluentinc/confluent-kafka-python/issues/1174
What I found was that when registering a schema, confluent serializes the FileDescriptor from the message type, and registers it. I assume schema registry then does some FileDescriptor validation. But when you fetch the schema from the registry it returns a .proto file render of the passed FileDescriptor, rather than the serialized FileDescriptor itself.
The python protobuf lib doesn't have any tooling or interface for parsing .proto files directly; it expects FileDescriptor instances throughout. The parsing of .proto files happens in the protoc tool which is in C++. It will parse the .proto files and then pass FileDescriptors into the codegen plugins.
Here is the _schema_to_str
function which encodes the serialized FileDescriptor (file_descriptor.serialized_pb) to submit to schema registry: https://github.com/confluentinc/confluent-kafka-python/blob/842e2df13b3eebc5ae5562f050008c1932c8332d/src/confluent_kafka/schema_registry/protobuf.py#L117
In order to have a dynamic ProtobufDeserializer and dynamically create message types, we would need to either be able to fetch the serialized_pb for a FileDescriptor from the registry, or parse the .proto file into a FileDescriptor.
As far as I can tell the protoc entrypoint is here: https://github.com/protocolbuffers/protobuf/blob/cc83427ee3cf14de4c0308e46ff54381aa6f647a/src/google/protobuf/compiler/command_line_interface.cc#L1187 and the parsing happens in this file: https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/compiler/parser.cc so we would have to have some way to access this functionality or port it to python.
Any plans to support Confluent SR and using dynamic message rather than having concrete message types akin to the avro SR support with Generic message