confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Dynamically consuming protobuf messages with dynamicpb #953

Open james-johnston-thumbtack opened 1 year ago

james-johnston-thumbtack commented 1 year ago

Description

As far as I can tell, the protobuf deserializer cannot read messages dynamically. It only works when the schema is known in advance, i.e. when Go code has been generated from a .proto file with protoc, as is normally done with protobuf.

For example, suppose I want to create a generic debugging tool that prints the decoded message payload of a protobuf topic.
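Before such a tool can decode anything, it has to split each record value into Confluent's framing and the raw protobuf bytes, so it knows which schema ID to fetch from Schema Registry. Below is a minimal sketch, assuming the framing used by the Confluent protobuf serializers as I understand it: a 0x00 magic byte, a 4-byte big-endian schema ID, then a zigzag-varint count followed by that many zigzag-varint message indexes (a count of 0 is shorthand for `[0]`). The type and function names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// confluentHeader is a hypothetical holder for the framing in front of the payload.
type confluentHeader struct {
	SchemaID       int32 // ID to look up in Schema Registry
	MessageIndexes []int // path to the message type inside the .proto schema
	PayloadOffset  int   // index where the raw protobuf bytes begin
}

// parseConfluentProtobufHeader decodes the assumed framing described above.
func parseConfluentProtobufHeader(data []byte) (confluentHeader, error) {
	var h confluentHeader
	if len(data) < 5 || data[0] != 0x00 {
		return h, errors.New("not in Confluent wire format")
	}
	h.SchemaID = int32(binary.BigEndian.Uint32(data[1:5]))
	off := 5
	count, n := binary.Varint(data[off:]) // binary.Varint undoes zigzag encoding
	if n <= 0 || count < 0 {
		return h, errors.New("bad message-index count")
	}
	off += n
	if count == 0 {
		h.MessageIndexes = []int{0} // shorthand: first top-level message
	}
	for i := int64(0); i < count; i++ {
		idx, n := binary.Varint(data[off:])
		if n <= 0 {
			return h, errors.New("bad message index")
		}
		off += n
		h.MessageIndexes = append(h.MessageIndexes, int(idx))
	}
	h.PayloadOffset = off
	return h, nil
}

func main() {
	// Schema ID 42, indexes [1, 0] (zigzag: 2 -> 0x04, 1 -> 0x02, 0 -> 0x00),
	// then a protobuf payload encoding field 1 = 150.
	record := []byte{0x00, 0x00, 0x00, 0x00, 0x2a, 0x04, 0x02, 0x00, 0x08, 0x96, 0x01}
	h, err := parseConfluentProtobufHeader(record)
	if err != nil {
		panic(err)
	}
	fmt.Println(h.SchemaID, h.MessageIndexes, record[h.PayloadOffset:])
}
```

The schema ID tells the tool which IDL to fetch; the message indexes tell it which message inside that IDL the payload belongs to.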

Another example: the Kafka Connect sources currently appear to provide no guarantee about field numbering within a message. When converting a Connect schema to a protobuf message descriptor, they simply assign field numbers in auto-incrementing order. So if a field is deleted from the original source (e.g. a column dropped from a PostgreSQL table), the remaining fields are renumbered. Given this unpredictable numbering, the only safe way to read protobuf messages serialized by Kafka Connect sources would appear to be dynamically, by decoding each message against the schema it was actually written with.

In general, to read messages dynamically, it looks like one would need to:

  1. Parse the IDL from Schema Registry into a message descriptor using protocompile or protoparse (protoparse is already used by confluent-kafka-go, but has since been deprecated in favor of protocompile).
  2. Pass the message descriptor to dynamicpb.NewMessage.

In theory, one might expect to use the MessageFactory mechanism and call dynamicpb.NewMessage from there. However, the factory's signature receives neither the parsed message descriptor nor the original IDL text, so there is nothing to pass to dynamicpb.NewMessage. In fact, the parsed message descriptor does not appear to be used for anything other than finding the fully-qualified message name, so it is not available for other purposes such as reading messages dynamically.

This issue is therefore a feature request to support dynamic reading of protobuf messages, which does not currently seem possible.

Without this, I don't think it is possible to safely consume messages from Kafka Connect sources, given their unpredictable protobuf field numbering. And of course, building other kinds of generic tooling is impossible.

PrasanthV454 commented 1 year ago

Hi @james-johnston-thumbtack, we don't currently have plans for this, but we will keep track of it here.

ideasculptor commented 1 year ago

It seems like simply updating the MessageFactory interface to receive the MessageDescriptor already parsed from the schema fetched by the client would solve this problem without requiring a dedicated feature. The user's MessageFactory implementation could then choose to instantiate a dynamicpb message from that MessageDescriptor, so no dynamicpb-specific support would be needed.

Unfortunately, the other two supported schema types (Avro and JSON Schema) have no analog to the message descriptor parsed from a protobuf schema. However, some kind of Opaque field could be added to schemaregistry.SchemaInfo, and it would be perfectly reasonable for the MessageFactory interface to take the schemaregistry.SchemaInfo as a parameter. Alternatively, it could take a protobuf-specific extension of schemaregistry.SchemaInfo that adds a MessageDescriptor field, and the implementation could cast to that proto-specific struct whenever the SchemaType field in SchemaInfo indicates a protobuf schema.

Doing this would also allow older versions of a given schema to be instantiated, rather than using the parsed message descriptor only to resolve a message name and then always instantiating the latest version via the protoregistry. If a field was dropped in a more recent version of a schema, there is currently no way to deserialize a record with the older version in order to access that field, and therefore no way to recover the data and convert it into something compatible with the latest protobuf definition. It seems unnecessarily limiting not to provide interfaces that would support this.