apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module provides a RocketMQ source and sink, allowing a Flink job to read messages from RocketMQ topics or write messages into a topic.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] unbalanced assignment caused by hash collision in ConsistentHashAllocateStrategy.java #113

Status: Open. humkum opened this issue 3 months ago.

humkum commented 3 months ago

In the current consistent-hash allocation strategy, hash collisions lead to an unbalanced assignment. For example, suppose the topic TopicTest is spread across 3 brokers (broker0, broker1, broker2) with 10 queues per broker, i.e. 30 queues in total, and we have 32 task managers. ConsistentHashAllocateStrategy produces the following allocation:

{
  7=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=0],
  8=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=1],
  9=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=2],
  10=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=3],
  11=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=4],
  12=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=5],
  13=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=6],
  14=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=7],
  15=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=8],
  16=MessageQueue [topic=TopicTest, brokerName=broker2, queueId=9],
  17=MessageQueue [topic=TopicTest, brokerName=broker1, queueId=9],
  18=MessageQueue [topic=TopicTest, brokerName=broker0, queueId=9]
}
Allocation details, in the format taskIndex=number of allocated queues:
{7=1, 8=2, 9=3, 10=3, 11=3, 12=3, 13=3, 14=3, 15=3, 16=3, 17=2, 18=1}

We have 32 task managers, but only 12 of them are assigned any queues: all 30 queues are crowded onto those 12, and the remaining 20 task managers sit idle.

We should resolve the hash collision; suggestions are welcome.
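To see how such a tight cluster can arise, here is a minimal Java sketch under an explicit assumption: the owner of a queue is a prime-31 style hash of (broker name, queue id, topic) reduced modulo the parallelism. The hash function and the names `HashCollisionDemo`, `queueHash` and `allocate` are hypothetical and are not taken from ConsistentHashAllocateStrategy. Under that assumption, since 31 ≡ -1 and 961 ≡ 1 (mod 32), and broker0, broker1 and broker2 differ only in their last character, stepping to the next queue id or the next broker moves the bucket by exactly one. The 30 queues therefore land in 12 adjacent buckets with loads 1, 2, 3 (eight times), 2, 1, the same shape as the allocation details above, although the concrete task indices may differ from the real strategy's.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: models a hash-modulo owner computation for the scenario in the
// issue. queueHash is a HYPOTHETICAL stand-in in the style of Java's prime-31
// hashCode(); it is not claimed to be what ConsistentHashAllocateStrategy does.
public class HashCollisionDemo {

    // Hypothetical queue hash; int overflow is intentional and does not affect
    // the result modulo 32, because 32 divides 2^32.
    static int queueHash(String topic, String brokerName, int queueId) {
        int result = 1;
        result = 31 * result + brokerName.hashCode();
        result = 31 * result + queueId;
        result = 31 * result + topic.hashCode();
        return result;
    }

    // Assigns every queue to task floorMod(hash, parallelism) and returns
    // taskIndex -> number of queues assigned to that task (busy tasks only).
    public static Map<Integer, Integer> allocate(String topic, int brokers,
                                                 int queuesPerBroker, int parallelism) {
        Map<Integer, Integer> load = new TreeMap<>();
        for (int b = 0; b < brokers; b++) {
            for (int q = 0; q < queuesPerBroker; q++) {
                int owner = Math.floorMod(queueHash(topic, "broker" + b, q), parallelism);
                load.merge(owner, 1, Integer::sum);
            }
        }
        return load;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> load = allocate("TopicTest", 3, 10, 32);
        // The 30 queues fall on 12 adjacent (mod 32) task indices,
        // with at most 3 queues per task.
        System.out.println("busy tasks = " + load.size() + ", load = " + load);
    }
}
```

A better-mixed hash would spread the queues more widely, but with 30 queues and 32 buckets some collisions are still very likely, so any purely hash-based owner function will usually leave some tasks idle.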

lizhimins commented 3 months ago

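You could modify the existing strategy or add a new one, for example something like org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely in RocketMQ (https://github.com/apache/rocketmq). If you use an avg- or circle-style strategy, you also need to handle the case where the number of partitions decreases; with consistent hashing you basically only need to consider partitions being added.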
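The "avg" and "circle" ideas can be sketched in Java as follows. This is a minimal illustration in the spirit of RocketMQ's averaging strategies, not a copy of AllocateMessageQueueAveragely and not the rocketmq-flink API: queues are plain strings, and the class `BalancedAllocation` and its methods `averagely` and `byCircle` are hypothetical names. Both sort the queue list first so that every reader computes the same assignment, then give each task either the floor or the ceiling of queues / parallelism queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of two deterministic, balanced alternatives to hash-modulo
// assignment. Hypothetical names; queues are identified by plain strings.
public class BalancedAllocation {

    // "avg": split the sorted queue list into contiguous blocks whose sizes
    // differ by at most one; the first (size % parallelism) tasks get one extra.
    public static Map<Integer, List<String>> averagely(List<String> queues, int parallelism) {
        List<String> sorted = new ArrayList<>(queues);
        Collections.sort(sorted);
        Map<Integer, List<String>> result = new TreeMap<>();
        int base = sorted.size() / parallelism;
        int extra = sorted.size() % parallelism;
        int start = 0;
        for (int task = 0; task < parallelism; task++) {
            int size = base + (task < extra ? 1 : 0);
            if (size > 0) {
                result.put(task, new ArrayList<>(sorted.subList(start, start + size)));
            }
            start += size;
        }
        return result;
    }

    // "circle": deal the sorted queues out one at a time, round-robin.
    public static Map<Integer, List<String>> byCircle(List<String> queues, int parallelism) {
        List<String> sorted = new ArrayList<>(queues);
        Collections.sort(sorted);
        Map<Integer, List<String>> result = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            result.computeIfAbsent(i % parallelism, k -> new ArrayList<>()).add(sorted.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = new ArrayList<>();
        for (int b = 0; b < 3; b++) {
            for (int q = 0; q < 10; q++) {
                // Zero-padded ids keep lexicographic order equal to numeric order.
                queues.add(String.format("TopicTest@broker%d@%02d", b, q));
            }
        }
        // 30 queues over 32 tasks: both strategies keep 30 tasks busy, one queue each.
        System.out.println("avg busy tasks = " + averagely(queues, 32).size());
        System.out.println("circle busy tasks = " + byCircle(queues, 32).size());
    }
}
```

The trade-off is the one raised in the comment above: a queue's owner here depends on its position in the sorted list rather than on the queue alone, so when queues are removed, queues that remain can move to different tasks and that reassignment has to be handled explicitly. A hash-based owner depends only on the queue itself, so existing assignments stay put and mainly new partitions need handling.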