dfds / dafda

.NET Kafka client library

Support schema registry #49

Closed martimors closed 1 year ago

martimors commented 3 years ago

Confluent Schema Registry is an open source service that stores schemas for Kafka topics. It adds a layer of validation for producers so that topics don't end up with corrupted data, and it lets consumers read the event type from the (protobuf) schema instead of having to define the types themselves. For public async APIs I think a schema registry would help loosen coupling between components, because validation responsibility shifts to a central component instead of being the responsibility of the upstream or downstream service itself (or both).

Confluent Kafka Dotnet (a dependency of Dafda) already supports Schema Registry; see this example of using it with protobuf from their codebase.
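For reference, here is a minimal sketch (not Dafda-specific) of how the Confluent client is typically wired up against Schema Registry with protobuf. The addresses, topic and key are placeholders, and it assumes the message type is protobuf-generated; the linked example in the Confluent codebase is the authoritative reference:

open Confluent.Kafka
open Confluent.SchemaRegistry
open Confluent.SchemaRegistry.Serdes
open Google.Protobuf

// Produce a protobuf-generated message; the serializer registers the schema with
// the registry and prefixes the payload with the schema id.
let produceWithSchemaRegistry<'T when 'T :> IMessage<'T> and 'T : (new : unit -> 'T)>
    (topic: string) (key: string) (message: 'T) =
    let schemaRegistryConfig = SchemaRegistryConfig(Url = "http://localhost:8081")
    let producerConfig = ProducerConfig(BootstrapServers = "localhost:9092")
    use schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)
    use producer =
        ProducerBuilder<string, 'T>(producerConfig)
            .SetValueSerializer(ProtobufSerializer<'T>(schemaRegistry))
            .Build()
    producer.ProduceAsync(topic, Message<string, 'T>(Key = key, Value = message))
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> ignore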

chamook commented 3 years ago

Had a quick poke around to see what would be required in Dafda for this. I don't think anything necessarily needs to change massively, since the parts affected can be overridden externally, but it could be convenient to have some built-ins, and there are definitely places where we could improve things (e.g. making more things async).

For consuming messages it would mainly require an implementation of IIncomingMessageFactory that could read a protobuf (or other format) message and extract the necessary metadata along with the payload:

open System
open System.Text
open Confluent.Kafka                 // SerializationContext
open Confluent.SchemaRegistry.Serdes // ProtobufDeserializer

type ProtobufMessageFactory() =
    interface IIncomingMessageFactory with
        member x.Create(rawMessage) =
            let messageBytes = Encoding.UTF8.GetBytes(rawMessage) |> ReadOnlyMemory
            let msg =
                ProtobufDeserializer<SomeMessageType>()
                    .DeserializeAsync(messageBytes, false, SerializationContext.Empty)
                    .Result

            TransportLevelMessage
                (Metadata( (* extract metadata about the message into a dictionary and put that here *) ),
                 fun t -> box msg) // if you can get metadata without deserializing, the actual deserialization should be deferred to this function
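Assuming the factory can then be swapped in through consumer configuration (the WithIncomingMessageFactory call below is a guess at such a hook, not a confirmed Dafda method, and the handler and message names are placeholders), the wiring might look roughly like:

// Hypothetical wiring in Startup - only the WithGroupId/RegisterMessageHandler style
// follows Dafda's documented consumer setup; the factory hook is an assumption.
services.AddConsumer(fun options ->
    options.WithBootstrapServers("localhost:9092")
    options.WithGroupId("some-consumer-group")
    options.WithIncomingMessageFactory(ProtobufMessageFactory()) // assumed extension point
    options.RegisterMessageHandler<SomeMessageType, SomeMessageHandler>("some-topic", "some-message-type"))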

Producing messages is something I've played with less, and it currently seems to expect values to be sent out as strings, which I'm not sure is the right approach with formats like protobuf. But AFAICT we mainly need to implement a version of IPayloadSerializer:

type ProtobufPayloadSerializer(protobufSerializer: ProtobufSerializer<SomeMessageType>, topic: string) =
    interface IPayloadSerializer with
        member x.Serialize(descriptor) =
            // the serialization context tells the serializer which topic/component the payload belongs to
            let context = SerializationContext(MessageComponentType.Value, topic)
            let bytes = protobufSerializer.SerializeAsync(descriptor.MessageData :?> SomeMessageType, context).Result
            Encoding.UTF8.GetString(bytes) // stringly-typed payloads are probably not the right fit for a binary format like protobuf

Does that sound in line with your expectations or am I misunderstanding something here? 🙂

martimors commented 3 years ago

I'm just getting started with Dafda, and if the rest of the org can live without Schema Registry for now then so can I. I'll be happy to contribute down the line once I've familiarized myself a bit more with the library.

martimors commented 2 years ago

Hi again,

We are facing issues with schemaless events that I'd like to highlight here as reasons why we should gradually move towards schemas in Kafka, with Dafda support as a first step. I'm willing to contribute to making it happen with some guidance, as I am not too familiar with the library just yet.

  1. The Type field. The Dafda library uses a string value to give an event a Type. I assume this is meant more as an event name than a type, for cases where the consumer cannot infer it from context (topic name, other fields). For example, a topic could carry the types Created, Updated and Deleted, where the payload has the same fields for each. However, I have seen this Type field (mis?)used to publish several events with completely different schemas on the same topic. In my view this is an antipattern, because as a consumer I should be able to expect incoming events to adhere to a certain schema. As far as I understand the Kafka community, Kafka is in most cases used with one event schema per topic, and deviating from that causes all kinds of issues downstream.
  2. The tooling around Kafka (ksqlDB, Kafka Connect, Kafka Streams etc.) is all designed assuming one schema per topic. If we implement schema registry support, we could avoid building all kinds of custom switch...case logic in our own applications to work around the Type field (see the sketch after this list), and instead rely on out-of-the-box tooling to do the heavy lifting. As an example, we could use the Kafka Connect JDBC sink connector to sink event data with schemas into a data warehouse. This is not easily possible without schemas on the events.
  3. The last, somewhat obvious advantage is of course not having to ask around for the C# types of the events we consume, further decoupling our services and making teams more autonomous.
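For illustration, the per-type branching mentioned in point 2 tends to look like the following in every consuming application. This is a hedged sketch with made-up event names and types, not code from any real service:

open System.Text.Json

// Hypothetical event shapes - placeholders only.
[<CLIMutable>] type OrderCreated = { OrderId: string }
[<CLIMutable>] type InvoiceGenerated = { InvoiceId: string; Amount: decimal }

// The kind of per-Type dispatch each consumer has to maintain when one topic
// carries several unrelated schemas and there is no registry to consult.
let deserializeByType (eventType: string) (payload: string) : obj =
    match eventType with
    | "order_created" -> box (JsonSerializer.Deserialize<OrderCreated>(payload))
    | "invoice_generated" -> box (JsonSerializer.Deserialize<InvoiceGenerated>(payload))
    | unknown -> failwithf "No known schema for event type '%s'" unknown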

Interested in hearing your thoughts!

martinosk commented 2 years ago

@dingobar it's not that black or white, I think, but this article has some good points: https://www.confluent.io/de-de/blog/put-several-event-types-kafka-topic/

I think it's important to keep the purpose of Dafda in mind, which is the exchange of information between bounded contexts to enable an orchestrated microservice architecture.

martimors commented 2 years ago

Thanks for the excellent article, it helped me a lot, and I'm happy to see that this pattern is indeed something the Kafka ecosystem caters for to some extent. There is also another approach I found: https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/

This approach removes the need for the consumer (Dafda) to add another layer of logic to determine which schema to use.

avnes commented 1 year ago

I am closing this issue since Martin has left DFDS.