zmccoy opened this issue 4 years ago
I'm poking around at this right now.
Does any of this code help: https://github.com/fd4s/fs2-kafka/tree/master/modules/vulcan/src/main/scala/fs2/kafka/vulcan ?
@LukaJCB It does! What isn't great is that I think we'd need essentially that code's abstractions to make this usable. At this moment I feel like I'm working on a poor man's copy of that. Each time I've tried to trim an inner class from it, I understand why they did it, heh. Slowly working on an MVP.
I'm struggling with whether we want to do pretty much exactly what they have, so that you can create your schema registry and settings safely etc. with KafkaStreams4s, or whether we'd rely on someone using both libraries together and only using the Ser/De portions. I can easily see applications only using KafkaStreams4s and not needing consumer/producer semantics outside of the streams API. The latter case would rely on the user wrapping all of that ceremony up themselves.
Maybe this could be moved to a module in vulcan itself? Not sure what the best course of action is here; maybe we should create an issue in vulcan or fs2-kafka to see what they think.
Yeah, I think that's the best choice to keep things without possible regression. I can make an issue on fs2-kafka and tie it back here today. I'm going to keep making a small example of what they're doing to see how it could tie into KafkaStreams4s along with working with both Avro implementations.
One thing I'm seeing so far: we need to know whether a given ser/de is for a key or a value, since that changes the inner Kafka ser/de that gets used. In KafkaStreams4s we'll need to know, or "pin", which one it ends up being.
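To illustrate the key-vs-value distinction: the underlying Confluent serializers take an `isKey` flag at `configure` time, which (among other things) controls whether schemas are registered under the `<topic>-key` or `<topic>-value` subject. A rough sketch of "pinning", with hypothetical helper names (this is not the branch's actual code):

```scala
import java.util.Collections
import io.confluent.kafka.serializers.KafkaAvroSerializer

object PinnedSerializers {
  private val configs =
    Collections.singletonMap("schema.registry.url", "http://localhost:8081")

  // The same serializer class behaves differently depending on `isKey`,
  // so a wrapper has to fix the choice up front.
  def keySerializer(): KafkaAvroSerializer = {
    val ser = new KafkaAvroSerializer()
    ser.configure(configs, true) // pinned as a key serializer
    ser
  }

  def valueSerializer(): KafkaAvroSerializer = {
    val ser = new KafkaAvroSerializer()
    ser.configure(configs, false) // pinned as a value serializer
    ser
  }
}
```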
Got a minimal version of what the fs2-kafka-encoding of this looks like going here: https://github.com/compstak/KafkaStreams4s/compare/master...zmccoy:kafka-avro-format?expand=1#diff-33e9d5e2212f25b9d29a6a7e756f130dR54-R55
I have some questions noted there about how to tie the `serialize` and `deserialize` methods on the corresponding classes into how the streams portion works, along with making sure the key serialization/deserialization is used, hopefully implicitly, for the keys only.
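One possible shape for that tie-in, sketched with hypothetical names: given plain serialize/deserialize functions (e.g. from the vulcan-backed codecs), build a Kafka Streams `Serde`, and use distinct wrapper types so implicit resolution can't mix up key and value serdes.

```scala
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// Hypothetical wrappers so a key serde can never be summoned where a
// value serde is expected, and vice versa.
final case class KeySerde[A](serde: Serde[A])
final case class ValueSerde[A](serde: Serde[A])

object SerdeSketch {
  // Lift plain functions into Kafka's Serde via SAM conversion.
  def fromFunctions[A](ser: A => Array[Byte], de: Array[Byte] => A): Serde[A] = {
    val serializer: Serializer[A] =
      (_: String, a: A) => ser(a)
    val deserializer: Deserializer[A] =
      (_: String, bytes: Array[Byte]) => de(bytes)
    Serdes.serdeFrom(serializer, deserializer)
  }
}
```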
Before we make an issue on fs2-kafka, I wanted to make sure that what we'd like in the end is possible without something snagging.
Just wanted to put an update on here: I've gotten quite a bit farther than what is above, but the ergonomics of usage right now, while keeping the keys and values separate internally, make it a bit of a pain to use. Kafka Summit is this coming week, so I won't get back to it for a few days. (https://github.com/compstak/KafkaStreams4s/compare/master...zmccoy:kafka-avro-format?expand=1). The design leaves a lot to be desired at the moment, as it doesn't hide enough information away from the interfaces.
The fix for this is going to need to wait for: https://github.com/compstak/KafkaStreams4s/pull/50 and then I should be able to open some stuff up :)
Hey, my team could really use this feature with our streams/vulcan integration, any way I could help out getting this finished?
@keirlawson Hey! I have it partially completed if you want to take that branch over, I got pulled off of doing this at $work and never got back around to it, which is totally my bad. The current portion of it is here: https://github.com/compstak/KafkaStreams4s/compare/master...zmccoy:kafka-avro-format?expand=1.
Ok, will take a look, any pointers as to what is outstanding from your perspective?
Getting https://github.com/compstak/KafkaStreams4s/pull/50 (in master now) merged into that branch, which then allows keys and values to have different SerDes. A lot of that should just be following the types. I'll need to dive back into it for a bit to have a better outline of what's really left. The ergonomics of usage came after the work of allowing keys and values separate SerDes and using them.
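For context on what "keys and values with different SerDes" looks like downstream, this is the standard Kafka Streams pattern (generic Kafka Streams usage, not KafkaStreams4s-specific code):

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KStream}

object TopologySketch {
  val builder = new StreamsBuilder()

  // Key and value serdes are passed separately, so each side can carry
  // its own (e.g. Avro-backed) serde independently.
  val stream: KStream[String, java.lang.Long] =
    builder.stream(
      "input-topic",
      Consumed.`with`(Serdes.String(), Serdes.Long())
    )
}
```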
Apologies, I never got round to implementing this, and our team has now moved away from Streams.
Our current codecs for Avro handle only the raw Avro bytes format, but KafkaStreams4s should be able to read/write the Confluent wire format (https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format) so as to work in a normal Kafka system. I haven't given a ton of thought to the abstractions in this case, but wanted to have this issue out there.
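For reference, the wire format described at that link is small: byte 0 is the magic byte (currently 0), bytes 1–4 are the schema ID as a big-endian int, and the remainder is the Avro-encoded payload. A minimal parsing sketch (names here are illustrative, not library code):

```scala
import java.nio.ByteBuffer

final case class WirePayload(schemaId: Int, avroBytes: Array[Byte])

object WireFormat {
  val MagicByte: Byte = 0x0

  def parse(bytes: Array[Byte]): Either[String, WirePayload] =
    if (bytes.length < 5)
      Left(s"Expected at least 5 bytes, got ${bytes.length}")
    else if (bytes(0) != MagicByte)
      Left(s"Unknown magic byte: ${bytes(0)}")
    else {
      // Schema ID is a 4-byte big-endian int following the magic byte.
      val schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt
      Right(WirePayload(schemaId, bytes.drop(5)))
    }
}
```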