LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License
470 stars 75 forks

Generic value instead of string #157

Closed PabloDeCortes closed 2 years ago

PabloDeCortes commented 2 years ago

Hi,

How can I use your library with generic models as values instead of the string type? I didn't find any example in the documentation. My code works fine with a string value, but it fails when I try to use a custom model. Code snippet:

var config = new StreamConfig<StringSerDes, JsonSerDes>
{
    ApplicationId = "test-app",
    BootstrapServers = "localhost:9093",
    AutoOffsetReset = AutoOffsetReset.Earliest,
};

var builder = new StreamBuilder();

var personStream = builder.Stream<string, Person>("persons", new StringSerDes(), new JsonSerDes<Person>());
var locationStream = builder.Stream<string, Location>("locations", new StringSerDes(), new JsonSerDes<Location>());
var jobStream = builder.Stream<string, Job>("jobs", new StringSerDes(), new JsonSerDes<Job>());

personStream
    .SelectKey((k, v) => v.LocationId)
    .Join(locationStream.ToTable(),
        (v1, v2) => new PersonLocation
        {
            Person = v1,
            Location = v2
        })
    .SelectKey((k, v) => v.Person.JobId)
    .Join(jobStream.ToTable(), (v1, v2) => new PersonJobLocation{
        Person = v1.Person,
        Location = v1.Location,
        Job = v2
    })
    .To<StringSerDes, JsonSerDes<PersonJobLocation>>("person-job-location");

var topology = builder.Build();
var stream = new KafkaStream(topology, config);

Console.CancelKeyPress += (_, _) => {
    stream.Dispose();
};

await stream.StartAsync();
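(Editor's note: for reference, the generic JSON round-trip the topology above depends on can be sketched independently of Streamiz using `System.Text.Json` from the .NET base library. The `GenericJsonCodec<T>` name and the `Person` shape below are hypothetical illustrations, not part of the Streamiz API.)

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical model, mirroring the Person used in the topology above.
public record Person(string Name, string LocationId, string JobId);

// Minimal generic JSON codec: any model T in, UTF-8 JSON bytes out, and back.
public static class GenericJsonCodec<T>
{
    // Serialize a strongly typed value to UTF-8 JSON bytes.
    public static byte[] Serialize(T value) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value));

    // Deserialize UTF-8 JSON bytes back into a strongly typed T.
    public static T? Deserialize(byte[] data) =>
        JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(data));
}

public static class Program
{
    public static void Main()
    {
        var person = new Person("Ada", "loc-1", "job-7");
        var bytes = GenericJsonCodec<Person>.Serialize(person);
        var back = GenericJsonCodec<Person>.Deserialize(bytes);
        // Records use structural equality, so a successful round-trip prints True.
        Console.WriteLine(person.Equals(back));
    }
}
```
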

Best regards

LGouellec commented 2 years ago

Hi @PabloDeCortes,

Could you post the error logs in this issue, please?

Kr,

PabloDeCortes commented 2 years ago

Error:

fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-433578a6-4be0-4401-8d7d-bb1985033438-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Erroneous state
         at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
         at Confluent.Kafka.Impl.SafeKafkaHandle.Assign(IEnumerable`1 partitions)
         at Confluent.Kafka.Consumer`2.Unassign()
         at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
         at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr rk, IntPtr timeout_ms)
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
         at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout, Int64 maxRecords)
         at Streamiz.Kafka.Net.Processors.StreamThread.PollRequest(TimeSpan ts)
         at Streamiz.Kafka.Net.Processors.StreamThread.<>c__DisplayClass57_0.<Run>b__0()
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()

Maybe something is wrong with my JsonSerDes? Code snippet:

using System.Text;
using Confluent.Kafka;
using Newtonsoft.Json;
using Streamiz.Kafka.Net.SerDes;

namespace KafkaStreams.Serializers;

public class JsonSerDes : ISerDes
{
    public object DeserializeObject(byte[] data, SerializationContext context)
    {
        return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data));
    }

    public byte[] SerializeObject(object data, SerializationContext context)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
    }

    public void Initialize(SerDesContext context)
    {
    }
}

public class JsonSerDes<T> : ISerDes<T>
{
    public void Initialize(SerDesContext context)
    {
    }

    public object DeserializeObject(byte[] data, SerializationContext context)
    {
        return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data));
    }

    public byte[] SerializeObject(object data, SerializationContext context)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
    }

    public T Deserialize(byte[] data, SerializationContext context)
    {
        return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(data));
    }

    public byte[] Serialize(string data, SerializationContext context)
    {
        throw new NotImplementedException();
    }

    public byte[] Serialize(T data, SerializationContext context)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
    }
}

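(Editor's note: independent of the rebalance error above, and not confirmed as the root cause here, the non-generic `DeserializeObject` in the snippet calls `JsonConvert.DeserializeObject` without a target type, which yields a Newtonsoft `JObject` rather than a `T`. If Streamiz ever goes through that object-based path, downstream code expecting `T` would fail. The self-contained sketch below shows the typed-vs-untyped difference using `System.Text.Json`, since Newtonsoft.Json is not in the base library; the `Location` model and class names are hypothetical.)

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical model, mirroring the Location used in the topology above.
public record Location(string Id, string City);

public static class TypedVsUntyped
{
    // Typed path: produces an actual Location instance.
    public static Location? DeserializeTyped(byte[] data) =>
        JsonSerializer.Deserialize<Location>(Encoding.UTF8.GetString(data));

    // Untyped path, analogous to JsonConvert.DeserializeObject(string) above:
    // produces a generic JSON value (a boxed JsonElement), NOT a Location.
    public static object? DeserializeUntyped(byte[] data) =>
        JsonSerializer.Deserialize<object>(Encoding.UTF8.GetString(data));
}

public static class Program
{
    public static void Main()
    {
        var bytes = Encoding.UTF8.GetBytes("{\"Id\":\"loc-1\",\"City\":\"Paris\"}");
        Console.WriteLine(TypedVsUntyped.DeserializeTyped(bytes) is Location);   // True
        Console.WriteLine(TypedVsUntyped.DeserializeUntyped(bytes) is Location); // False
    }
}
```
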
LGouellec commented 2 years ago

@PabloDeCortes ,

Are you getting an exception during deserialization inside your JsonSerDes? The error looks like it happens inside the .NET Kafka client.

PabloDeCortes commented 2 years ago

@LGouellec it doesn't even enter the Deserialize method of JsonSerDes. I don't have any Kafka consumer, only a Kafka producer to populate the topics, and that works fine. I created a repository with the whole structure so you can understand better: https://github.com/PabloDeCortes/KafkaStreams

LGouellec commented 2 years ago

@PabloDeCortes, Thanks, I will have a look.

LGouellec commented 2 years ago

@PabloDeCortes: Can you add me as a maintainer on this repo? I fixed the issue and I want to push my code to another branch.

PabloDeCortes commented 2 years ago

Yeah, gimme a sec

PabloDeCortes commented 2 years ago

Sent an invite

LGouellec commented 2 years ago

Tracking issue https://github.com/LGouellec/kafka-streams-dotnet/issues/158 for the fix