LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Protobuf SerDes support for MockSchemaRegistry #168

Closed. DmitryGladky closed this issue 1 year ago.

DmitryGladky commented 1 year ago

Description

    <PackageReference Include="Confluent.Kafka" Version="1.9.0" />
    <PackageReference Include="Streamiz.Kafka.Net" Version="1.3.0" />
    <PackageReference Include="Streamiz.Kafka.Net.SchemaRegistry.SerDes" Version="1.3.0" />
    <PackageReference Include="Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf" Version="1.3.0" />
    <PackageReference Include="Streamiz.Kafka.Net.SchemaRegistry.Mock" Version="1.1.5" />

Hi, I am using Protobuf schemas to communicate with Kafka, and I'm facing a few problems with MockSchemaRegistry, MockSchemaRegistryClient.GetRegisteredSchemaAsync to be exact.

Here is the Protobuf message that is causing the problems:

syntax = "proto3";

import "confluent/type/decimal.proto";

message Msg {
    confluent.type.Decimal BuggedField = 1;
}

I was able to work around some of them by creating a custom SerDes that wraps the schema registry client:

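using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Google.Protobuf;
using Streamiz.Kafka.Net.SchemaRegistry.SerDes.Protobuf;

// Wraps the mock client so that LookupSchemaAsync no longer throws NotImplementedException.
internal class FixedProtobufClient : ISchemaRegistryClient
{
    private readonly ISchemaRegistryClient _client;

    public FixedProtobufClient(ISchemaRegistryClient client)
    {
        _client = client;
    }

    // Workaround: ignore the supplied schema and return the latest version registered under the subject.
    public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas)
    {
        return await GetLatestSchemaAsync(subject);
    }

    // All other ISchemaRegistryClient members simply forward to _client.
}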

internal class MockProtoSerDes<T> : SchemaProtobufSerDes<T> where T : class, IMessage<T>, new()
{
    protected override ISchemaRegistryClient GetSchemaRegistryClient(SchemaRegistryConfig config)
    {
        var client = base.GetSchemaRegistryClient(config);
        return new FixedProtobufClient(client);
    }
}

driver.CreateInputTopic<Key, Msg>("test_topic", new MockProtoSerDes<Key>(), new MockProtoSerDes<Msg>());

However, when the value is written to the KTable's state store, the library internally wraps it in ValueAndTimestampSerDes, which does not use MockProtoSerDes. Serialization therefore still ends up calling the unimplemented MockSchemaRegistryClient.LookupSchemaAsync method.

Exception:

System.NotImplementedException
The method or operation is not implemented.
   at Streamiz.Kafka.Net.SchemaRegistry.SerDes.Mock.MockSchemaRegistryClient.LookupSchemaAsync(String subject, Schema schema, Boolean ignoreDeletedSchemas) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/MockSchemaRegistryClient.cs:line 262
   at Confluent.SchemaRegistry.Serdes.ProtobufSerializer`1.<>c__DisplayClass15_0.<<RegisterOrGetReferences>b__1>d.MoveNext()
--- End of stack trace from previous location ---
   at Confluent.SchemaRegistry.Serdes.ProtobufSerializer`1.RegisterOrGetReferences(FileDescriptor fd, SerializationContext context, Boolean autoRegisterSchema, Boolean skipKnownTypes)
   at Confluent.SchemaRegistry.Serdes.ProtobufSerializer`1.SerializeAsync(T value, SerializationContext context)
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.Serialize(T data, SerializationContext context)
   at Streamiz.Kafka.Net.SchemaRegistry.SerDes.SchemaSerDes`2.Serialize(T data, SerializationContext context) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/SchemaSerDes.cs:line 76
   at Streamiz.Kafka.Net.SerDes.ValueAndTimestampSerDes`1.Serialize(ValueAndTimestamp`1 data, SerializationContext context) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/SerDes/ValueAndTimestampSerDes.cs:line 76
   at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.GetValueBytes(V value) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/State/Metered/MeteredKeyValueStore.cs:line 64
   at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.<>c__DisplayClass24_0.<Put>b__0() in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/State/Metered/MeteredKeyValueStore.cs:line 154
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action actionToMeasure, Sensor sensor) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Crosscutting/ActionHelper.cs:line 27
   at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.Put(K key, V value) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/State/Metered/MeteredKeyValueStore.cs:line 154
   at Streamiz.Kafka.Net.Processors.KTableSourceProcessor`2.Process(K key, V value) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/KTableSourceProcessor.cs:line 37
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 117
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/SourceProcessor.cs:line 60
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 236
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 230
   at Streamiz.Kafka.Net.Processors.StreamTask.<>c__DisplayClass48_0.<Process>b__0() in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/StreamTask.cs:line 481
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Crosscutting/ActionHelper.cs:line 11
   at Streamiz.Kafka.Net.Processors.StreamTask.Process() in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Processors/StreamTask.cs:line 481
   at Streamiz.Kafka.Net.Mock.Sync.SyncPipeBuilder.StreamTaskPublisher.Flush() in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Mock/Sync/SyncPipeBuilder.cs:line 36
   at Streamiz.Kafka.Net.Mock.Pipes.SyncPipeInput.Flush() in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Mock/Pipes/SyncPipeInput.cs:line 28
   at Streamiz.Kafka.Net.Mock.TestInputTopic`2.PipeInput(TestRecord`2 record) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Mock/TestInputTopic.cs:line 93
   at Streamiz.Kafka.Net.Mock.TestInputTopic`2.PipeInput(K key, V value, Headers headers) in /home/runner/work/kafka-streams-dotnet/kafka-streams-dotnet/core/Mock/TestInputTopic.cs:line 118

Every time the application hits MockSchemaRegistryClient.LookupSchemaAsync, subject == "confluent/type/decimal.proto", and MockSchemaRegistryClient.schemas already contains a schema for that type. So why doesn't LookupSchemaAsync check the schemas dictionary?
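To illustrate what is being asked for, here is a hypothetical, self-contained sketch of a dictionary-backed lookup: schemas are stored per subject, and LookupSchemaAsync returns the matching registered entry instead of throwing. The types InMemoryRegistrySketch and RegisteredSchemaSketch are invented for illustration only; they are not Streamiz or Confluent APIs, and this is not how MockSchemaRegistryClient is actually implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a registered schema; only the fields needed
// for the lookup illustration are modeled.
public sealed record RegisteredSchemaSketch(string Subject, int Version, int Id, string SchemaString);

// Hypothetical in-memory registry (NOT the Streamiz MockSchemaRegistryClient).
public sealed class InMemoryRegistrySketch
{
    // subject -> every version registered under that subject, oldest first
    private readonly Dictionary<string, List<RegisteredSchemaSketch>> schemas = new();
    private int nextId = 1;

    // Registers a schema under a subject, reusing the id if the same schema string is already there.
    public int Register(string subject, string schemaString)
    {
        if (!schemas.TryGetValue(subject, out var versions))
        {
            versions = new List<RegisteredSchemaSketch>();
            schemas[subject] = versions;
        }

        var existing = versions.FirstOrDefault(s => s.SchemaString == schemaString);
        if (existing is not null)
        {
            return existing.Id;
        }

        var registered = new RegisteredSchemaSketch(subject, versions.Count + 1, nextId++, schemaString);
        versions.Add(registered);
        return registered.Id;
    }

    // Checks the schemas dictionary for the subject and returns the matching entry,
    // or a faulted task if the schema was never registered under that subject.
    public Task<RegisteredSchemaSketch> LookupSchemaAsync(string subject, string schemaString)
    {
        if (schemas.TryGetValue(subject, out var versions))
        {
            var match = versions.FirstOrDefault(s => s.SchemaString == schemaString);
            if (match is not null)
            {
                return Task.FromResult(match);
            }
        }

        return Task.FromException<RegisteredSchemaSketch>(
            new KeyNotFoundException($"Schema not found under subject '{subject}'."));
    }
}
```

Comparing raw schema strings is a simplifying assumption of this sketch.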

How to reproduce

Run this unit test


LGouellec commented 1 year ago

For now, the Protobuf SerDes doesn't work with MockSchemaRegistry because this method is not implemented. I've added this issue to the 1.4 milestone.