LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Global K Table broken #212

Closed lukasswm closed 1 year ago

lukasswm commented 1 year ago

Hi :)

First of all, thanks for your great work! :)

After updating to version 1.4.0, LeftJoin with a GlobalKTable seems to be broken with the following exception. With version 1.3.2 everything is fine.

    [15:51:26 INF] Restoration took 71ms for all tasks 1-0
    [15:51:52 ERR] Failed to process stream task 1-0 due to the following error:
    System.ArgumentOutOfRangeException: Non-negative number required. (Parameter 'count')
       at System.IO.BinaryReader.ReadBytes(Int32 count)
       at Streamiz.Kafka.Net.SerDes.ValueAndTimestampSerDes.Extract(Byte[] data)
       at Streamiz.Kafka.Net.SerDes.ValueAndTimestampSerDes`1.Deserialize(Byte[] data, SerializationContext context)
       at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.FromValue(Byte[] values)
       at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.<>c__DisplayClass18_0.<Get>b__0()
       at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency[T](Func`1 actionToMeasure, Sensor sensor)
       at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.Get(K key)
       at Streamiz.Kafka.Net.Table.Internal.KTableSourceValueGetterSupplier`2.KTableSourceValueGetter.Get(K key)
       at Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor`5.Process(K1 key, V1 value)
       at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.<>c__DisplayClass39_0.<Forward>b__0(IProcessor genericProcessor)
       at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
       at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
       at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
       at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value)
       at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
       at Streamiz.Kafka.Net.Processors.StreamTask.<>c__DisplayClass49_0.<Process>b__0()
       at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
       at Streamiz.Kafka.Net.Processors.StreamTask.Process()
       at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
    [15:51:52 ERR] stream-thread[xxx.service-0303d3be-02eb-4254-96b1-819d81a45df4-stream-thread-0] Encountered the following error during processing:
    System.ArgumentOutOfRangeException: Non-negative number required. (Parameter 'count')
       (same stack trace as above, continuing through StreamThread.Run())
    [15:51:52 ERR] an kafka streams inner error occured.
    System.ArgumentOutOfRangeException: Non-negative number required. (Parameter 'count')
       (same stack trace as above, continuing through StreamThread.Run())
    [15:51:52 WRN] stream-thread[xxx.service-0303d3be-02eb-4254-96b1-819d81a45df4-stream-thread-0] Detected that the thread throw an inner exception. Your configuration manager has decided to continue running stream processing. So will close out all assigned tasks and rejoin the consumer group
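For context, the ArgumentOutOfRangeException in these logs is the standard guard inside System.IO.BinaryReader.ReadBytes, which rejects any negative count. A corrupted length or header decoded upstream is enough to trigger it; this minimal standalone sketch reproduces the exact exception shape seen above:

```csharp
using System;
using System.IO;

class ReadBytesDemo
{
    static void Main()
    {
        // If a corrupted value header is interpreted as a byte count, the
        // result can be negative; BinaryReader.ReadBytes then throws the
        // same ArgumentOutOfRangeException seen in the logs above.
        using var reader = new BinaryReader(new MemoryStream(new byte[] { 1, 2, 3 }));
        try
        {
            reader.ReadBytes(-1);
        }
        catch (ArgumentOutOfRangeException ex)
        {
            Console.WriteLine(ex.ParamName); // prints "count"
        }
    }
}
```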

How to reproduce

The following topology is used:

string ValueJoiner(Tag currentValue, string metaValue)
{
    var tag = new TagInfo
    {
        Tag = currentValue,
        MetaData = metaValue == null ? null : JsonSerializer.Deserialize<MetaData>(metaValue, _jsonSerializerOptions)
    };

    return JsonSerializer.Serialize(tag, _jsonSerializerOptions);
}

string KeyMapping(string key, Tag tag)
{
    return key;
}

var builder = new StreamBuilder();

var globalKTable = builder.GlobalTable(_options.Value.MetaTopic, InMemory.As<string, string>("table-store"));
var kStream = builder.Stream<string, Tag, SchemaAvroSerDes<string>, SchemaAvroSerDes<Tag>>(_options.Value.ValueTopic);

var targetStream = kStream
    .LeftJoin(globalKTable, KeyMapping, ValueJoiner);

targetStream.To<StringSerDes, StringSerDes>(_options.Value.TargetTopic);

return builder.Build();

Any idea what the reason for that is?

Thanks!

LGouellec commented 1 year ago

Hi @lukasswm,

Is there any chance that the topic _options.Value.MetaTopic is published by a Java producer?

Best regards,

lukasswm commented 1 year ago

In my current development setup I use AKHQ to produce messages to that topic (https://github.com/tchiotludo/akhq). It seems to be a Java solution.

Having a look at the bytes (data) from line two of the stack trace (at Streamiz.Kafka.Net.SerDes.ValueAndTimestampSerDes.Extract(Byte[] data)) and decoding them as UTF-8, I receive the expected JSON payload, prefixed with some weird bytes: "\0\0���e\0\0�{\r\n \"pointtype\": \"Float32\", ...}

Seems to be an encoding issue? I guess you are persisting the timestamp before the payload, right?
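That guess points at byte order. A minimal sketch of the mismatch, assuming (not confirmed from the library source) an 8-byte timestamp prefix written big-endian by Java tooling: decoding those bytes with .NET's native little-endian BitConverter produces garbage, while a byte-order-aware read recovers the value:

```csharp
using System;
using System.Buffers.Binary;

class EndiannessDemo
{
    static void Main()
    {
        // A Java producer writes longs big-endian (network byte order).
        long javaTimestamp = 1_690_000_000_000; // an example epoch-millis value
        var buffer = new byte[8];
        BinaryPrimitives.WriteInt64BigEndian(buffer, javaTimestamp);

        // Naively decoding with BitConverter on a little-endian host reads
        // the bytes reversed and yields a completely different number.
        long naive = BitConverter.ToInt64(buffer, 0);

        // Reading with the matching byte order recovers the original value.
        long correct = BinaryPrimitives.ReadInt64BigEndian(buffer);

        Console.WriteLine(naive == correct); // False on little-endian hosts
        Console.WriteLine(correct == javaTimestamp); // True
    }
}
```

Any downstream code that interprets such a misdecoded header as a length can easily end up asking a reader for a nonsensical (even negative) number of bytes.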

LGouellec commented 1 year ago

@lukasswm I tried to reproduce with a unit test in full .NET, but I don't get the error:

    public class Issue212Tests
    {
        private static readonly JsonSerializerOptions _jsonSerializerOptions = new();

        public class Tag
        {
            public string Field1 { get; set; }
            public string Field2 { get; set; }
            public string Field3 { get; set; }
        }

        public class Metadata
        {
            public Dictionary<string, string> Data { get; set; }
        }

        public class TagInfo
        {
            public Tag Tag { get; set; }
            public Metadata MetaData { get; set; }
        }

        string ValueJoiner(Tag currentValue, string metaValue)
        {
            var tag = new TagInfo
            {
                Tag = currentValue,
                MetaData = metaValue == null
                    ? null
                    : JsonSerializer.Deserialize<Metadata>(metaValue, _jsonSerializerOptions)
            };

            return JsonSerializer.Serialize(tag, _jsonSerializerOptions);
        }

        string KeyMapping(string key, Tag tag)
        {
            return key;
        }

        [Test]
        public void Reproducer()
        {
            var streamConfig = new StreamConfig<StringSerDes, StringSerDes>();
            streamConfig.ApplicationId = "test-reproducer-issue212";

            var builder = new StreamBuilder();

            var globalKTable = builder.GlobalTable("meta", InMemory.As<string, string>("table-store"));
            var kStream = builder.Stream<string, Tag, StringSerDes, JsonSerDes<Tag>>("stream");

            var targetStream = kStream
                .LeftJoin(globalKTable, KeyMapping, ValueJoiner);

            targetStream.To<StringSerDes, StringSerDes>("target");

            var topology = builder.Build();

            using (var driver = new TopologyTestDriver(topology, streamConfig))
            {
                var globalTopic = driver.CreateInputTopic<string, string>("meta");
                var inputTopic = driver.CreateInputTopic<string, Tag, StringSerDes, JsonSerDes<Tag>>("stream");
                var result = driver.CreateOuputTopic<string, string>("target");

                globalTopic.PipeInput("key1", "{\"Data\":{\"key1\":\"value1\",\"key2\":\"value2\"}}");
                inputTopic.PipeInput("key1", new Tag() {Field1 = "tag1", Field2 = "tag2", Field3 = "tag3"});

                var records = result.ReadValueList();
            }
        }
    }

I will try with AKHQ.

LGouellec commented 1 year ago

@lukasswm I tried to publish with an AKHQ instance (in meta and stream topic) without issues.

    internal class Program
    {
        private static readonly JsonSerializerOptions _jsonSerializerOptions = new();

        public class Tag
        {
            public string Field1 { get; set; }
            public string Field2 { get; set; }
            public string Field3 { get; set; }
        }

        public class Metadata
        {
            public Dictionary<string, string> Data { get; set; }
        }

        public class TagInfo
        {
            public Tag Tag { get; set; }
            public Metadata MetaData { get; set; }
        }

        public static async Task Main(string[] args)
        {
            string KeyMapping(string key, Tag tag)
            {
                return key;
            }

            string ValueJoiner(Tag currentValue, string metaValue)
            {
                var tag = new TagInfo
                {
                    Tag = currentValue,
                    MetaData = metaValue == null
                        ? null
                        : JsonSerializer.Deserialize<Metadata>(metaValue, _jsonSerializerOptions)
                };

                return JsonSerializer.Serialize(tag, _jsonSerializerOptions);
            }

            var config = new StreamConfig<StringSerDes, StringSerDes>();
            config.ApplicationId = "test-app-reproducer";
            config.BootstrapServers = "localhost:9092";
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.CommitIntervalMs = 3000;
            config.StateDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
            config.Logger = LoggerFactory.Create((b) =>
            {
                b.SetMinimumLevel(LogLevel.Debug);
                b.AddLog4Net();
            });

            StreamBuilder builder = new StreamBuilder();

            var globalKTable = builder.GlobalTable("meta", InMemory.As<string, string>("table-store"));
            var kStream = builder.Stream<string, Tag, StringSerDes, JsonSerDes<Tag>>("stream");

            var targetStream = kStream
                .LeftJoin(globalKTable, KeyMapping, ValueJoiner);

            targetStream.To<StringSerDes, StringSerDes>("target");

            Topology t = builder.Build();
            KafkaStream stream = new KafkaStream(t, config);

            Console.CancelKeyPress += (_, _) => stream.Dispose();

            await stream.StartAsync();
        }
    }

Meta topic: 1 partition
Stream topic: 3 partitions

Meta record — Key: "key1", Value:

{
  "Data": {
    "key1": "value1",
    "key2": "value2"
  }
}

Stream record — Key: "key1", Value:

{
  "Field1": "tag1",
  "Field2": "tag2",
  "Field3": "tag3"
}

Target record — Key: "key1", Value:

{
  "Tag": {
    "Field1": "tag1",
    "Field2": "tag2",
    "Field3": "tag3"
  },
  "MetaData": {
    "Data": {
      "key1": "value1",
      "key2": "value2"
    }
  }
}

Can you give more context on your use case, please?

LGouellec commented 1 year ago

Your issue is solved with the 1.4.1 release. Be careful: after upgrading, please remove all local state stores (meaning RocksDB state stores) before restarting your app.
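A minimal sketch of clearing the local state before restart, assuming the app is stopped first; `stateDir` here is a hypothetical path and should match the StateDir configured in your StreamConfig:

```csharp
using System;
using System.IO;

class ClearLocalState
{
    static void Main()
    {
        // Hypothetical path: replace with the StateDir from your StreamConfig.
        var stateDir = Path.Combine(Path.GetTempPath(), "my-streams-state");

        // Deleting the directory forces a full restore from the source /
        // changelog topics on the next start, so the stores are rebuilt
        // with the fixed serialization format instead of the corrupt bytes.
        if (Directory.Exists(stateDir))
            Directory.Delete(stateDir, recursive: true);

        Console.WriteLine(Directory.Exists(stateDir)); // False
    }
}
```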

lukasswm commented 1 year ago

Ah ok, so you could reproduce it. I was still working out how to reproduce it consistently. I would have guessed that there were only issues with specific payloads, but I'm not sure yet. Thanks! I will have a look. :)

LGouellec commented 1 year ago

@lukasswm

No, the problem arose from this workflow:

If you try to upgrade to 1.4.1 and apply the same workflow, it works!

lukasswm commented 1 year ago

I think it looks good now! Thanks a lot! Messing around with big- and little-endian is always fun!