confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Question: Multiple events in same topic with Protobuf #1438

Open RagingRudolf opened 3 years ago

RagingRudolf commented 3 years ago

Description

I'm working on a microservice based solution where I so far have two microservices.

The purpose of the Configurator service is to manage the different types of entities in my solution - adding fields, deleting fields, etc. The purpose of the Entity service is to manage the entities and their data.

I have created one topic in which I keep the events for entity configuration changes, because order is important. E.g. if I create a new field on an entity and delete it right after, it is important that the events are kept in the same order (otherwise the domain is left in some weird state).

I have two producers - one for AddFieldEvent and the other for RemoveFieldEvent. So, as I (wrongly) thought, I also have two consumers - one for each event (using the same group id). The two consumers are each running inside their own HostedService in my ASP.NET Core application. What I saw was that the AddFieldEvent consumer also received the RemoveFieldEvent and the other way around. From what I can read, it seems that a consumer of a topic will always receive all events on that topic.

So... it seems to me that instead of having two consumers I should just use one with a DynamicMessage, convert each event into its specific event type and pass it on to a handler - but I feel I lose some of the nice feel of the API.

So what is the recommendation? I cannot find anyone else using Protobuf with multiple events.

Another question: when using the Protobuf serializer with Schema Registry, I provide a CachedSchemaRegistryClient to the serializer, but it is not accepted by the deserializer - is that not an error, since the deserializer then cannot compare the schema in a received event?

Best regards Martin


mhowlett commented 3 years ago

the protobuf deserializers don't support dynamic interpretation of data yet (more than one schema in a topic). since all of the information required to deserialize a protobuf message is contained in the serialized data, we don't need schema registry integration on the read side, given the reader schema is known from the generated stub.
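A minimal construction sketch of that asymmetry (the broker/registry addresses, topic, group id and the generated AddFieldEvent type are placeholders, not anything prescribed by the library): the serializer is built with a CachedSchemaRegistryClient, while the deserializer is not.

```csharp
// Minimal sketch only. Addresses, group id and the generated AddFieldEvent
// message type are placeholders.
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var schemaRegistry = new CachedSchemaRegistryClient(
    new SchemaRegistryConfig { Url = "http://localhost:8081" });

// The serializer needs the schema registry client: it registers/looks up the
// schema and writes the schema id into each message.
using var producer = new ProducerBuilder<string, AddFieldEvent>(
        new ProducerConfig { BootstrapServers = "localhost:9092" })
    .SetValueSerializer(new ProtobufSerializer<AddFieldEvent>(schemaRegistry))
    .Build();

// The deserializer is constructed without a schema registry client: the reader
// schema comes from the generated AddFieldEvent stub.
using var consumer = new ConsumerBuilder<string, AddFieldEvent>(
        new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "entity-service" })
    .SetValueDeserializer(new ProtobufDeserializer<AddFieldEvent>().AsSyncOverAsync())
    .Build();
```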

i hope to build a more sophisticated wrapper on top of the basic producer that allows dynamic dispatch of messages based on schema, but we don't have this yet.

for now, the best you can do i think is use optionals in a single protobuf message.
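As a hypothetical illustration of that suggestion, the two event types could be wrapped in a single protobuf message (e.g. with a oneof), so the topic has one value type and the consumer dispatches on whichever case is set. The EntityFieldEvent wrapper and the handler methods below are made-up names, not part of the client:

```csharp
// Sketch only: assumes a .proto wrapper along the lines of
//   message EntityFieldEvent {
//     oneof event {
//       AddFieldEvent add = 1;
//       RemoveFieldEvent remove = 2;
//     }
//   }
// for which the C# codegen produces an EventCase property. The consumer,
// cancellation token and Handle* methods are assumed to exist elsewhere.
var result = consumer.Consume(cancellationToken);
var envelope = result.Message.Value;              // EntityFieldEvent

switch (envelope.EventCase)
{
    case EntityFieldEvent.EventOneofCase.Add:
        HandleAddField(envelope.Add);             // AddFieldEvent
        break;
    case EntityFieldEvent.EventOneofCase.Remove:
        HandleRemoveField(envelope.Remove);       // RemoveFieldEvent
        break;
    default:
        // Unset/unknown case: log and skip, or fail, depending on requirements.
        break;
}
```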

RagingRudolf commented 3 years ago

I'm not sure why I did not find it obvious that the way the protobuf message is constructed also validates its schema. Thank you for clearing that up!

So far I have not completely decided whether I want to use Protobuf or some other serializer. I am not that familiar with the different formats and their advantages and disadvantages, so I will still need to look more into it.

Right now I'm trying out Avro, which has the GenericRecord type - and then in the consumer I'm implementing a chain of responsibility pattern to decide which handler should take care of the event (also converting it to the specific event type).
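A rough sketch of what that dispatch could look like, assuming values are consumed as Avro's generic representation (Avro.Generic.GenericRecord via AvroDeserializer&lt;GenericRecord&gt;); the handler interface, event name and field name are hypothetical:

```csharp
// Rough sketch only: handler interface, event name and field name are made up.
using System;
using System.Collections.Generic;
using Avro.Generic;

public interface IEventHandler
{
    // Returns true if this handler recognised and handled the record.
    bool TryHandle(GenericRecord record);
}

public class AddFieldEventHandler : IEventHandler
{
    public bool TryHandle(GenericRecord record)
    {
        if (record.Schema.Name != "AddFieldEvent") return false;

        // Convert the generic record to the specific event before handling it.
        record.TryGetValue("FieldName", out var fieldName);
        // ... apply the field addition to the entity configuration ...
        return true;
    }
}

public class EventHandlerChain
{
    private readonly IReadOnlyList<IEventHandler> _handlers;

    public EventHandlerChain(IReadOnlyList<IEventHandler> handlers) => _handlers = handlers;

    // Walk the chain until some handler accepts the record.
    public void Dispatch(GenericRecord record)
    {
        foreach (var handler in _handlers)
            if (handler.TryHandle(record)) return;

        throw new InvalidOperationException($"No handler for record type '{record.Schema.Name}'.");
    }
}
```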

I will have to do some testing to figure out whether this is the right direction or if I should wait for the Protobuf support to mature a bit more.

danmalcolm commented 2 years ago

I'm surprised that there aren't more requests like this. It seems like a pretty common use case to log ordered meaningful state change events (like SomethingRequested, SomethingStarted, SomethingFinished etc) to a single topic.

It's something that I've been looking into for a while. I have prototyped some custom serializers / deserializers (Avro and Protobuf) to support multiple value types on a single Kafka topic.

This is available at: https://github.com/danmalcolm/confluent-kafka-dotnet/tree/multiple-message-types-on-topic (see the AvroMultipleTypesPerTopic and ProtobufMultipleTypesPerTopic example projects).

This works as follows:

  1. At startup, we configure our multiple-type serializers / deserializers with a list of the message value types we want to support
  2. The multiple type serializer contains a map of standard schema registry serializers (e.g. Confluent.SchemaRegistry.Serdes.AvroSerializer)
  3. During serialization, we simply delegate to the serializer for the value being stored
  4. During deserialization, we take advantage of the fact that the schema registry serializers include a schema identifier within the data written for each key and value. We look up and delegate to a deserializer that supports the destination value type (see the sketch below).
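For illustration only, a minimal sketch of the deserialization side of this idea; the schema-id-keyed map and the delegate shape are assumptions of this sketch, not the actual API of the linked prototype:

```csharp
// Illustration only: the schema-id-keyed map and delegate signature are
// assumptions of this sketch, not the prototype's actual API.
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;

public class MultipleTypeDeserializer : IAsyncDeserializer<object>
{
    // schema id -> a delegate wrapping the standard typed deserializer
    // (e.g. Confluent.SchemaRegistry.Serdes.AvroDeserializer<T>) for that schema.
    private readonly IReadOnlyDictionary<int, Func<ReadOnlyMemory<byte>, SerializationContext, Task<object>>> _byId;

    public MultipleTypeDeserializer(
        IReadOnlyDictionary<int, Func<ReadOnlyMemory<byte>, SerializationContext, Task<object>>> byId)
        => _byId = byId;

    public async Task<object> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return null;
        if (data.Length < 5 || data.Span[0] != 0)
            throw new InvalidDataException("Not in the Confluent schema registry wire format.");

        // Bytes 1..4 hold the big-endian schema id written by the producer-side serializer.
        int schemaId = BinaryPrimitives.ReadInt32BigEndian(data.Span.Slice(1, 4));

        if (!_byId.TryGetValue(schemaId, out var inner))
            throw new InvalidOperationException($"No deserializer registered for schema id {schemaId}.");

        // Delegate to the standard deserializer for the value type behind this schema id.
        return await inner(data, context);
    }
}
```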

I've no idea whether an approach like this would ever make it back into Confluent.Kafka, where the API is very much focused on specific key and value types.

This is a POC only - no unit tests etc. Hopefully there's enough here for teams to get started if they do want to do this right now in their applications.