Open ankon opened 5 years ago
I imagine the changes required for this would be pretty similar to #373, as both basically require the consumer to be able to deal with changing assignments (whether manually assigned or received through subscription). Good thing to keep in mind to prevent a duplication of effort :)
I could probably share the code we use to handle this, but it's likely easier to just describe it :)
We have a small wrapper around per-topic consumers sharing a single KafkaJS consumer reference. Changes to subscriptions are handled by enqueuing a request for that ('subscribe' + topic/'unsubscribe' + topic), and then accumulating these requests for 0.5s (basically, shorter than the cost of re-joining a consumer, and in our case of discovering topics, long enough to catch them all). When processing the requests we unconditionally stop the old consumer, use `subscribe` to add the new subscriptions, and then start the consumer again.
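As a rough illustration of the approach described above (not the actual wrapper code), the batching could look something like the sketch below. Everything in it is an assumption made for the example: `ConsumerLike` is a local interface that only mirrors the shape of the consumer's `stop`/`subscribe`/`run` calls, `SubscriptionBatcher` and `idle()` are hypothetical names, and "unsubscribe" is modelled by simply dropping the per-topic handler, so the underlying consumer would still fetch messages for that topic.

```typescript
type Handler = (message: unknown) => Promise<void>;
type Payload = { topic: string; message: unknown };

// Local stand-in for the consumer; an assumption, not the library's own type.
interface ConsumerLike {
  stop(): Promise<void>;
  subscribe(args: { topic: string }): Promise<void>;
  run(args: { eachMessage: (payload: Payload) => Promise<void> }): Promise<void>;
}

type SubscriptionRequest =
  | { kind: 'subscribe'; topic: string; handler: Handler }
  | { kind: 'unsubscribe'; topic: string };

class SubscriptionBatcher {
  private consumer: ConsumerLike;
  private delayMs: number;
  private pending: SubscriptionRequest[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private handlers = new Map<string, Handler>();
  private subscribed = new Set<string>();
  private chain: Promise<void> = Promise.resolve();

  constructor(consumer: ConsumerLike, delayMs: number = 500) {
    this.consumer = consumer;
    this.delayMs = delayMs;
  }

  subscribe(topic: string, handler: Handler): void {
    this.enqueue({ kind: 'subscribe', topic, handler });
  }

  unsubscribe(topic: string): void {
    this.enqueue({ kind: 'unsubscribe', topic });
  }

  // Resolves once the most recently scheduled restart has finished.
  idle(): Promise<void> {
    return this.chain;
  }

  private enqueue(request: SubscriptionRequest): void {
    this.pending.push(request);
    if (this.timer !== undefined) return; // a flush is already scheduled
    this.timer = setTimeout(() => {
      this.timer = undefined;
      // Serialise restarts; an earlier failure does not block later batches.
      this.chain = this.chain.catch(() => undefined).then(() => this.flush());
    }, this.delayMs);
  }

  private async flush(): Promise<void> {
    const batch = this.pending;
    this.pending = [];
    for (const request of batch) {
      if (request.kind === 'subscribe') {
        this.handlers.set(request.topic, request.handler);
      } else {
        this.handlers.delete(request.topic);
      }
    }
    // Unconditionally stop, add subscriptions not seen before, then restart.
    await this.consumer.stop();
    for (const topic of Array.from(this.handlers.keys())) {
      if (!this.subscribed.has(topic)) {
        await this.consumer.subscribe({ topic });
        this.subscribed.add(topic);
      }
    }
    await this.consumer.run({
      eachMessage: async ({ topic, message }) => {
        const handler = this.handlers.get(topic);
        if (handler) await handler(message); // no handler: topic was dropped
      },
    });
  }
}
```

With a real consumer passed in, callers would only ever use `batcher.subscribe(topic, handler)` and `batcher.unsubscribe(topic)`; the restart cost is paid once per 0.5s window rather than once per request.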
@ankon just for my understanding, would it be fair to summarise your usecase as the pooling of consumers?
@JaapRood yes, that's a fair description.
@JaapRood I am interested in this as well. We have a similar application in which topics are discovered and left over time.
This involves a large number of frequently changing channels, so creating a consumer for each is a bit of a concern.
The following would be great to see:
- `.subscribe()` to be called on a running consumer.
- `.unsubscribe()` method to allow removing channels from a running consumer.

This feels similar to the `kafka-node` consumer `.addTopics()`/`.removeTopics()` methods. Unfortunately, after a lot of trial and error, I wasn't able to get `kafka-node` consumers to work at all, which is what led me to investigate this library.
This library works great and has a very ergonomic API by comparison, but this feels like a significant feature gap for our use case.
Looking at #373, it looks like the two initiatives have some overlap in the work involved, but since our topics have only one partition, it does not solve our problem on its own. Has there been any progress on that recently? I would not consider myself an expert with Kafka, but if there is anything I can lend a hand with then let me know and I'll see what I can do.
@lukaswelinder unfortunately, I haven't seen any movement on this so far. I've just started a new job that hasn't had me touching Kafka (yet), so it's tricky to find the time. Happy to help out anyone who gives it a go, though!
We'd love this feature to be available. Very recently we had to make a workaround to obtain this result (very dirty workaround).
Hey guys! Any news on this matter? This would be a must-have for my project... anyone with a path to follow?
What's happening with this? How can we stop consuming messages from a topic once it's already being consumed?
hi, is there an update on this topic?
Based on a short slack exchange (https://kafkajs.slack.com/archives/CF6RFPF6K/p1565968461032400), it seems desirable to change the subscriptions of a running consumer.
In our case we want to do that because: a) for historic reasons we share the consumer group between otherwise independent topics that are only discovered over time; b) it should require fewer resources (connections, buffers, ...) overall.