linkedin / cruise-control

Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.
https://github.com/linkedin/cruise-control/tags
BSD 2-Clause "Simplified" License
2.74k stars 585 forks source link

A potential bug in taking action to optimize goal for kafka assignment. #2128

Open yufeiyan1220 opened 7 months ago

yufeiyan1220 commented 7 months ago

Issue

As for the method maybeApplyBalancingAction in AbstractGoal, some brokers in candidateBrokers might be chosen to update replica or leadership. However, when we look at the parameter candidateBrokers, we find it coming from some calls from actual goals like TopicReplicaDistributionGoal, and the data structure of candidateBrokers are some sorted collection, based on the size of replicas or some other stuff. And then, the problem arises when we trying to modify the sorted key of elements in PriorityQueue(heap) or TreeSet(red-black tree) the data structure might be broken because of it.

I found some lines of code trying to remove and reinsert the broker to keep the order, but it happened after the broker changed. I found that the candidateBrokers.remove(b) return false in the following code.

TopicReplicaDistributionGoal {
...
  // Remove and reinsert the broker so the order is correct.
  candidateBrokers.remove(b);
  if (b.numReplicasOfTopicInBroker(topic) < _balanceUpperLimitByTopic.get(topic) || _fixOfflineReplicasOnly) {
     candidateBrokers.add(b);
  }
}

Fix

I think we could do the remove broker before action and reinsert it after it.(It seems not acceptable for immutable collections)

if (acceptance == ACCEPT) {
  // remove candidate before action
  candidateBrokers.remove(broker);
  if (action == ActionType.LEADERSHIP_MOVEMENT) {
    clusterModel.relocateLeadership(replica.topicPartition(), replica.broker().id(), broker.id());
  } else if (action == ActionType.INTER_BROKER_REPLICA_MOVEMENT) {
    clusterModel.relocateReplica(replica.topicPartition(), replica.broker().id(), broker.id());
  }
  candidateBrokers.add(broker);
  return broker;
}

Or we could sort candidate brokers as long as an action is made.