kestra-io / plugin-kafka

https://kestra.io/plugins/plugin-kafka/
Apache License 2.0

Kafka Consumer on a topic using a KTable-like pattern #55

Closed brokenjacobs closed 8 months ago

brokenjacobs commented 8 months ago

Issue description

I'm struggling to find the best way to handle a log-compacted topic in Kestra. I want to consume it but keep only the latest message for each key. I can't find a good blueprint for this, as everything seems intent on keeping the data from every Kafka message consumed. Can anyone point me in the right direction? I've even tried reading the output file with the read() call after serializing it to JSON with serdes, but the collection of read messages comes back as a LinkedHashMap, so you can't call last on it (it must be iterated), and there is no flatten or list filter in the template language that could be used instead.
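For illustration, the "latest message per key" behavior described above can be sketched in plain Python, for example inside a script step that post-processes the consumed file. This is only a sketch under assumptions: it assumes the messages are available as one JSON object per line, in consumption order, with `key` and `value` fields. The function name `latest_by_key` and the sample records are hypothetical and are not part of the Kafka plugin.

```python
import json
from typing import Any, Dict, Iterable


def latest_by_key(lines: Iterable[str]) -> Dict[str, Any]:
    """Collapse a stream of JSON-encoded records to the last value seen per key.

    Assumption: each line is a JSON object with "key" and "value" fields,
    and lines arrive in the order they were consumed.
    """
    latest: Dict[str, Any] = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        # A later record for the same key overwrites the earlier one.
        latest[record["key"]] = record["value"]
    return latest


# Hypothetical sample data standing in for the consumed output file.
sample = [
    '{"key": "user-1", "value": {"status": "new"}}',
    '{"key": "user-2", "value": {"status": "new"}}',
    '{"key": "user-1", "value": {"status": "active"}}',
]
result = latest_by_key(sample)
```

Because a dictionary is keyed on the message key, there is no need to call `last` on the collection: iterating once and overwriting is enough.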

anna-geller commented 8 months ago

Extra info from the team: this will require a new task, similar to the Consume task, that reads all messages from a topic but keeps only one message per key when the topic is compacted. It will likely not be easy, but it is feasible.

anna-geller commented 8 months ago

@brokenjacobs, could you say more about what you are trying to achieve?

E.g., if you need to consume a topic using a Trigger and then maintain some kind of key/value state (a KTable), it should be possible to use a flow with the State task.
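As a rough illustration of the key/value state idea, the update rule that such a flow would apply on each trigger run might look like the sketch below. It does not use the Kestra State task API; the function name `apply_batch` and the in-memory dictionary are hypothetical, and loading and saving the state between runs is left out. It follows the Kafka log-compaction convention that a record with a null value (a tombstone) deletes its key.

```python
from typing import Any, Dict, Iterable, Optional, Tuple


def apply_batch(
    state: Dict[str, Any],
    batch: Iterable[Tuple[str, Optional[Any]]],
) -> Dict[str, Any]:
    """Return a new state with a batch of (key, value) records applied in order.

    Assumption: a value of None is a tombstone and removes the key,
    mirroring how a compacted topic treats null values.
    """
    updated = dict(state)  # copy so the caller's state is not mutated
    for key, value in batch:
        if value is None:
            updated.pop(key, None)  # tombstone: drop the key if present
        else:
            updated[key] = value  # upsert: the latest value wins
    return updated


# Two hypothetical trigger runs, each delivering a small batch.
state: Dict[str, Any] = {}
state = apply_batch(state, [("a", 1), ("b", 2)])
state = apply_batch(state, [("a", 3), ("b", None)])
```

After the second batch, only key `a` remains, holding its most recent value; key `b` was removed by its tombstone.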

You can also discuss this via https://slack.kestra.io if that's easier. We have Kafka experts on the team, and I'm sure there is a good way to approach it.

brokenjacobs commented 8 months ago

Nice, good idea. We'll find you all on Slack. One thought before I switch over to there though, what if we only care about latest? (auto offset = latest)? Would that make it easier?

brokenjacobs commented 8 months ago

https://slack.kestra.io doesn't resolve. kestra.slack.com seems to be private and I can't join... The UI leads the way, though: kestra-io.slack.com

anna-geller commented 8 months ago

Sorry, it should be https://kestra.io/slack

> One thought before I switch over to there though, what if we only care about latest? (auto offset = latest)? Would that make it easier?

I have no idea, which is why I suggested Slack :) That way I can tag a Kafka expert to answer.

brokenjacobs commented 8 months ago

Closing this; it's not really a Kafka plugin issue.