LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License

Providing custom class as a DefaultValueSerDes giving an error #177

Closed Shikha4599 closed 2 years ago

Shikha4599 commented 2 years ago

Description: We have written a consumer class that consumes messages from a Kafka producer. We wanted to customize our Kafka stream by setting a custom class as DefaultValueSerDes, but this does not work and gives an error.

THIS IS MY PROGRAM.CS FILE

    // Stream configuration
    var config = new StreamConfig();

    config.ApplicationId = "app-testing";
    config.BootstrapServers = "localhost:9092";
    config.DefaultKeySerDes = new StringSerDes();
    config.DefaultValueSerDes = new ABC();

    StreamBuilder builder = new StreamBuilder();

    IKStream<string, ABC> str = builder.Stream<string, ABC>("test-input");
    str.Filter((k, v) => v.Data >= 25 && v.Data <= 50).To("test-output");

    Topology t = builder.Build();

    // Create a stream instance with topology and configuration
    KafkaStream stream = new KafkaStream(t, config);

    // Subscribe CTRL + C to quit stream application
    Console.CancelKeyPress += (o, e) =>
    {
        stream.Dispose();
    };

    // Start stream instance with cancellable token
    await stream.StartAsync();

THIS IS THE JSON MESSAGE I SEND FROM THE KAFKA PRODUCER

    {"Name": "DND", "Data": 37}

THIS IS MY CUSTOMIZED CLASS ABC.CS

    public class ABC : ISerDes
    {
        public string Name { get; set; }
        public int Data { get; set; }

        public object DeserializeObject(byte[] data, SerializationContext context)
        {
            var bytesAsString = Encoding.UTF8.GetString(data);
            return JsonConvert.DeserializeObject<ABC>(bytesAsString);
        }

        public void Initialize(SerDesContext context)
        {
            context.Config.BootstrapServers = "localhost:9092";
            context.Config.ApplicationId = "app-testing";
        }

        public byte[] SerializeObject(object data, SerializationContext context)
        {
            var a = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(a);
        }
    }

THIS IS THE ERROR WE ARE RECEIVING

(Error screenshot "kafka"; image not preserved.)


LGouellec commented 2 years ago

Hi @Shikha4599,

Thanks for your issue.

Could you please test this implementation of ABC.cs:

    public class ABC : AbstractSerDes<ABC>
    {
        public string Name { get; set; }
        public int Data { get; set; }

        public override ABC Deserialize(byte[] data, SerializationContext context)
        {
            var bytesAsString = Encoding.UTF8.GetString(data);
            return JsonConvert.DeserializeObject<ABC>(bytesAsString);
        }

        public override byte[] Serialize(ABC data, SerializationContext context)
        {
            var a = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(a);
        }
    }
LGouellec commented 2 years ago

By the way, it's not good practice to use the same class for the bean (the data object) and the SerDes. I recommend splitting them into two different classes.
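
One way the recommended split could look is sketched below. This is only an illustration, not the maintainer's code: the class names `Abc` and `AbcSerDes` are hypothetical, the null guards are an added assumption, and the `AbstractSerDes<T>` overrides follow the signatures shown earlier in this thread.

```csharp
using System.Text;
using Confluent.Kafka;              // SerializationContext
using Newtonsoft.Json;              // JsonConvert, as used in the thread
using Streamiz.Kafka.Net.SerDes;    // AbstractSerDes<T>

// Plain data class (the "bean"): only the fields carried in the JSON payload.
public class Abc
{
    public string Name { get; set; }
    public int Data { get; set; }
}

// Hypothetical name: a separate SerDes that converts Abc to and from UTF-8 JSON.
public class AbcSerDes : AbstractSerDes<Abc>
{
    public override Abc Deserialize(byte[] data, SerializationContext context)
    {
        // Assumption: a record with a null value has no bytes to decode,
        // so return null instead of letting Encoding.UTF8.GetString throw.
        if (data == null)
            return null;

        return JsonConvert.DeserializeObject<Abc>(Encoding.UTF8.GetString(data));
    }

    public override byte[] Serialize(Abc data, SerializationContext context)
    {
        // Mirror the null handling on the way out.
        if (data == null)
            return null;

        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
    }
}
```

With this split, the configuration would presumably become `config.DefaultValueSerDes = new AbcSerDes();` and the stream would be declared as `builder.Stream<string, Abc>("test-input")`.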

LGouellec commented 2 years ago

@Shikha4599,

It looks like the value in your source Kafka topic is null. If it is not, try setting a breakpoint in the Deserialize(..) method of the ABC SerDes to observe what happens during deserialization.
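
As a complement to the breakpoint, the decoding step can also be exercised outside Kafka. The sketch below is an assumption-laden illustration: `Abc`, `DeserializeCheck`, and `Decode` are hypothetical names, and `Decode` simply repeats the decoding steps from the `Deserialize` method above with an added null guard. It feeds the sample message from the issue, and then a null payload, through the same steps.

```csharp
using System;
using System.Text;
using Newtonsoft.Json;

// Hypothetical data class matching the JSON in the issue.
public class Abc
{
    public string Name { get; set; }
    public int Data { get; set; }
}

public static class DeserializeCheck
{
    // Same decoding steps as Deserialize in the thread, plus a null guard (an assumption).
    public static Abc Decode(byte[] data)
    {
        if (data == null)
            return null;

        return JsonConvert.DeserializeObject<Abc>(Encoding.UTF8.GetString(data));
    }

    public static void Main()
    {
        // The sample message from the issue, encoded as UTF-8 bytes.
        byte[] payload = Encoding.UTF8.GetBytes("{\"Name\": \"DND\", \"Data\": 37}");

        Abc fromJson = Decode(payload);
        Console.WriteLine(fromJson == null ? "null" : $"{fromJson.Name} / {fromJson.Data}"); // DND / 37

        // A record whose value is null decodes to null rather than throwing.
        Abc fromNull = Decode(null);
        Console.WriteLine(fromNull == null ? "null" : $"{fromNull.Name} / {fromNull.Data}"); // null
    }
}
```

If the first line prints the expected fields, the JSON mapping itself is fine and the null more likely comes from the records in the topic.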

OneCricketeer commented 2 years ago

Your filter could include a null check: v != null && v.Data...
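
The suggested null check can be tried in isolation. In the sketch below, `Abc` and `RangeFilter` are hypothetical names and the range bounds are taken from the filter in the original post. The predicate has the same `(key, value) => bool` shape as the lambda passed to `Filter` there.

```csharp
using System;

// Hypothetical data class matching the JSON in the issue.
public class Abc
{
    public string Name { get; set; }
    public int Data { get; set; }
}

public static class RangeFilter
{
    // Null-safe version of the predicate from the issue:
    // && short-circuits, so v.Data is never read when v is null.
    public static readonly Func<string, Abc, bool> InRange =
        (k, v) => v != null && v.Data >= 25 && v.Data <= 50;

    public static void Main()
    {
        Console.WriteLine(InRange("k1", new Abc { Name = "DND", Data = 37 })); // True
        Console.WriteLine(InRange("k2", new Abc { Name = "low", Data = 10 })); // False
        Console.WriteLine(InRange("k3", null));                               // False
    }
}
```

In the topology from the issue, the same condition would go directly into the existing call: `str.Filter((k, v) => v != null && v.Data >= 25 && v.Data <= 50).To("test-output");`.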

Shikha4599 commented 2 years ago


This worked. Thank you so much for the response, @LGouellec.

LGouellec commented 2 years ago

@Shikha4599

Perfect! I'll close this issue. If you have another question or issue, please open a new one.