faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Add support for RoundRobinAssignor #511

Open alexakra opened 1 year ago

alexakra commented 1 year ago


Expected behavior

I would like to use RoundRobinAssignor to distribute available partitions evenly across all consumers.

Actual behavior

Currently, Faust assigns the same partition numbers to the same consumers across all topics that have the same number of partitions.
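To make the difference concrete, here is a simplified, stdlib-only model of the two outcomes. The function names are hypothetical and this is not Faust's or aiokafka's assignor code; it only assumes that Faust's behavior reduces to "partition p of every such topic goes to the same consumer", as described above.

```python
# Simplified model of two assignment outcomes; NOT Faust's or aiokafka's
# actual assignor code, just an illustration of the difference.
TopicPartition = tuple[str, int]  # (topic name, partition number)
Assignment = dict[str, list[TopicPartition]]  # consumer -> its partitions


def same_number_assign(topics: dict[str, int], consumers: list[str]) -> Assignment:
    """Partition number p of every topic goes to the same consumer."""
    assignment: Assignment = {c: [] for c in consumers}
    for topic, n_partitions in sorted(topics.items()):
        for p in range(n_partitions):
            assignment[consumers[p % len(consumers)]].append((topic, p))
    return assignment


def round_robin_assign(topics: dict[str, int], consumers: list[str]) -> Assignment:
    """Deal out every (topic, partition) pair in order, cycling consumers."""
    assignment: Assignment = {c: [] for c in consumers}
    all_tps = [(t, p) for t, n in sorted(topics.items()) for p in range(n)]
    for i, tp in enumerate(all_tps):
        assignment[consumers[i % len(consumers)]].append(tp)
    return assignment


if __name__ == "__main__":
    topics = {"orders": 3, "payments": 3}
    consumers = ["worker-1", "worker-2"]
    for fn in (same_number_assign, round_robin_assign):
        counts = {c: len(tps) for c, tps in fn(topics, consumers).items()}
        print(fn.__name__, counts)
```

With two topics of three partitions each and two consumers, the first model gives one worker 4 partitions and the other 2, while the round-robin model gives each worker 3.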


wbarnha commented 1 year ago

Faust can support using aiokafka's RoundRobinPartitionAssignor! Faust already uses it by default in some cases, such as https://github.com/faust-streaming/faust/pull/402.

Try setting app.assignor=RoundRobinPartitionAssignor on startup.

alexakra commented 1 year ago

It is failing:

[ERROR] [^-App]: Crashed reason=AttributeError("type object 'RoundRobinPartitionAssignor' has no attribute 'assigned_standbys'")

I think there is no assignor field on App.

tynianovddi commented 1 year ago

same issue here

dada-engineer commented 11 months ago

Have you tried setting PartitionAssignor when initializing the app? https://faust-streaming.github.io/faust/userguide/settings.html#partitionassignor

alexakra commented 10 months ago

> Have you tried setting PartitionAssignor when initializing the app? https://faust-streaming.github.io/faust/userguide/settings.html#partitionassignor

Still failing with app = App(..., PartitionAssignor=RoundRobinPartitionAssignor):

  File "../lib/python3.11/site-packages/mode/services.py", line 807, in _default_start
    await self._actually_start()
  File "../lib/python3.11/site-packages/mode/services.py", line 824, in _actually_start
    await self.on_start()
  File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 478, in on_start
    self._consumer = self._create_consumer(loop=self.thread_loop)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 497, in _create_consumer
    return self._create_worker_consumer(transport)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 507, in _create_worker_consumer
    self.app.assignor
  File "../lib/python3.11/site-packages/mode/utils/objects.py", line 659, in __get__
    value = obj.__dict__[self.name] = self.__get(obj)
                                      ^^^^^^^^^^^^^^^
  File "../lib/python3.11/site-packages/faust/app/base.py", line 2069, in assignor
    assignor = self.conf.PartitionAssignor( # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: RoundRobinPartitionAssignor() takes no arguments
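The final TypeError is what Python raises whenever a class that defines neither __init__ nor __new__ is called with arguments. A minimal reproduction with hypothetical stand-in classes, assuming (from the traceback) that Faust instantiates the configured PartitionAssignor class with the app plus at least one more argument; the exact arguments are cut off in the traceback, so the ones below are an assumption:

```python
# Stand-in classes; the names are hypothetical, chosen only to mirror the
# situation in the traceback.


class NoInitAssignor:
    """Used via the class itself (as the traceback suggests for aiokafka's
    RoundRobinPartitionAssignor); it defines neither __init__ nor __new__,
    so calling it with any arguments raises TypeError."""

    name = "roundrobin"


class AppAwareAssignor:
    """Defines an __init__ that accepts what build_assignor passes."""

    def __init__(self, app: object, replicas: int = 0) -> None:
        self.app = app
        self.replicas = replicas


def build_assignor(assignor_cls: type, app: object, replicas: int) -> object:
    # Assumption: mirrors faust/app/base.py, which instantiates the class
    # configured as PartitionAssignor with the app and extra arguments.
    return assignor_cls(app, replicas=replicas)


def try_build(assignor_cls: type) -> str:
    """Return "ok" on success, or the TypeError text on failure."""
    try:
        build_assignor(assignor_cls, app=object(), replicas=0)
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


if __name__ == "__main__":
    print(try_build(NoInitAssignor))
    print(try_build(AppAwareAssignor))
```

try_build(NoInitAssignor) returns a message ending in "takes no arguments", matching the last line of the traceback, while try_build(AppAwareAssignor) returns "ok".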

dada-engineer commented 10 months ago

This is odd, because Faust itself uses this assignor when table standby replicas are set to zero. If you do not need standby replicas, can you try setting TABLE_STANDBY_REPLICAS to zero?
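One plausible reading of why the workaround helps: the traceback shows _create_worker_consumer reading self.app.assignor, and the comments above say Faust falls back to RoundRobinPartitionAssignor when no standby replicas are configured. The sketch below models that choice with hypothetical stand-ins; it assumes the decision depends only on whether the standby count is above zero, which has not been checked against Faust's source here.

```python
# Simplified model with hypothetical stand-ins; not Faust's actual code.


class FaustStyleAssignor:
    """Stand-in for the class in the PartitionAssignor setting."""

    def __init__(self, app: object, replicas: int) -> None:
        self.app = app
        self.replicas = replicas


class RoundRobinStandIn:
    """Stand-in for aiokafka's RoundRobinPartitionAssignor."""

    name = "roundrobin"


def pick_assignor(app: object, table_standby_replicas: int) -> object:
    """Return what the consumer would be configured with (assumed logic)."""
    if table_standby_replicas > 0:
        # Standbys need the table-aware assignor, which is instantiated from
        # the PartitionAssignor setting; this is where a class whose
        # constructor takes no arguments would raise TypeError.
        return FaustStyleAssignor(app, replicas=table_standby_replicas)
    # No standbys: the aiokafka-style class is returned as a class and is
    # never instantiated with Faust's arguments.
    return RoundRobinStandIn
```

Under this model, the workaround succeeds because the round-robin class never goes through the instantiation step that fails when it is set as PartitionAssignor.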

If not, you would need to build your own class that implements Faust's PartitionAssignorT interface.
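As a rough illustration of what such a class has to look like at the constructor level, here is a hypothetical adapter that accepts Faust-style arguments and delegates to a round-robin routine. It is not a complete PartitionAssignorT: the real interface has more members (the first error in this thread, for instance, shows Faust looking up assigned_standbys), and the method name assign below is an assumption for illustration only.

```python
# Hypothetical adapter: NOT Faust's PartitionAssignorT. It only shows the
# constructor shape Faust needs plus a delegated round-robin routine; a real
# implementation must also provide the rest of that interface.
TopicPartition = tuple[str, int]  # (topic name, partition number)


def round_robin(
    members: list[str], tps: list[TopicPartition]
) -> dict[str, list[TopicPartition]]:
    """Sort members and partitions, then deal partitions out in turn."""
    ordered = sorted(members)
    assignment: dict[str, list[TopicPartition]] = {m: [] for m in ordered}
    for i, tp in enumerate(sorted(tps)):
        assignment[ordered[i % len(ordered)]].append(tp)
    return assignment


class RoundRobinAdapter:
    def __init__(self, app: object, replicas: int = 0) -> None:
        # Accept the arguments Faust passes when instantiating the class
        # configured as PartitionAssignor (exact arguments are an assumption).
        self.app = app
        self.replicas = replicas

    def assign(
        self, members: list[str], tps: list[TopicPartition]
    ) -> dict[str, list[TopicPartition]]:
        # Hypothetical method name; delegates to the round-robin routine.
        return round_robin(members, tps)
```

Sorting both the members and the partitions keeps the result deterministic, so every worker computing the assignment from the same inputs gets the same answer.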

alexakra commented 10 months ago

I did, and it works. But I opened this issue to get a proper fix rather than a workaround, because I cannot rely on the workaround.

dada-engineer commented 10 months ago

I do not see an issue. Faust says that if you want to supply your own PartitionAssignor, you need to provide a PartitionAssignorT-compatible class. RoundRobinPartitionAssignor clearly isn't one, as it has no __init__ that accepts the arguments Faust passes.

What's the issue, in summary, then? 🤔

alexakra commented 10 months ago

My thoughts:

  1. Faust internally uses RoundRobinPartitionAssignor, which is not a PartitionAssignorT-compatible class. Why is that?
  2. This is a common use case: many users would like to use the standard implementation. I don't need a custom one, so there should be no need for each of us to implement a custom round-robin assignor.