confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
2.78k stars 847 forks source link

Provide non-generic ISerializer, IAsyncSerializer, IDeserializer, IAsyncDeserializer #2186

Open horato opened 5 months ago

horato commented 5 months ago

Description

I am using one consumer to consume multiple topics with different Avro schemas. I created a deserializer, but due to the lack of a non-generic IDeserializer, I had to resort to all kinds of weird hacks.

A non-generic I(Async)Deserializer would reduce this code:

public class TopicSpecificAvroDeserializer : IDeserializer<ISpecificRecord>
{
    private readonly ConcurrentDictionary<string, object> _deserializers = new ConcurrentDictionary<string, object>();
    private readonly ISchemaRegistryClient _schemaRegistryClient;

    private static readonly IDictionary<string, Type> Configuration = new Dictionary<string, Type>
    {
        { "topic.A", typeof(TopicA) },
        { "topic.B", typeof(TopicB) },
    };

    private delegate object DeserializeDelegate(ReadOnlySpan<byte> data, bool isNull, SerializationContext context);

    /// <summary>
    /// Creates the deserializer and stores the schema registry client that is later
    /// handed to the per-topic Avro deserializers.
    /// </summary>
    /// <param name="schemaRegistryClient">Schema registry client; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="schemaRegistryClient"/> is null.
    /// </exception>
    public TopicSpecificAvroDeserializer(ISchemaRegistryClient schemaRegistryClient)
    {
        if (schemaRegistryClient == null)
        {
            throw new ArgumentNullException(nameof(schemaRegistryClient));
        }

        _schemaRegistryClient = schemaRegistryClient;
    }

    /// <summary>
    /// Deserializes a message using the deserializer associated with the topic named in
    /// <paramref name="context"/>. The deserializer is created on first use for a topic
    /// and reused from the cache afterwards.
    /// </summary>
    /// <param name="data">Raw message bytes.</param>
    /// <param name="isNull">Null flag forwarded unchanged to the topic's deserializer.</param>
    /// <param name="context">Serialization context; its <c>Topic</c> selects the deserializer.</param>
    /// <returns>The deserialized record.</returns>
    public ISpecificRecord Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => CallDeserialize(
            _deserializers.GetOrAdd(context.Topic, CreateDeserializer),
            data,
            isNull,
            context);

    // Compiled invokers, one per wrapped deserializer instance. Building and compiling an
    // expression tree is expensive, so it is done once per instance rather than once per
    // consumed message.
    private readonly ConcurrentDictionary<object, DeserializeDelegate> _deserializeInvokers = new ConcurrentDictionary<object, DeserializeDelegate>();

    /// <summary>
    /// Invokes the <c>Deserialize</c> method of a deserializer whose generic type argument
    /// is only known at runtime, and returns the result as an <see cref="ISpecificRecord"/>.
    /// </summary>
    /// <param name="deserializer">The deserializer instance produced by <c>CreateDeserializer</c>.</param>
    /// <param name="data">Raw message bytes.</param>
    /// <param name="isNull">Null flag forwarded unchanged to the deserializer.</param>
    /// <param name="context">Serialization context forwarded unchanged to the deserializer.</param>
    /// <returns>The value returned by the deserializer, cast to <see cref="ISpecificRecord"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the deserializer type exposes no public <c>Deserialize</c> method.
    /// </exception>
    private ISpecificRecord CallDeserialize(object deserializer, ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // The compiled delegate is bound to this exact instance, so the instance is the cache key.
        var invoke = _deserializeInvokers.GetOrAdd(deserializer, BuildDeserializeInvoker);
        return (ISpecificRecord)invoke(data, isNull, context);
    }

    // Builds a strongly-typed delegate that calls Deserialize on the given instance.
    private static DeserializeDelegate BuildDeserializeInvoker(object deserializer)
    {
        var method = deserializer.GetType().GetMethod(nameof(IDeserializer<object>.Deserialize));
        if (method == null)
            throw new InvalidOperationException($"Type {deserializer.GetType()} does not expose a public {nameof(IDeserializer<object>.Deserialize)} method");

        var instance = Expression.Constant(deserializer);
        var dataParameter = Expression.Parameter(typeof(ReadOnlySpan<byte>), "data");
        var isNullParameter = Expression.Parameter(typeof(bool), "isNull");
        var contextParameter = Expression.Parameter(typeof(SerializationContext), "context");

        var call = Expression.Call(instance, method, dataParameter, isNullParameter, contextParameter);
        var expression = Expression.Lambda<DeserializeDelegate>(call, dataParameter, isNullParameter, contextParameter);
        return expression.Compile();
    }

    /// <summary>
    /// Builds the deserializer for a topic: looks up the message type configured for the
    /// topic, constructs an <c>AvroDeserializer&lt;T&gt;</c> for it via reflection, and wraps
    /// that in a <c>SyncOverAsyncDeserializer&lt;T&gt;</c>.
    /// </summary>
    /// <param name="topic">Topic name used as the key into <c>Configuration</c>.</param>
    /// <returns>The sync-over-async wrapper instance, typed as <see cref="object"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the topic has no configured message type, or when either instance
    /// could not be created.
    /// </exception>
    private object CreateDeserializer(string topic)
    {
        Type messageType;
        if (!Configuration.TryGetValue(topic, out messageType))
        {
            throw new InvalidOperationException($"Missing message type for topic {topic}");
        }

        // Close AvroDeserializer<> over the topic's message type and instantiate it.
        var asyncDeserializer = Activator.CreateInstance(
            typeof(AvroDeserializer<>).MakeGenericType(messageType),
            _schemaRegistryClient,
            null);
        if (asyncDeserializer == null)
        {
            throw new InvalidOperationException($"Failed to create instance of {typeof(AvroDeserializer<>)} with generic argument {messageType}");
        }

        // Wrap the async deserializer so it can be invoked synchronously.
        var syncWrapper = Activator.CreateInstance(
            typeof(SyncOverAsyncDeserializer<>).MakeGenericType(messageType),
            asyncDeserializer);
        if (syncWrapper == null)
        {
            throw new InvalidOperationException($"Failed to create instance of {typeof(SyncOverAsyncDeserializer<>)} with generic argument {messageType}");
        }

        return syncWrapper;
    }

to this code

public class TopicSpecificAvroDeserializer : IDeserializer<ISpecificRecord>
{
    private readonly ConcurrentDictionary<string, IDeserializer> _deserializers = new ConcurrentDictionary<string, IDeserializer>();
    private readonly ISchemaRegistryClient _schemaRegistryClient;

    private static readonly IDictionary<string, Type> Configuration = new Dictionary<string, Type>
    {
        { "topic.A", typeof(TopicA) },
        { "topic.B", typeof(TopicB) },
    };

    /// <summary>
    /// Creates the deserializer and stores the schema registry client that is later
    /// handed to the per-topic Avro deserializers.
    /// </summary>
    /// <param name="schemaRegistryClient">Schema registry client; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="schemaRegistryClient"/> is null.
    /// </exception>
    public TopicSpecificAvroDeserializer(ISchemaRegistryClient schemaRegistryClient)
    {
        if (schemaRegistryClient == null)
        {
            throw new ArgumentNullException(nameof(schemaRegistryClient));
        }

        _schemaRegistryClient = schemaRegistryClient;
    }

    /// <summary>
    /// Deserializes a message using the deserializer associated with the topic named in
    /// <paramref name="context"/>. The deserializer is created on first use for a topic
    /// and reused from the cache afterwards.
    /// </summary>
    /// <param name="data">Raw message bytes.</param>
    /// <param name="isNull">Null flag forwarded unchanged to the topic's deserializer.</param>
    /// <param name="context">Serialization context; its <c>Topic</c> selects the deserializer.</param>
    /// <returns>The deserialized value cast to <see cref="ISpecificRecord"/>.</returns>
    public ISpecificRecord Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        var topicDeserializer = _deserializers.GetOrAdd(context.Topic, CreateDeserializer);
        var record = topicDeserializer.Deserialize(data, isNull, context);
        return (ISpecificRecord)record;
    }

    /// <summary>
    /// Builds the deserializer for a topic: looks up the message type configured for the
    /// topic, constructs an <c>AvroDeserializer&lt;T&gt;</c> for it via reflection, and
    /// returns its sync-over-async adapter.
    /// </summary>
    /// <param name="topic">Topic name used as the key into <c>Configuration</c>.</param>
    /// <returns>The result of <c>AsSyncOverAsync()</c> on the created deserializer.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the topic has no configured message type, or when the instance
    /// could not be created.
    /// </exception>
    private IDeserializer CreateDeserializer(string topic)
    {
        Type messageType;
        if (!Configuration.TryGetValue(topic, out messageType))
        {
            throw new InvalidOperationException($"Missing message type for topic {topic}");
        }

        // Close AvroDeserializer<> over the topic's message type and instantiate it.
        var asyncDeserializer = (IAsyncDeserializer)Activator.CreateInstance(
            typeof(AvroDeserializer<>).MakeGenericType(messageType),
            _schemaRegistryClient,
            null);
        if (asyncDeserializer == null)
        {
            throw new InvalidOperationException($"Failed to create instance of {typeof(AvroDeserializer<>)} with generic argument {messageType}");
        }

        return asyncDeserializer.AsSyncOverAsync();
    }