linkedin / cruise-control

Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.
https://github.com/linkedin/cruise-control/tags
BSD 2-Clause "Simplified" License

Feature Request : Enable Cruise Control to work with a Concept of Broker Groups #1782

Closed mohitpali closed 2 years ago

mohitpali commented 2 years ago

Background

cruise-control works with a single Kafka cluster: it builds optimization proposals for that cluster and executes them. We have a Kafka cluster that is split into multiple groups of brokers. This is done to provide isolation and reduce the blast radius. All replicas of a topic are placed on a specific broker group. This way if a topic runs hot, it would affect only the health of the brokers within a Broker Group.

Problem

Currently, cruise-control has no way to confine a leader and replica movement plan to a Broker Group. We would like to make cruise-control aware of Broker Groups so that every proposed optimization moves leaders or replicas only within a single Broker Group. In short, each optimization should stay within the boundary of a Broker Group.

Proposed Solutions

1. cruise-control can read configurations called broker.group.id and broker.group.broker.ids in LoadMonitorConfig. MetadataClient.java would be modified to read the broker group information, and its cluster() method would be modified to return a filtered view of the Cluster limited to the brokerGroup. The view would include only the partitions whose replicas all reside within the brokerGroup boundary. Because everything downstream works from this reduced Cluster view, proposal generation, cluster model generation, and execution are limited to the boundary of the Broker Group.

This solution would also prefix the sample store and cruise-control topics with broker.group.id.

2. cruise-control can add a new Goal called BrokerGroupAwareGoal. If this Goal is prioritized over the other goals, it guarantees that each optimization is contained within a Broker Group boundary. This guarantee should hold even after optimizations are applied.
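The filtering rule in the first proposal (keep only the partitions whose replicas all sit inside one broker group) can be sketched in isolation. This is a minimal illustration, not Cruise Control code: `BrokerGroupFilter`, `partitionsWithin`, and the plain map from partition name to replica broker IDs are hypothetical stand-ins for the Cluster metadata that MetadataClient would expose.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Cruise Control.
final class BrokerGroupFilter {
  private BrokerGroupFilter() { }

  /**
   * Returns only the partitions whose replicas all reside on brokers of the given group.
   * A partition that straddles two groups is dropped from the view.
   */
  static Map<String, List<Integer>> partitionsWithin(Map<String, List<Integer>> replicasByPartition,
                                                     Set<Integer> groupBrokerIds) {
    Map<String, List<Integer>> filtered = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> entry : replicasByPartition.entrySet()) {
      if (groupBrokerIds.containsAll(entry.getValue())) {
        filtered.put(entry.getKey(), entry.getValue());
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> replicas = Map.of(
        "orders-0", List.of(1, 2, 3),
        "orders-1", List.of(2, 3, 8),   // straddles two groups, so it is excluded
        "clicks-0", List.of(8, 9, 10));
    // prints {orders-0=[1, 2, 3]}
    System.out.println(partitionsWithin(replicas, Set.of(1, 2, 3, 4, 5, 6)));
  }
}
```

Note that a partition spanning two groups disappears from every group's view under this rule, so something else would have to repair it; that is one practical difference from the goal-based second proposal.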

Add-on

mohitpali commented 2 years ago

Attaching a rough snippet for BrokerSetAwareGoal to be added for review -

package com.linkedin.kafka.cruisecontrol.analyzer.goals;
// Imports added for completeness. The com.linkedin.kafka.cruisecontrol.* paths are assumed from the
// Cruise Control source layout and should be verified against the target version.
import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionRecommendation;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.ACCEPT;
import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.BROKER_REJECT;
import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.REPLICA_REJECT;
import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING;

/**
 * HARD GOAL: Generate proposals to provide brokerSet-aware replica distribution.
 */
public class BrokerSetAwareGoal extends AbstractGoal {
  private static final Logger LOG = LoggerFactory.getLogger(BrokerSetAwareGoal.class);
  // This is used to identify brokers not excluded for replica moves.
  protected Set<Integer> _brokersAllowedReplicaMove;
  private Map<String, List<Integer>> _brokerIdsByBrokerSet;
  private Map<String, Set<Broker>> _brokersByBrokerSet;
  private Map<Broker, String> _brokerToBrokerSetMap;

  /**
   * Constructor for Broker Set Aware Goal.
   */
  public BrokerSetAwareGoal() {
  }

  /**
   * Package private for unit test.
   */
  BrokerSetAwareGoal(BalancingConstraint constraint, Map<String, List<Integer>> brokerIdsByBrokerSet) {
    _balancingConstraint = constraint;
    _brokerIdsByBrokerSet = brokerIdsByBrokerSet;
  }

  @Override
  public String name() {
    return BrokerSetAwareGoal.class.getSimpleName();
  }

  @Override
  public boolean isHardGoal() {
    return true;
  }

  /**
   * This is a hard goal; hence, the proposals are not limited to dead broker replicas in case of self-healing.
   *
   * @param clusterModel        The state of the cluster.
   * @param optimizationOptions Options to take into account during optimization.
   */
  @Override
  protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
          throws OptimizationFailureException {
    Set<String> excludedTopics = optimizationOptions.excludedTopics();
    Set<String> topicsToRebalance = GoalUtils.topicsToRebalance(clusterModel, excludedTopics);
    if (topicsToRebalance.isEmpty()) {
      LOG.warn("All topics are excluded from {}.", name());
    }

    _brokersAllowedReplicaMove = GoalUtils.aliveBrokersNotExcludedForReplicaMove(clusterModel, optimizationOptions);
    if (_brokersAllowedReplicaMove.isEmpty()) {
      // Handle the case when all alive brokers are excluded from replica moves.
      ProvisionRecommendation recommendation = new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED)
              .numBrokers(clusterModel.maxReplicationFactor()).build();
      throw new OptimizationFailureException(String.format("[%s] All alive brokers are excluded from replica moves.", name()), recommendation);
    }

    // Log a warning if all replicas are excluded.
    if (clusterModel.topics().equals(optimizationOptions.excludedTopics())) {
      LOG.warn("All replicas are excluded from {}.", name());
    }

    // TODO Remove Hard Coded
    _brokerIdsByBrokerSet = new HashMap<>();
    _brokerIdsByBrokerSet.put("red", Arrays.asList(1, 2, 3, 4, 5, 6));
    _brokerIdsByBrokerSet.put("yellow", Arrays.asList(8, 9, 10, 12, 13, 15));
    _brokerIdsByBrokerSet.put("green", Arrays.asList(26, 20, 30, 24, 29, 28));
    _brokerIdsByBrokerSet.put("blue", Arrays.asList(19, 17, 14, 7, 27, 16));
    _brokerIdsByBrokerSet.put("purple", Arrays.asList(11, 18, 23, 21, 22, 25));

    // Group brokers by brokerSet, skipping configured broker IDs that are absent from the cluster model.
    _brokersByBrokerSet = _brokerIdsByBrokerSet.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().stream()
                    .map(id -> clusterModel.broker(id)).filter(b -> b != null).collect(Collectors.toSet())));

    _brokerToBrokerSetMap = new HashMap<>();

    _brokersByBrokerSet.forEach(
            (brokerSet, brokers) -> {
              brokers.forEach(broker -> _brokerToBrokerSetMap.put(broker, brokerSet));
            }
    );
  }

  /**
   * Update goal state.
   * Sanity check: After completion of balancing / self-healing, confirm that replicas of each topic reside fully in a
   * broker set.
   *
   * @param clusterModel        The state of the cluster.
   * @param optimizationOptions Options to take into account during optimization.
   */
  @Override
  protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
          throws OptimizationFailureException {
    // Sanity check to confirm that the final distribution is broker set aware.
    ensureBrokerSetAware(clusterModel, optimizationOptions);
    if (_provisionResponse.status() != ProvisionStatus.OVER_PROVISIONED) {
      _provisionResponse = new ProvisionResponse(ProvisionStatus.RIGHT_SIZED);
    }
    finish();
  }

  private void ensureBrokerSetAware(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
          throws OptimizationFailureException {
    // Sanity check to confirm that the final distribution is brokerSet aware.
    Set<String> excludedTopics = optimizationOptions.excludedTopics();
    for (Map.Entry<String, List<Partition>> partitionsByTopic : clusterModel.getPartitionsByTopic().entrySet()) {
      if (!excludedTopics.contains(partitionsByTopic.getKey())) {
        List<Partition> partitions = partitionsByTopic.getValue();
        for (Partition partition : partitions) {
          if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(partition.partitionBrokers()))) {
            throw new OptimizationFailureException(String.format("[%s] Partition %s is not brokerSet-aware. brokers (%s).",
                    name(), partition, partition.partitionBrokers()));
          }
        }
      }
    }
  }

  /**
   * brokerSet-awareness violations can be resolved with replica movements.
   *
   * @param broker              Broker to be balanced.
   * @param clusterModel        The state of the cluster.
   * @param optimizedGoals      Optimized goals.
   * @param optimizationOptions Options to take into account during optimization.
   */
  @Override
  protected void rebalanceForBroker(Broker broker,
                                    ClusterModel clusterModel,
                                    Set<Goal> optimizedGoals,
                                    OptimizationOptions optimizationOptions) throws OptimizationFailureException {
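    // Iterate over a copy: applying a balancing action relocates the replica and may modify broker.replicas().
    for (Replica replica : new ArrayList<>(broker.replicas())) {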
      String originalBrokerBrokerSetId = _brokerToBrokerSetMap.get(replica.originalBroker());
      String currentBrokerBrokerSetId = _brokerToBrokerSetMap.get(broker);
      if (originalBrokerBrokerSetId.equals(currentBrokerBrokerSetId)) {
        continue;
      }
      // The brokerSet-awareness condition is violated: move the replica to an eligible broker.
      Set<Broker> eligibleBrokers = _brokersByBrokerSet.get(originalBrokerBrokerSetId);
      if (maybeApplyBalancingAction(clusterModel, replica, eligibleBrokers,
              ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizedGoals, optimizationOptions) == null) {
        throw new OptimizationFailureException(String.format("[%s] Cannot move replica %s to %s on brokerSet %s", name(), replica, eligibleBrokers,
                originalBrokerBrokerSetId));
      }
    }
  }

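  /**
   * Check whether moving the source replica to the destination broker would cross a brokerSet boundary.
   *
   * @param clusterModel      The state of the cluster.
   * @param sourceReplica     Replica to be moved.
   * @param destinationBroker Broker that would receive the replica.
   * @return {@code true} if the destination broker is in a different brokerSet than the replica's current broker.
   */
  protected boolean doesReplicaMoveViolateActionAcceptance(ClusterModel clusterModel, Replica sourceReplica, Broker destinationBroker) {
    // Destination broker cannot be in any other brokerSet.
    String sourceBrokerSet = _brokerToBrokerSetMap.get(sourceReplica.broker());
    String destinationBrokerSet = _brokerToBrokerSetMap.get(destinationBroker);

    return !sourceBrokerSet.equals(destinationBrokerSet);
  }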

  /**
   * Check whether the given action is acceptable by this goal. The following actions are acceptable:
   * <ul>
   *   <li>All leadership moves</li>
   *   <li>Replica moves that do not violate {@link #doesReplicaMoveViolateActionAcceptance(ClusterModel, Replica, Broker)}</li>
   *   <li>Swaps that do not violate {@link #doesReplicaMoveViolateActionAcceptance(ClusterModel, Replica, Broker)}
   *   in both direction</li>
   * </ul>
   *
   * @param action       Action to be checked for acceptance.
   * @param clusterModel The state of the cluster.
   * @return {@link ActionAcceptance#ACCEPT} if the action is acceptable by this goal,
   * {@link ActionAcceptance#BROKER_REJECT} if the action is rejected due to violating brokerSet awareness in the destination
   * broker after moving source replica to destination broker, {@link ActionAcceptance#REPLICA_REJECT} otherwise.
   */
  @Override
  public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
    switch (action.balancingAction()) {
      case LEADERSHIP_MOVEMENT:
        return ACCEPT;
      case INTER_BROKER_REPLICA_MOVEMENT:
      case INTER_BROKER_REPLICA_SWAP:
        if (doesReplicaMoveViolateActionAcceptance(clusterModel,
                clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition()),
                clusterModel.broker(action.destinationBrokerId()))) {
          return BROKER_REJECT;
        }

        if (action.balancingAction() == ActionType.INTER_BROKER_REPLICA_SWAP
                && doesReplicaMoveViolateActionAcceptance(clusterModel,
                clusterModel.broker(action.destinationBrokerId()).replica(action.destinationTopicPartition()),
                clusterModel.broker(action.sourceBrokerId()))) {
          return REPLICA_REJECT;
        }
        return ACCEPT;
      default:
        throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
    }
  }

  @Override
  public ClusterModelStatsComparator clusterModelStatsComparator() {
    return new GoalUtils.HardGoalStatsComparator();
  }

  @Override
  public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
    return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING, 0.0, true);
  }

  @Override
  protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
    return true;
  }
}
efeg commented 2 years ago

@mohitpali Thanks for creating this proposal and providing followup details / questions to explore. Overall, I am on the same page with adding this feature to CC.

All replicas of a topic are placed on a specific broker group. This way if a topic runs hot, it would affect only the health of the brokers within a Broker Group.

Just for completeness: this approach will not fully isolate broker groups from the rest of the cluster. In particular, control requests (e.g. LeaderAndIsr, Metadata, StopReplica) will still be served by a centralized controller, which remains common to all broker groups.

mohitpali commented 2 years ago

Summary

Problem

Currently, cruise-control has no way to confine a leader and replica movement plan to a Broker Group. We would like to make cruise-control aware of Broker Groups so that every proposed optimization moves leaders or replicas only within a single Broker Group. In short, each optimization should stay within the boundary of a Broker Group.

Proposals

We have explored 2 options to solve this problem:

Option 1 - Have separate Cruise control instances per broker group.

Modify the MetadataClient.cluster() method to provide a filtered view of the Cluster limited to a single brokerGroup. Proposal generation, cluster model generation, and execution are then limited to the boundary of that Broker Group. This means running one cruise-control instance per Broker Group.

Pros :

  1. Simple approach with minimum code changes.

Cons :

  1. Running a cruise-control instance per Broker Group is overkill.
  2. It increases the operational load, since every Broker Group needs its own cruise-control instance.

Option 2 - Add broker group aware capacity goals and distribution goals.

We will add a new BrokerSetAwareGoal (hard goal) and BrokerSetAware(CPU/NW/Disk/Replica)DistributionGoal (soft goals). We would also add a BrokerSetInformationCache interface, with a file-based default implementation, to fetch BrokerSet information.

Pros :

  1. We do not have to run a cruise-control per BrokerSet.

Cons :

  1. More effort is required than for Option 1.
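To make the BrokerSetInformationCache idea in Option 2 concrete, here is a minimal sketch of an interface with a file-based default implementation. Only the interface name comes from the proposal; the method signature, the `FileBrokerSetInformationCache` class, and the file format (one `<brokerSet>=<id>,<id>,...` entry per line) are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Interface name taken from the proposal; the method signature is an assumption.
interface BrokerSetInformationCache {
  Map<String, Set<Integer>> brokerIdsByBrokerSet();
}

// Hypothetical file-backed implementation. Assumed format: "<brokerSet>=<id>,<id>,..." per line.
final class FileBrokerSetInformationCache implements BrokerSetInformationCache {
  private final Path _file;

  FileBrokerSetInformationCache(Path file) {
    _file = file;
  }

  @Override
  public Map<String, Set<Integer>> brokerIdsByBrokerSet() {
    try {
      return parse(Files.readString(_file));
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to read broker set file " + _file, e);
    }
  }

  /** Parses the assumed file format into brokerSet -> sorted broker IDs. */
  static Map<String, Set<Integer>> parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, Set<Integer>> result = new TreeMap<>();
    for (String brokerSet : props.stringPropertyNames()) {
      Set<Integer> ids = new TreeSet<>();
      for (String id : props.getProperty(brokerSet).split(",")) {
        if (!id.trim().isEmpty()) {
          ids.add(Integer.parseInt(id.trim()));
        }
      }
      result.put(brokerSet, ids);
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("broker-sets", ".properties");
    Files.writeString(file, "red=1,2,3\nyellow=8, 9, 10\n");
    // prints {red=[1, 2, 3], yellow=[8, 9, 10]}
    System.out.println(new FileBrokerSetInformationCache(file).brokerIdsByBrokerSet());
  }
}
```

Keeping the lookup behind an interface would let the hard-coded broker lists in the earlier snippet be replaced without touching the goal logic, and would leave room for other sources besides a file.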

Recommendation

Based on the previous discussions and our explorations, we are inclined to go with Option 2 for these reasons:

  1. We want to run one cruise-control per cluster, since that is more maintainable than running one per BrokerSet.

  2. Metadata is not fetched per BrokerSet, which reduces network calls compared to running one cruise-control per BrokerSet.

  3. We do not want a separate Sample Store topic per BrokerSet in our clusters, as that would create an unnecessary number of management topics.

Follow-up questions

We would like the cruise-control maintainers' input on these questions:

  1. Are there any other places where we may have to make BrokerGroup awareness changes? For example, are Cluster Stats used anywhere that could break the BrokerGroup boundary? Would we need to make any changes in Cluster Stats?
  2. How should we configure soft goals like CPU/NW/DiskDistributionGoal and ReplicaDistributionGoal, and how are they evaluated? Which configurations at LinkedIn are modified from the defaults? A view into this would help us evaluate the configurations we would want to go with.
  3. Is there a way to get recommendations for broker addition? We have a metric for BrokerFailureRate and an anomaly for Broker Failure. Is the cluster being UNDER_PROVISIONED a good metric to alert on for scaling? (The new BrokerSet distribution goals would mark the cluster UNDER_PROVISIONED if a BrokerSet is UNDER_PROVISIONED.)
  4. Is there anything else you would need to document the design for this?
  5. There are multiple pluggable interfaces, e.g. Provisioner. Goal is an interface as well, and I am wondering whether it would be good to move all the pluggable interfaces into a separate module so that plug-ins can implement an interface without depending on the whole cruise-control package. What would you suggest as the best way to achieve this? https://github.com/linkedin/cruise-control/issues/1783
  6. Is there any downside if we tweak the CruiseControl GoalViolationDetector logic to run at a larger interval, let's say 5 minutes? Since we are using Prometheus, we may have to scrape metrics from MSK brokers every 2 minutes to reduce the scraping load on the brokers. We would have to tweak queries like iRate mentioned here. (https://github.com/linkedin/cruise-control/issues/1568)
mohitpali commented 2 years ago

Thank you Efe, Wang and Greg for hopping over a call. I am summarizing the answers to the questions here -

  1. Are there any other places where we may have to make BrokerGroup awareness changes? For example, are Cluster Stats used anywhere that could break the BrokerGroup boundary? Would we need to make any changes in Cluster Stats?

The Provisioner class would need to be BrokerGroup-aware: wherever we add or remove resources or produce provisioning recommendations, the recommendations would have to be made per BrokerGroup. ClusterStats provides an informational view of max/min/avg values and would need to be enriched with BrokerGroup information; however, ClusterStats is informational only. There is also ClusterModelStatsComparator, which compares the goal state before and after an optimization and makes sure that a proposal does not make the cluster worse.
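As an illustration of what enriching the max/min/avg view with BrokerGroup information could look like, the sketch below aggregates a per-broker utilization figure by group. It is not the ClusterStats API: `BrokerGroupStats`, `statsByGroup`, and the two input maps are hypothetical, and the utilization numbers are made up for the example.

```java
import java.util.DoubleSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Cruise Control.
final class BrokerGroupStats {
  private BrokerGroupStats() { }

  /** Aggregates per-broker utilization into min/max/avg per broker group. Unmapped brokers are skipped. */
  static Map<String, DoubleSummaryStatistics> statsByGroup(Map<Integer, Double> utilizationByBroker,
                                                          Map<Integer, String> groupByBroker) {
    Map<String, DoubleSummaryStatistics> stats = new TreeMap<>();
    utilizationByBroker.forEach((brokerId, utilization) -> {
      String group = groupByBroker.get(brokerId);
      if (group != null) {
        stats.computeIfAbsent(group, g -> new DoubleSummaryStatistics()).accept(utilization);
      }
    });
    return stats;
  }

  public static void main(String[] args) {
    // Made-up CPU utilization values: the "red" group is hot, the "yellow" group is idle.
    Map<Integer, Double> cpu = Map.of(1, 0.9, 2, 0.8, 8, 0.2, 9, 0.1);
    Map<Integer, String> groups = Map.of(1, "red", 2, "red", 8, "yellow", 9, "yellow");
    statsByGroup(cpu, groups).forEach((group, s) ->
        System.out.printf("%s: min=%.2f max=%.2f avg=%.2f%n", group, s.getMin(), s.getMax(), s.getAverage()));
  }
}
```

In this example the cluster-wide average is moderate even though one group is running hot, which is the kind of detail a cluster-level view would hide.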

  2. How should we configure soft goals like CPU/NW/DiskDistributionGoal and ReplicaDistributionGoal, and how are they evaluated? Which configurations at LinkedIn are modified from the defaults? A view into this would help us evaluate the configurations we would want to go with.

At large scale, replica count is important: metadata pressure grows with replica count, so the replica distribution goals need to be taken into account as well. At LinkedIn we use all the open-source goals plus some additional goals. For configuration, we take 1 sample every 5 minutes and run goal violation detection every hour. This keeps the memory load on cruise-control down, since keeping more history slows cruise-control down.

  3. Is there a way to get recommendations for broker addition? We have a metric for BrokerFailureRate and an anomaly for Broker Failure. Is the cluster being UNDER_PROVISIONED a good metric to alert on for scaling? (The new BrokerSet distribution goals would mark the cluster UNDER_PROVISIONED if a BrokerSet is UNDER_PROVISIONED.)

That should be the right metric to look at. It is a binary metric: it is either zero or one. You can also use the notification mechanism, which can notify you via Slack or a database.
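The rule from the question (the cluster counts as UNDER_PROVISIONED if any BrokerSet is) combined with the zero-or-one metric described in the answer can be sketched as follows. The `ProvisionAlert` class, its local `Status` enum, and the `underProvisionedGauge` method are hypothetical and only mirror the status names used in this thread; they are not the Cruise Control classes.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of Cruise Control.
final class ProvisionAlert {
  // Local stand-in for the provision statuses named in this thread.
  enum Status { UNDER_PROVISIONED, RIGHT_SIZED, OVER_PROVISIONED }

  private ProvisionAlert() { }

  /** Returns 1 if at least one broker set is under-provisioned, otherwise 0. */
  static int underProvisionedGauge(Map<String, Status> statusByBrokerSet) {
    return statusByBrokerSet.containsValue(Status.UNDER_PROVISIONED) ? 1 : 0;
  }

  public static void main(String[] args) {
    Map<String, Status> statuses = Map.of(
        "red", Status.UNDER_PROVISIONED,
        "yellow", Status.RIGHT_SIZED);
    // prints 1: a single under-provisioned broker set is enough to raise the alert
    System.out.println(underProvisionedGauge(statuses));
  }
}
```

An alert on such a gauge says that some BrokerSet needs capacity but not which one, so a per-BrokerSet breakdown would probably be needed alongside it.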

  4. Is there anything else you would need to document the design for this?

    Discussions over GitHub Issues or Google Docs are good ways to work through this.

  5. There are multiple pluggable interfaces, e.g. Provisioner. Goal is an interface as well, and I am wondering whether it would be good to move all the pluggable interfaces into a separate module so that plug-ins can implement an interface without depending on the whole cruise-control package. What would you suggest as the best way to achieve this? https://github.com/linkedin/cruise-control/issues/1783

    We are open to creating more modules. Right now we have a cruise-control module that has Kafka dependencies and a cruise-control-core module that does not have any Kafka dependencies. We also have a module for the Python client.

  6. Is there any downside if we tweak the CruiseControl GoalViolationDetector logic to run at a larger interval, let's say 5 minutes? Since we are using Prometheus, we may have to scrape metrics from MSK brokers every 2 minutes to reduce the scraping load on the brokers. We would have to tweak queries like iRate mentioned here. (https://github.com/linkedin/cruise-control/issues/1568)

No. We in fact run goal violation detection every hour, and this interval is configurable. Sampling happens every 5 minutes on our end and the window is 1 hour, which means we have 12 samples per window. The Broker Failure Detector is the only place where we need to be more reactive compared to other goals.
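The arithmetic behind the "12 samples" figure is simply the window length divided by the sampling interval. A small sketch, using a hypothetical `SamplingMath` helper, makes it easy to see how the count changes with a different interval such as the 2-minute scrape mentioned in the question:

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of Cruise Control.
final class SamplingMath {
  private SamplingMath() { }

  /** Number of whole samples that fit in one window at the given sampling interval. */
  static long samplesPerWindow(Duration window, Duration samplingInterval) {
    if (samplingInterval.isZero() || samplingInterval.isNegative()) {
      throw new IllegalArgumentException("Sampling interval must be positive.");
    }
    return window.toMillis() / samplingInterval.toMillis();
  }

  public static void main(String[] args) {
    // 1-hour window with 5-minute sampling, as described above: prints 12
    System.out.println(samplesPerWindow(Duration.ofHours(1), Duration.ofMinutes(5)));
    // Same window with a 2-minute interval: prints 30
    System.out.println(samplesPerWindow(Duration.ofHours(1), Duration.ofMinutes(2)));
  }
}
```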

mohitpali commented 2 years ago

How is the progress on BrokerSetAwareGoal?

We will submit a PR this week, and then distribution goals for BrokerSets will come in next quarter.

Why is ZooKeeper access required by Cruise Control?

We have 2.4 and 2.5 branches that support different Kafka versions. In the 2.5.* branch, the ZooKeeper dependency was removed through a patch in 2.5.87; see https://github.com/linkedin/cruise-control/issues/1415, https://github.com/linkedin/cruise-control/issues/1603, and https://github.com/linkedin/cruise-control/issues/1605. ZooKeeper was used for broker failure detection, to pick up the "since when" information, but we now store this in a file. We were also using ZK for KIP-73 based quotas.

Why is number of concurrent leader movements set to 1000 ? Is that what you have in production ?

This is based on the ZNode size limit of 1 MB; we use 1000 as the leadership movement batch size.

Do you use Concurrency Adjuster in Production ?

Yes, and Concurrency Adjuster is the recommended way to do executions. Even if you have self healing disabled, any type of executions will use Concurrency Adjuster. Tuning these by hand would be cumbersome.

Prometheus DefaultQuerySupplier is hard configured by class name and not by properties. We should refactor this.

Mohit will assign an issue to himself and work with it.