With all existing Pulsar load balancing algorithms, it is difficult to balance the load of Pulsar cluster nodes. It often happens that some nodes are highly loaded while others are idle, and the CPU of each Broker is very different.
There are three reasons why the existing way will make the cluster unbalanced:
The load managed by each Bundle is not even. Even if the number of partitions managed by each bundle is the same, there is no guarantee that the sum of the loads of these partitions will be the same.
Doesn't shed loads very well. The existing default policy ThresholdShedder has a relatively high usage threshold, and various traffic thresholds need to be set. Many clusters with high TPS and small message bodies may have high CPU but low traffic; And for many small-scale clusters, the threshold needs to be modified according to the actual business.
The removed Bundle cannot be well distributed to other Brokers. The load information of each Broker will be reported at regular intervals, so the judgment of the Leader Broker when allocating Bundles cannot be guaranteed to be completely correct. Secondly, if there are a large number of Bundles to be redistributed, the Leader may make the low-load Broker a new high-load node when the load information is not up-to-date.
Scope
In this PIP, we are trying to solve the first problem.
Goal
Make the partition assignment strategy pluggable, users can customize the assignment strategy according to their own special scenarios.
API Changes
When lookup, partitions will be assigned to bundle:
Consistent hashing is now used to assign partitions to bundle in NamespaceBundles#findBundle.
We should add a configuration item topicBundleAssignmentStrategy, so that different partition assignment strategy can be dynamically configured.
The existing strategy will be used as the default (topicBundleAssignmentStrategy=ConsistentHashingTopicBundleAssigner.class)
Implementation
Add interface TopicBundleAssignmentStrategy
public interface TopicBundleAssignmentStrategy {
NamespaceBundle findBundle(TopicName topicName);
}
Add a factory to create implementations
public class TopicBundleAssignmentFactory {
static TopicBundleAssignmentStrategy create(PulsarAdmin adminClient, ServiceConfigration config, NamespaceService namespaceService) {
//....
};
}
NamespaceService already has bundleOwnershipListeners, we need to add a bundleSplitListener
a. Trigger listeners when the bundle is splitted
b. A strategy that needs to perceive changes in the Bundle will register the corresponding listener
c. When the NamespaceBundles is initialized, the implementation class will be created through the factory class
Implementation for demonstration:
Goal
Make the load of each bundle more balanced, and then the load balancing algorithm can make it easier for the broker to reach a balanced state and avoid hot bundles.
Let load of all bundle change together. The load of Topic will change with the change of business cycle. We want the load of all bundles to change together, so that there will be no frequent unload/split bundle.
Implementation
The client sends a message to a multi-partition Topic, which uses round robin routing by default.
Therefore, we believe that the load of partitions of the same topic is balanced.
We assign partitions of the same topic to bundle by round-robin.
In this way, the difference in the number of partitions carried by the bundle will not exceed 1.
Since we consider the load of each partition of the same topic to be balanced, the load carried by each bundle is also balanced.
Operation steps:
Partition 0 finds a starting bundle through the consistent hash algorithm, assuming it is bundle0, we start from this bundle
By round-robin, assign partition 1 to the next bundle1, assign partition 2 to the next bundle2, and so on
If the number of partitions is less than the number of bundles, will some bundles have a high load?
Since the starting bundle is determined by consistent hashing, the starting point of each topic is different, which can prevent the earlier bundles from becoming hotspots.
When the number of bundles changes, will all partitions be reassigned?
Only when the number of bundles change, all partitions under the same namespace will be reassigned.
Changing the number of broker or partitions, will not trigger reassignment.
We only split when there is a hot bundle.
The current partition assign method makes the load of each bundle approximately balanced, so the bundle split will not be triggered unless it is artificially split.
Of course, we have also tested the time-consuming of assigning all partitions in the entire namespace in the worst case.
Test scenario: 6 * 4C32GB Brokers, the CPU water level of each broker is at 60%, and 50,000 partitions under the namespace are assigned at the same time. It takes about 30s.
Test report
We tested several scenarios, each using the three algorithms we mentioned above.
And every node looks well balanced.
Machine: 4C32G * 6
Test scenario: 200 partitions, after the cluster is stable, restart one of them
Even if the node restarted, the load difference between the nodes does not exceed 10%
Test scenario: Restart multiple nodes in a loop, and observe the final cluster load
Even if the nodes keep restarting, the load difference between the nodes does not exceed 10%
Test scenario: Add a new Broker
Finally, the load difference between the nodes does not exceed 10%
Test scenario: single-partition topic, even unloading the bundle will make the receiving broker to be a new hotspot, observe whether the algorithm will unload frequently
Motivation
With all existing Pulsar load balancing algorithms, it is difficult to balance the load of Pulsar cluster nodes. It often happens that some nodes are highly loaded while others are idle, and the CPU of each Broker is very different. There are three reasons why the existing way will make the cluster unbalanced:
Scope In this PIP, we are trying to solve the first problem.
Goal
API Changes
When lookup, partitions will be assigned to bundle:
Lookup -> NamespaceService#getBrokerServiceUrlAsync -> NamespaceService#getBundleAsync -> NamespaceBundles#findBundle
Consistent hashing is now used to assign partitions to bundle in NamespaceBundles#findBundle.
We should add a configuration item
topicBundleAssignmentStrategy
, so that different partition assignment strategy can be dynamically configured.The existing strategy will be used as the default (
topicBundleAssignmentStrategy=ConsistentHashingTopicBundleAssigner.class
)Implementation
TopicBundleAssignmentStrategy
Add a factory to create implementations
bundleOwnershipListeners
, we need to add abundleSplitListener
a. Trigger listeners when the bundle is splitted b. A strategy that needs to perceive changes in the Bundle will register the corresponding listener c. When the
NamespaceBundles
is initialized, the implementation class will be created through the factory classImplementation for demonstration:
Goal
Implementation
The client sends a message to a multi-partition Topic, which uses round robin routing by default. Therefore, we believe that the load of partitions of the same topic is balanced. We assign partitions of the same topic to bundle by round-robin. In this way, the difference in the number of partitions carried by the bundle will not exceed 1. Since we consider the load of each partition of the same topic to be balanced, the load carried by each bundle is also balanced.
Operation steps:
Partition 0 finds a starting bundle through the consistent hash algorithm, assuming it is bundle0, we start from this bundle By round-robin, assign partition 1 to the next bundle1, assign partition 2 to the next bundle2, and so on If the number of partitions is less than the number of bundles, will some bundles have a high load? Since the starting bundle is determined by consistent hashing, the starting point of each topic is different, which can prevent the earlier bundles from becoming hotspots.
When the number of bundles changes, will all partitions be reassigned? Only when the number of bundles change, all partitions under the same namespace will be reassigned. Changing the number of broker or partitions, will not trigger reassignment. We only split when there is a hot bundle. The current partition assign method makes the load of each bundle approximately balanced, so the bundle split will not be triggered unless it is artificially split. Of course, we have also tested the time-consuming of assigning all partitions in the entire namespace in the worst case.
Test scenario: 6 * 4C32GB Brokers, the CPU water level of each broker is at 60%, and 50,000 partitions under the namespace are assigned at the same time. It takes about 30s.
Test report
We tested several scenarios, each using the three algorithms we mentioned above. And every node looks well balanced. Machine: 4C32G * 6 Test scenario: 200 partitions, after the cluster is stable, restart one of them Even if the node restarted, the load difference between the nodes does not exceed 10%
Test scenario: Restart multiple nodes in a loop, and observe the final cluster load Even if the nodes keep restarting, the load difference between the nodes does not exceed 10%
Test scenario: Add a new Broker Finally, the load difference between the nodes does not exceed 10%
Test scenario: single-partition topic, even unloading the bundle will make the receiving broker to be a new hotspot, observe whether the algorithm will unload frequently