LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Join operation is not working after update to 1.2.0 #113

Closed: alexandraprisecaru closed this issue 2 years ago.

alexandraprisecaru commented 2 years ago

Hello,

We are currently using the Streamiz package in our project. We are using it mainly for joining streams in a .NET 6 microservice.

I updated to the latest package version to benefit from the Schema Registry integration, but the join operation is no longer working.

Initially, it failed because the replication factor was not 3. After I set it in the stream config, the console logs looked fine, but the join was not applied.

I set the log level to debug and attached the logs (streamiz_update_1.2.0_logs.txt). Were there any major changes in this area between the two versions?

Also, I noticed that after the update it auto-creates topics, including additional topics for other operations such as SelectKey. (screenshot)

This didn't happen before. Is there any way to disable this while keeping the code running? I set the AllowAutoCreateTopics property to false, but it didn't help.

Any help is much appreciated! :) Thanks!

alexandraprisecaru commented 2 years ago

One more question, sorry for the trouble.

I noticed that SchemaAvroSerDes forces me to use a value type or a class that implements ISpecificRecord or inherits from SpecificFixed. This means my models must carry the schema even though I am already communicating with Schema Registry. (screenshot)

In my own SerDes implementation, I avoided this by using the Chr.Avro.Confluent serializer and deserializer. (screenshot)

Could the Streamiz SerDes support this at some point? I think it would be very useful to rely only on Schema Registry for data governance.
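As a rough illustration of why a registry-based serializer does not need the schema attached to the model: in the Confluent wire format, each message starts with a magic byte 0 and a 4-byte big-endian schema ID, followed by the encoded payload, so the reader looks the schema up in the registry by ID. The sketch below is language-neutral Python, not Streamiz or Chr.Avro code; `InMemoryRegistry`, `frame`, and `unframe` are hypothetical names, the registry is an in-memory stand-in, and the payload is treated as opaque bytes rather than real Avro encoding.

```python
import struct
from typing import Dict, Tuple

MAGIC_BYTE = 0  # Confluent wire-format marker in the first byte


class InMemoryRegistry:
    """Hypothetical stand-in for Schema Registry: schema text <-> integer ID."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}
        self._schemas: Dict[int, str] = {}

    def register(self, schema: str) -> int:
        # Registering the same schema twice returns the existing ID.
        if schema not in self._ids:
            new_id = len(self._ids) + 1
            self._ids[schema] = new_id
            self._schemas[new_id] = schema
        return self._ids[schema]

    def lookup(self, schema_id: int) -> str:
        return self._schemas[schema_id]


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix the payload with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload), checking the header."""
    if len(message) < 5:
        raise ValueError("message too short for wire-format header")
    magic, schema_id = struct.unpack(">BI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[5:]
```

The consumer only needs the ID in the header to fetch the writer schema, which is the property a non-ISpecificRecord SerDes would rely on.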

Thanks!

LGouellec commented 2 years ago

Hi @alexandraprisecaru,

Since the 1.2.0 release, local state stores are backed up to a changelog topic. Kafka Streams (Java) behaves the same way: the state store is persisted to a topic so it can be restored if your local instance crashes. For more information, see here.

Unfortunately, 1.2.0 has a critical bug that causes infinite restoration from the changelog topic. This bug is tracked here. Please use the 1.2.2 release to avoid it.
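To make the changelog idea concrete: restoring a state store means replaying its changelog topic into an empty store, where the last value per key wins and a null value (tombstone) deletes the key. One way to keep this bounded, roughly what Kafka Streams (Java) does, is to read the changelog's end offset once before restoring and stop when it is reached. This is a hypothetical Python sketch of that idea only; `ChangelogRecord` and `restore_store` are illustrative names, and it does not describe how Streamiz implements restoration or what caused the 1.2.0 bug.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class ChangelogRecord:
    offset: int
    key: str
    value: Optional[str]  # None models a tombstone (key deleted)


def restore_store(changelog: Iterable[ChangelogRecord], end_offset: int) -> Dict[str, str]:
    """Rebuild a key-value store by replaying changelog records with
    offset < end_offset. end_offset is captured once, before restoration
    starts, so the loop terminates even if the topic keeps growing."""
    store: Dict[str, str] = {}
    for record in changelog:
        if record.offset >= end_offset:
            break  # reached the end offset observed when restoration began
        if record.value is None:
            store.pop(record.key, None)  # tombstone: remove the key
        else:
            store[record.key] = record.value  # last write per key wins
    return store
```

Without the fixed `end_offset`, a loop that keeps polling "until there is nothing left" can run forever on a topic that is still being written to.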

Regards,

LGouellec commented 2 years ago

Yes, my SchemaAvroSerDes implementation requires an ISpecificRecord. A separate implementation, with a different class, may be needed to work with Schema Registry without an ISpecificRecord instance. If you want to create a PR, feel free.

Thanks in advance. ++

alexandraprisecaru commented 2 years ago

Hi @LGouellec, thanks for responding so fast! 🚀 Thanks for the info, it was very helpful!

Sorry, I forgot to mention: it stopped working for me starting with 1.2.0, but I've mainly been testing with 1.2.2 and it still doesn't work.

I cloned the repo and applied my serialization and deserialization changes using Chr.Avro, and that seems to work fine. However, I found a place where it crashes.

I have a small wrapper around the Join function: (screenshot)

When I don't pass the join props variable, it doesn't crash, but it doesn't join either.

When I pass the join props, it fails during serialization because the SerDes is not initialized ("SchemaSerDes<{typeof(T).Name}> is not initialized !"). This happens when the serializer is called from the WrappedWindowStore class. (screenshot)

I noticed that the join process initializes some of the SerDes, but I think not all of them. (screenshots)
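The failure mode above can be modeled in a few lines: a SerDes that needs config-dependent setup (such as a registry client) raises if it is used before being initialized, so every component that serializes through it, including a window-store wrapper, must see an initialized instance. This is a hypothetical Python sketch, not Streamiz code; `LazySerDes`, `NotInitializedError`, and `WindowStoreWrapper` are illustrative names, and only the error text is taken from the message above.

```python
from typing import Callable, Dict, Optional


class NotInitializedError(RuntimeError):
    pass


class LazySerDes:
    """Hypothetical SerDes that cannot serialize until initialize() is called."""

    def __init__(self, type_name: str) -> None:
        self.type_name = type_name
        self._encode: Optional[Callable[[object], bytes]] = None

    def initialize(self, config: dict) -> None:
        # Stand-in for config-dependent setup (e.g. creating a registry client).
        encoding = config.get("encoding", "utf-8")
        self._encode = lambda obj: str(obj).encode(encoding)

    def serialize(self, obj: object) -> bytes:
        if self._encode is None:
            raise NotInitializedError(f"SchemaSerDes<{self.type_name}> is not initialized !")
        return self._encode(obj)


class WindowStoreWrapper:
    """Hypothetical store wrapper that initializes every SerDes it uses up front."""

    def __init__(self, key_serdes: LazySerDes, value_serdes: LazySerDes, config: dict) -> None:
        for serdes in (key_serdes, value_serdes):
            serdes.initialize(config)
        self.key_serdes = key_serdes
        self.value_serdes = value_serdes
        self._data: Dict[bytes, bytes] = {}

    def put(self, key: object, value: object) -> None:
        # Both SerDes are guaranteed to be initialized by the constructor.
        self._data[self.key_serdes.serialize(key)] = self.value_serdes.serialize(value)
```

Initializing in the wrapper's constructor is just one placement that removes the error; it is not a proposed Streamiz patch.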

I'm still not very familiar with the code, but I'll try to find a solution.

Any help is much appreciated 😃

alexandraprisecaru commented 2 years ago

Just for testing, I initialized the key and value SerDes in the WrappedWindowStore constructor using the correct config. It no longer fails with initialization errors, and serialization and deserialization are called every time I send data to the topics I want to join. The serialized and deserialized data look correct.

Snippets from the SchemaSerDes class: (screenshots)

However, the join result still doesn't reach my end topic...

LGouellec commented 2 years ago

Hi @alexandraprisecaru,

I created a reproducer for your use case: it joins two topics using Avro SerDes for the values, and it works fine. The join is correct and publishes records to the sink topic.

You can run this demo using start.sh.

Could you fork my repo and create your own branch with your fix, so I can see what the fix is and where it is?

Thanks for your help 👍 🥇

Regards,