LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License
470 stars 75 forks

Change ValueSerDes before sending into output topic #202

Closed · Shikha4599 closed this 2 years ago

Shikha4599 commented 2 years ago

Description

I want to customize the ValueSerDes before sending the Kafka stream to another output topic, but I am receiving an error and am not able to do so.

public class KafkaConsumer
    {
        private static IDictionary<string, List<PropDto>> availableAssets = new Dictionary<string, List<PropDto>>();
        public async Task ConsumerAsync()
        {
            try
            {
                string json1 = File.ReadAllText("abc.json");
                var assets = JsonConvert.DeserializeObject<List<AsDto>>(json1);

                foreach (var item in assets)
                {
                    availableAssets.Add(item.AssetIdentifier, (List<PropDto>)item.AssetProperties);
                }

                // Stream configuration
                var config = new StreamConfig();
                config.ApplicationId = "testing-acc4";
                config.BootstrapServers = "localhost:9092";
                config.DefaultKeySerDes = new StringSerDes();
                config.DefaultValueSerDes = new AssetDataDto();
                //config.DefaultValueSerDes = new AlertsDataDto();

                StreamBuilder builder = new StreamBuilder();

                IKStream<string, AssetDataDto> str = builder.Stream<string, AssetDataDto>("test-acc4");
                IKStream<string, AssetDataDto> str1 = str.Filter((k, v) => CheckFilterCriteria(v));
                IKStream<string, AlertsDataDto> str2 = str1.MapValues((k, v) => CheckFilter(v));
                str2.To("test-acc4-output");

                Topology t = builder.Build();

                // Create a stream instance with topology and configuration
                KafkaStream stream = new KafkaStream(t, config);

                //Subscribe CTRL + C to quit stream application
                Console.CancelKeyPress += (o, e) =>
                {
                    stream.Dispose();
                };

                await stream.StartAsync();
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

        }

        static double? min;
        static double? max;
        private static AlertsDataDto CheckFilter(AssetDataDto asset)
        {

            AlertsDataDto alertsData = new()
            {
                MinValue = min,
                MaxValue = max,
            };
            return alertsData;
        }

        private static bool CheckFilterCriteria(AssetDataDto asset)
        {
            if (availableAssets.ContainsKey(asset.Identifier))
            {
                foreach (var item in availableAssets[asset.Identifier])
                {
                    if (item.AssetPropertyIdentifier == asset.AssetPropertyIdentifier)
                    {
                        min = item.MinBoundryValue;
                        max = item.MaxBoundryValue;
                        return true;

                    }
                }
            }
            return false;
        }
    }

This is my AssetDataDto class:

public class AssetDataDto : AbstractSerDes<AssetDataDto>
    {
        public string AssetIdentifier { get; set; }
        public string AssetPropertyIdentifier { get; set; }
        public double Value { get; set; }
        public DateTime TimeStamp { get; set; }

        public override AssetDataDto Deserialize(byte[] data, SerializationContext context)
        {
            var bytesAsString = Encoding.UTF8.GetString(data);
            return JsonConvert.DeserializeObject<AssetDataDto>(bytesAsString);
        }
        public override byte[] Serialize(AssetDataDto data, SerializationContext context)
        {
            var a = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(a);
        }
    }

This is my AlertsDataDto class:

public class AlertsDataDto : AbstractSerDes<AlertsDataDto>
    {
        public double? MinValue { get; set; }
        public double? MaxValue { get; set; }

        public override AlertsDataDto Deserialize(byte[] data, SerializationContext context)
        {
            var bytesAsString = Encoding.UTF8.GetString(data);
            return JsonConvert.DeserializeObject<AlertsDataDto>(bytesAsString);
        }
        public override byte[] Serialize(AlertsDataDto data, SerializationContext context)
        {
            var a = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(a);
        }
    }

This is the error:

(error screenshot omitted)


duke-bartholomew commented 2 years ago

Hi @Shikha4599
You configured your topology with the default value serde new AssetDataDto(). In your pipeline you change from AssetDataDto to AlertsDataDto (in stream str2), so I guess you need to explicitly define your serde in the To method: because it's not defined there, KStreams will try to use the default one, which is AssetDataDto and not AlertsDataDto.

Try using: str2.To("test-acc4-output", new StringSerDes(), new AlertsDataDto())
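
For clarity, a minimal sketch of the end of the pipeline with the serdes spelled out on the sink (topic names and serde classes are the ones from the question above):

IKStream<string, AssetDataDto> str = builder.Stream<string, AssetDataDto>("test-acc4");
IKStream<string, AssetDataDto> str1 = str.Filter((k, v) => CheckFilterCriteria(v));
IKStream<string, AlertsDataDto> str2 = str1.MapValues((k, v) => CheckFilter(v));

// The default value serde is AssetDataDto, so the AlertsDataDto sink needs its own serde:
str2.To("test-acc4-output", new StringSerDes(), new AlertsDataDto());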

LGouellec commented 2 years ago

Hi @Shikha4599 ,

@duke-bartholomew is right!

Btw, it's highly recommended to split the DTO and the DTO's serde into two different classes.

Best regards,

Shikha4599 commented 2 years ago


Hi @LGouellec

Where exactly is the split required?

duke-bartholomew commented 2 years ago


Hi @Shikha4599
I think what @LGouellec means is to split your data and the serde logic into different classes. The serialization/deserialization logic is not bound to an instance of your data; it's just functionality that can be used to operate on your data objects.

KafkaStreams will just instantiate an instance of the serde class to use for deserializing/serializing objects of that specific type. It does not care about the data embedded in that object. In your case, you will also initialize whatever resources are needed by the base serdes class whenever you create one of your data objects, which is not something you really want.

So, long story short:

public class AssetDataDto {
        public string AssetIdentifier { get; set; }
        public string AssetPropertyIdentifier { get; set; }
        public double Value { get; set; }
        public DateTime TimeStamp { get; set; }
    }

and

public class AssetDataDtoSerde : AbstractSerDes<AssetDataDto>  {
        public override AssetDataDto Deserialize(byte[] data, SerializationContext context) {
            var bytesAsString = Encoding.UTF8.GetString(data);
            return JsonConvert.DeserializeObject<AssetDataDto>(bytesAsString);
        }
        public override byte[] Serialize(AssetDataDto data, SerializationContext context) {
            var a = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(a);
        }
    }
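
With the data and serde split like that, the configuration and the sink from the original snippet would reference the serde classes instead of the DTOs themselves. A rough sketch (AlertsDataDtoSerde is a hypothetical second serde class written the same way as AssetDataDtoSerde above):

// Defaults now point at dedicated serde classes, not at the DTOs.
var config = new StreamConfig();
config.ApplicationId = "testing-acc4";
config.BootstrapServers = "localhost:9092";
config.DefaultKeySerDes = new StringSerDes();
config.DefaultValueSerDes = new AssetDataDtoSerde();

// ... same topology as before ...

// The sink that produces AlertsDataDto values gets its own serde explicitly:
str2.To("test-acc4-output", new StringSerDes(), new AlertsDataDtoSerde());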

Also keep in mind that Kafka serdes should be able to cope with 'null' values (in some cases). Deleting a message from a log-compacted topic, for instance, is done by publishing a 'null' value on a specific key, so the serializer used for that should be able to handle 'null' values. But that's also a bit up to how the serdes are used in your business logic and topology ...
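
As an illustration of that last point, a null-tolerant serde could look roughly like this (a sketch only, reusing the AssetDataDtoSerde split from above; whether passing nulls through is the right behaviour depends on how tombstones flow through your topology):

public class AssetDataDtoSerde : AbstractSerDes<AssetDataDto>
{
    public override AssetDataDto Deserialize(byte[] data, SerializationContext context)
    {
        // A null payload is a tombstone on a compacted topic: pass it through instead of throwing.
        if (data == null)
            return null;
        return JsonConvert.DeserializeObject<AssetDataDto>(Encoding.UTF8.GetString(data));
    }

    public override byte[] Serialize(AssetDataDto data, SerializationContext context)
    {
        // Serializing null back to a null payload lets the application publish tombstones.
        if (data == null)
            return null;
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
    }
}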

Shikha4599 commented 2 years ago

Hi @LGouellec, @duke-bartholomew

The given solution and suggestions worked for me. Thank you for your time and help.

Best Regards

LGouellec commented 2 years ago

@Shikha4599 ,

Btw, if you upgrade to 1.4.0-RC3, there is a Streamiz.Kafka.Net.SerDes.JsonSerDes included in the library which uses Newtonsoft and handles tombstones and so on ...
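
If that built-in serde fits your use case, the hand-written serde classes could probably be dropped; a rough sketch of the configuration (assuming the generic JsonSerDes<T> with a parameterless constructor, as described above):

using Streamiz.Kafka.Net.SerDes;

// Built-in Newtonsoft-based JSON serdes instead of custom AbstractSerDes implementations.
config.DefaultKeySerDes = new StringSerDes();
config.DefaultValueSerDes = new JsonSerDes<AssetDataDto>();

// And on the sink where the value type changes:
str2.To("test-acc4-output", new StringSerDes(), new JsonSerDes<AlertsDataDto>());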