confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

How to consume a topic with multiple message types? #1183

Open RagingKore opened 4 years ago

RagingKore commented 4 years ago

Description

Currently the consumer/producer APIs are typed as IConsumer<TKey, TValue> and IProducer<TKey, TValue>.

Is there an out-of-the-box way to consume a topic with multiple message types? (The same goes for sending/publishing messages to a topic.)

I'm very surprised not to find one, since this scenario most likely covers 99% of real-world implementations, and correctly so, if I may say.

And just in case there isn't, here is my solution and suggestions to tackle the issue.

I would gladly contribute with a PR @mhowlett

Thank you.

How to reproduce

Just try to use the provided Consumer and Producer components.

How I fixed this without fixing the consumer code itself (trivial repro)


// always consume the value as a raw byte[]
ConsumeResult<string, byte[]> consumeResult = Consumer.Consume(stoppingToken);

// header must be added while sending to topic in order to consume
if (!consumeResult.Headers.TryGetLastBytes("message-type", out var bytes)) 
    throw new Exception("Failed to extract message type from headers.");

// trivial encoding of header
var messageType = System.Text.Encoding.UTF8.GetString(bytes); 

// deserialize message via delegate by finding the mapped clr type (using a type mapper dictionary)
// any protocol can be used as long as it complies with the contract of the producer
// protobuf, messagepack, avro, utf8Json...
var message = Deserialize(consumeResult.Value, messageType);

// execute delegate to process message that will find all handlers that match the message clr type
await OnMessageReceived(message);
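
The Deserialize call above relies on a type mapper that is not shown. A minimal sketch of what it could look like follows, assuming .NET 5+ (C# 9) and System.Text.Json as the wire format. The message contracts (OrderCreated, OrderCancelled), the header values and the MessageTypeMapper class are all hypothetical names used for illustration; any other serializer could be swapped in as long as producer and consumer agree on it.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical message contracts, for illustration only.
public record OrderCreated(string OrderId, decimal Total);
public record OrderCancelled(string OrderId, string Reason);

public static class MessageTypeMapper
{
    // Maps the value carried in the "message-type" header to a CLR type.
    private static readonly Dictionary<string, Type> TypesByName = new()
    {
        ["order-created"] = typeof(OrderCreated),
        ["order-cancelled"] = typeof(OrderCancelled),
    };

    // Reverse lookup: the producing side uses this to choose the header value.
    public static string NameOf(Type type)
    {
        foreach (var (name, mapped) in TypesByName)
        {
            if (mapped == type) return name;
        }
        throw new InvalidOperationException($"No message type registered for {type}.");
    }

    // Serializes using the runtime type so derived properties are not lost.
    public static byte[] Serialize(object message) =>
        JsonSerializer.SerializeToUtf8Bytes(message, message.GetType());

    // Resolves the CLR type from the header value, then deserializes the payload.
    public static object Deserialize(byte[] payload, string messageType)
    {
        if (!TypesByName.TryGetValue(messageType, out var clrType))
            throw new InvalidOperationException($"Unknown message type '{messageType}'.");

        return JsonSerializer.Deserialize(payload, clrType)
            ?? throw new InvalidOperationException("Payload deserialized to null.");
    }
}
```

On the producing side, the same mapper would supply both halves of the contract: MessageTypeMapper.Serialize(message) for the byte[] value and MessageTypeMapper.NameOf(message.GetType()), UTF-8 encoded, for the "message-type" header.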

Suggested high-level changes to the client APIs

Basically, separate the consumer/producer from the serialisation concerns by implementing a production-ready version of the code above.

This would give a proper separation of concerns, cleaner code, and a better ability to adapt to change.

Example:

public interface IConsumer<TKey> {
    ValueTask<ConsumeResult<TKey>> Consume(CancellationToken cancellationToken);

    // maybe even a simpler api to avoid the usual while loop and error handling and others
    IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
}


RagingKore commented 4 years ago

Bump

mhowlett commented 4 years ago

Yes, at this stage you'll need to implement this yourself on top of byte[], as you're doing.

When designing the 1.0 API, we considered various options here, but ended up just following Java and having a strongly typed producer and consumer.

In the future I anticipate both a Producer and a Consumer without type parameters; however, implementing this is not the highest priority ATM.

We'll likely be slow to accept a PR for this, since the difficult, time-consuming part is reaching strong conviction on the API choice (not the work to implement it).

But thanks for the +1 (I agree).

filipeesch commented 4 years ago

You can use Kafka Flow on top of confluent-kafka-dotnet: https://github.com/Farfetch/kafka-flow

ashish-g-lahane commented 3 years ago

I just tested https://github.com/Farfetch/kafka-flow and could send and consume 2 message types over the same topic without a problem in my small POC. I was looking for a similar solution, and kafka-flow fits the bill perfectly.

danmalcolm commented 2 years ago

I've described an approach I've used (custom serializer / deserializer) to support multiple message types on the same topic - see comments in #1425 and #1438.
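
For readers who don't want to follow the links, here is a rough sketch of what a header-driven custom deserializer might look like. It is not taken from #1425 or #1438. It assumes a client version in which SerializationContext exposes the message Headers, a "message-type" header written by the producer, and JSON payloads; HeaderTypedDeserializer and the type map passed to it are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical deserializer: picks the CLR type from the "message-type" header.
public sealed class HeaderTypedDeserializer : IDeserializer<object>
{
    private readonly IReadOnlyDictionary<string, Type> _typesByName;

    public HeaderTypedDeserializer(IReadOnlyDictionary<string, Type> typesByName) =>
        _typesByName = typesByName;

    public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // Tombstones / null values carry no payload to deserialize.
        if (isNull) return null;

        // Assumption: the producer always sets this header.
        if (context.Headers == null ||
            !context.Headers.TryGetLastBytes("message-type", out var nameBytes))
            throw new InvalidOperationException("Missing 'message-type' header.");

        var name = Encoding.UTF8.GetString(nameBytes);
        if (!_typesByName.TryGetValue(name, out var clrType))
            throw new InvalidOperationException($"Unknown message type '{name}'.");

        // Assumption: payloads are UTF-8 JSON; swap in Avro, Protobuf, etc. as needed.
        return JsonSerializer.Deserialize(data, clrType);
    }
}
```

Such a deserializer would be registered with new ConsumerBuilder<string, object>(config).SetValueDeserializer(...), so the consumer stays strongly typed on the key while the value arrives as whichever mapped type the header names.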