Closed: leosunmo closed this issue 2 years ago
@leosunmo I have not tried protobuf, but as per the Confluent documentation you will have to use the wire format: https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
On the consumer side you have to skip the first 5 bytes of the message and then deserialize the rest.
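For illustration, a minimal Go sketch of skipping that header on the consumer side, assuming the standard framing of one magic byte followed by a 4-byte big-endian schema ID (the function name here is made up; uses encoding/binary and fmt):

// stripWireFormatHeader removes the Confluent wire-format prefix
// (1 magic byte + 4-byte big-endian schema ID) and returns the schema ID
// plus the remaining payload. Note that for Protobuf there are additional
// message-index varints after the schema ID, covered further down this thread.
func stripWireFormatHeader(value []byte) (uint32, []byte, error) {
	if len(value) < 5 || value[0] != 0x00 {
		return 0, nil, fmt.Errorf("not a Confluent wire-format message")
	}
	schemaID := binary.BigEndian.Uint32(value[1:5])
	return schemaID, value[5:], nil
}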
Yeah I've got that far. The approach I'll be taking is to compile all the schemas I care about, and then use the Schema Registry as supplementary information, I think. At first I was trying to use the Schema Registry alone to handle the Protobuf at runtime. That seems like a very bad idea :P
Hi @leosunmo, did you get a chance to get this working?
I've ended up using https://developers.google.com/protocol-buffers/docs/proto3#any instead to communicate what type the message is. I rely on having compiled the types already, so no "runtime" dynamic fetching of protobuf files.
@leosunmo will you be able to share a working example with protobufs? I am stuck with the Avro error (https://github.com/riferrei/srclient/issues/17) and am unable to get a producer and consumer working with the protobuf registry.
@dineshgowda24 It's been a while since I looked at this, but I utilise Google's Any format, https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/any.
On the producer side I do something like this:
// Requires google.golang.org/protobuf/proto, the anypb and timestamppb
// well-known types, and the generated "metrics" package for my schema.
func makeNotifEventMessage() []byte {
	event := &metrics.Event{ // My Protobuf message
		Desc:         "Email got sent successfully",
		SourceSystem: "test-producer",
		Ts:           timestamppb.Now(),
		Type:         metrics.Event_EMAIL_SUCCESS,
	}
	metricAny, _ := anypb.New(event) // Wraps the "event" protobuf object in an "Any" protobuf type.
	out, _ := proto.Marshal(metricAny)
	return out
}
And on the consumer side it ends up looking something like this:
// Handle getting Kafka messages here...
// Unmarshal into an "Any" type, which is wrapping the "real" message.
wrapper := &anypb.Any{}
if err := proto.Unmarshal(kafkaMessage.Value, wrapper); err != nil {
	c.log.Error("Failed to parse event", zap.Error(err))
	continue
}
// This is probably where you could involve protoregistry in some fancy way
// and anypb.UnmarshalNew(), but for now we can keep it simple.
var msgBytes []byte
// Switch over the possible message types and process them.
// The "Any" type carries the fully qualified name of the wrapped message,
// exposed via MessageName(), so we can tell what kind of protobuf message
// the wrapped object is.
switch wrapper.MessageName() {
case "metrics.Event":
	me := &metrics.Event{}
	if err := anypb.UnmarshalTo(wrapper, me, proto.UnmarshalOptions{}); err != nil {
		c.log.Error("Failed to unmarshal metrics.Event", zap.Error(err))
		continue
	}
	msgBytes = processMetricsEvent(me)
default:
	c.log.Info(fmt.Sprintf("Unknown message: %s, skipping.", wrapper.MessageName()))
	c.commitMessages(ctx, []kafka.Message{})
	continue
}
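For completeness, the protoregistry route mentioned in the comment above could look roughly like this. It is only a sketch; it assumes the generated metrics package is imported (which registers its types in the global registry) so that anypb.UnmarshalNew can resolve the concrete type:

// Let the global type registry resolve the wrapped message: UnmarshalNew
// looks up the Any's type URL and unmarshals into a newly allocated message
// of the matching concrete Go type.
msg, err := anypb.UnmarshalNew(wrapper, proto.UnmarshalOptions{})
if err != nil {
	c.log.Error("Failed to resolve wrapped message", zap.Error(err))
	continue
}
switch m := msg.(type) {
case *metrics.Event:
	msgBytes = processMetricsEvent(m)
default:
	c.log.Info(fmt.Sprintf("Unknown message: %s, skipping.", wrapper.MessageName()))
	continue
}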
I hope this makes some sense.
@leosunmo Thanks, I get this part. I wanted to know how you are registering the Any protobuf schema with the Schema Registry and using it in the producer and consumer. If you look at the project's README, it talks about an example of registering and using an Avro schema. I wanted to know how to do the same thing for protobuf.
I am not registering anything with the Schema Registry in these examples. This is a way to work with multiple schemas without using the Schema Registry. If you wanted to use the Schema Registry you probably wouldn't use Any at all here.
Got it, thanks!
@riferrei
Should Proto serialization/deserialization contributions be made to this repository? Something along the lines of what is available with Codec() for Avro.
I welcome any change that adds value to this library.
What do you have in mind?
Basically parity with Avro. The unfortunate side effect is that the proto wire format has a bit more to it, so the serde is a bit more complicated and IMO probably not the right thing to land in the Schema Registry client itself; I only ask about putting it here because the client already supports the Avro & JSON serdes.
If you look at the Wire Format document and scroll past the table to the fine print, there is a message-indexes part of the payload that needs to be calculated. Similarly, for deserialization we need to look up the proto message at the message-indexes location in the schema, which involves fetching the schema and reflecting through it. That is a lot of stuff that I feel is outside the scope of the SR client, yet necessary to put proto on the same footing as Avro & JSON in the client. Most of this work is documented and completed in this PR already.
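To make the framing concrete, here is a rough sketch of what building that prefix looks like on the producer side, based on my reading of the wire-format fine print (magic byte, 4-byte big-endian schema ID, then the message-indexes as zig-zag varints: a count followed by each index). The helper name is made up, so double-check the details against the Confluent docs:

// encodeWireFormatPrefix builds the Confluent protobuf framing that goes in
// front of the serialized message:
// [0x00][4-byte big-endian schema ID][varint index count][varint indexes...]
// The indexes identify which message within the .proto file the payload uses,
// e.g. []int{0} for the first top-level message. (The official serializers
// also special-case that common []int{0} as a single 0x00 byte.)
func encodeWireFormatPrefix(schemaID uint32, msgIndexes []int) []byte {
	buf := make([]byte, 0, 16)
	buf = append(buf, 0x00) // magic byte

	idBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(idBytes, schemaID)
	buf = append(buf, idBytes...)

	varint := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(varint, int64(len(msgIndexes))) // zig-zag varint count
	buf = append(buf, varint[:n]...)
	for _, idx := range msgIndexes {
		n = binary.PutVarint(varint, int64(idx)) // zig-zag varint index
		buf = append(buf, varint[:n]...)
	}
	return buf
}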
Then we get to publishing and reading the schema itself, which involves multiple schemas when there are references to external types, as documented in their Protobuf Serializer docs. I haven't had the need for this yet and have yet to look into solving it.
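For reference, the registration side of it, as the Confluent docs describe it, would look roughly like the sketch below; the subject names and schema body here are made up and I have not actually wired this up:

// Register a protobuf schema that imports another, already-registered schema.
// POST /subjects/{subject}/versions with schemaType PROTOBUF and a references
// list mapping the import name to the referenced subject and version.
body := map[string]interface{}{
	"schemaType": "PROTOBUF",
	"schema":     `syntax = "proto3"; package metrics; import "common.proto"; message Event { common.Source source = 1; }`,
	"references": []map[string]interface{}{
		{"name": "common.proto", "subject": "common", "version": 1},
	},
}
payload, _ := json.Marshal(body)
resp, err := http.Post(
	"http://localhost:8081/subjects/metrics-value/versions",
	"application/vnd.schemaregistry.v1+json",
	bytes.NewReader(payload),
)
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()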
I'm inclined to add these to an examples folder and mention them in the README.md to close this issue and #17. What do you think @riferrei?
Good idea.
I'm closing this issue, as we now have examples included in the repo and the rest is in PR #85.
Hey, I am trying to figure out if I can use this client to fetch Protobuf schemas in a Go consumer. Does anyone have an example of how to accomplish this? Without running the protoc CLI to pre-generate the code, I am not sure how this would work at runtime.
Any pointers or examples would be appreciated!
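Not a complete answer, but a sketch of the fetching half, assuming srclient's GetSchema(schemaID) call and the Schema.Schema() accessor shown in the README (worth checking against your srclient version). It reads the schema ID out of the wire-format header and looks the schema up; actually decoding the payload without pre-generated code would still need a protobuf descriptor/reflection library on top of this:

// Look up the writer schema for a protobuf-encoded Kafka message at runtime.
client := srclient.CreateSchemaRegistryClient("http://localhost:8081")

// Byte 0 is the magic byte; bytes 1-4 are the big-endian schema ID.
schemaID := binary.BigEndian.Uint32(kafkaMessage.Value[1:5])
schema, err := client.GetSchema(int(schemaID))
if err != nil {
	log.Fatalf("failed to fetch schema %d: %v", schemaID, err)
}

// schema.Schema() is the raw .proto text. srclient only gets you this far;
// turning it into descriptors and a dynamic message is a separate problem.
fmt.Println(schema.Schema())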