encode / broadcaster

Broadcast channels for async web apps. 📢
BSD 3-Clause "New" or "Revised" License
1.13k stars 121 forks source link

Kafka backend can't unsubscribe from individual channel #65

Open amacfie opened 2 years ago

amacfie commented 2 years ago

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

self._consumer_channels.remove(channel)
self._consumer.subscribe(topics=self._consumer_channels)
amacfie commented 2 years ago

Also, when we call AIOKafkaConsumer we might want to add auto_offset_reset="latest" based on https://aiokafka.readthedocs.io/en/stable/consumer.html#controlling-the-consumer-s-position. Even then, when we change the topics we're subscribed to it's not obvious to me that we won't miss events or process events multiple times.

tsotnesharvadze commented 1 year ago

Why didn't merge this PR?

tsotnesharvadze commented 1 year ago

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

self._consumer_channels.remove(channel)
self._consumer.subscribe(topics=self._consumer_channels)

When _counsumer_channels will be empty it raise error, so:

    async def unsubscribe(self, channel: str) -> None:
        self._consumer_channels.remove(channel)
        if self._consumer_channels:
            self._consumer.subscribe(topics=list(self._consumer_channels))
        else:
            self._consumer.unsubscribe()