linkedin / cruise-control

Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.
https://github.com/linkedin/cruise-control/tags
BSD 2-Clause "Simplified" License
2.74k stars 587 forks source link

Replica Movement Strategy Fails when determining order of tasks #1183

Closed matthinograb closed 4 years ago

matthinograb commented 4 years ago

Received an error while using the PrioritizeSmallReplicaMovementStrategy. Not sure if this related to https://github.com/linkedin/cruise-control/issues/1127.

CC Version: 2.0.96

Request:

POST /kafkacruisecontrol/rebalance?json=true&dryrun=true&use_ready_default_goals=true&concurrent_partition_movements_per_broker=1&concurrent_leader_movements=2

Error Code:

[2020-04-28 10:50:25,950] ERROR Error processing POST request '/rebalance' due to: 'java.lang.IllegalStateException: Replica movement strategy PrioritizeSmallReplicaMovementStrategy failed to determine order of ta
sks.'. (com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServlet)
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Replica movement strategy PrioritizeSmallReplicaMovementStrategy failed to determine order of tasks.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.AbstractAsyncRequest.getResponse(AbstractAsyncRequest.java:57)
...
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:388)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Replica movement strategy PrioritizeSmallReplicaMovementStrategy failed to determine order of tasks.
        at com.linkedin.kafka.cruisecontrol.executor.strategy.AbstractReplicaMovementStrategy.applyStrategy(AbstractReplicaMovementStrategy.java:75)
        at com.linkedin.kafka.cruisecontrol.executor.strategy.AbstractReplicaMovementStrategy$1.applyStrategy(AbstractReplicaMovementStrategy.java:30)
        at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskPlanner.maybeAddInterBrokerReplicaMovementTasks(ExecutionTaskPlanner.java:196)
        at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskPlanner.addExecutionProposals(ExecutionTaskPlanner.java:133)
        at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager.addExecutionProposals(ExecutionTaskManager.java:230)
        at com.linkedin.kafka.cruisecontrol.executor.Executor.initProposalExecution(Executor.java:435)
        at com.linkedin.kafka.cruisecontrol.executor.Executor.executeProposals(Executor.java:408)
        at com.linkedin.kafka.cruisecontrol.KafkaCruiseControl.executeProposals(KafkaCruiseControl.java:561)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RebalanceRunnable.workWithoutClusterModel(RebalanceRunnable.java:123)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.GoalBasedOperationRunnable.computeResult(GoalBasedOperationRunnable.java:137)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RebalanceRunnable.getResult(RebalanceRunnable.java:94)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RebalanceRunnable.getResult(RebalanceRunnable.java:30)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.OperationRunnable.run(OperationRunnable.java:45)
        at com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.GoalBasedOperationRunnable.run(GoalBasedOperationRunnable.java:34)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
matthinograb commented 4 years ago

In summary, we're only calling the first comparator in the chain, and don't use the BaseReplicaMovementStrategy's comparator.

We also need to make sure that dynamically changing the strategy forces a chain with the BaseReplicaMovementStrategy.


Details:

The chained replica movement strategies class is a composite class. When applying strategy, it calls applyStrategy of its internal variable named current. Since the current is a separate object, it doesn't use an overridden taskComparator method of the chained strategies class (parent object).

From https://github.com/linkedin/cruise-control/issues/1127:

Why does this cause the exception?

The comparators defined in custom replica movement strategies may return equal (0) fairly often. For example, consider tasks that move partitions with no data, or tasks that both move non-under-replicated partitions, etc.

This becomes problematic, because we rely on the uniqueness of these tasks when inserting into the TreeSet that represent the execution plans. add()ing an element that is equal to one already contained in the TreeSet will return false, causing the IllegalStateExceptions.

A TreeSet instance performs all element comparisons using its compareTo (or compare) method, so two elements that are deemed equal by this method are, from the standpoint of the set, equal. https://docs.oracle.com/javase/8/docs/api/java/util/TreeSet.html


Reproducing the Issue

In ExecutionTaskPlannerTest.java, Change _partitionMovement1's partition size from 4 to 1, then re-run the tests. This should cause the size-based ReplicaMovementStrategies to throw an IllegalStateException, because there are now 2 partition movements that are "equal" for the Priorize Large/Small Partition Movement Strategies.

Fix

Open Question:

When testing the PrioritizeLargeReplicaMovementStrategy and after changing _partitionMovement1's partition size from 4 to 1, I expected the tasks to return a list of movements sorted by partition size, then execution id. Any idea why that might be the case? Expected order:

But I ended up getting:

matthinograb commented 4 years ago
Re: Open Question --

Stepping through and turning on Debug logging during the inter-broker partition movement rebalance algorithm was helpful to answer this question. In summary, my expectations were not correct, and the algorithm seems to be working as designed.

The priority of accepting a movement is:


Round 1

After round 1, execution list: _partitionMovement1, _partitionMovement3

Round 2

After round 2, execution list: _partitionMovement1, _partitionMovement3, _partitionMovement4, _partitionMovement2.

matthinograb commented 4 years ago

I think the round-robin strategy taking top precedence is a potential issue, because it does not always result in the global optimal ordering of movements.

Instead of doing the round-robin as the first priority, would it be reasonable to use the sorted list of interbroker movements as a top priority, and then do round-robin based on that? Are there any implications to doing so?


Example: Suppose we want to use the PrioritizeLargePartitionMovementStrategy, and we have three brokers in our readyBrokers set: 1, 2, and 3, with concurrency set to 1. Suppose we want to perform the following movements:

In theory, we would want to move the largePartition from broker 2 first, given the PrioritizeLargePartitionMovementStrategy. However, we will actually move the small partition, given the current algorithm implementation.

Round-robin will choose to select movements for broker 1, since it is first in the set of brokers*

I think this is a bit of an extreme example, but it highlights the potential problems when using round-robin broker strategy as the highest priority.

*HashMaps do not guarantee ordering, but I think insertion order is somewhat preserved. If the insertion order is not preserved, then we must assume some randomness. If there is randomness, then we may still randomly fall into this scenario.

matthinograb commented 4 years ago

Resolving, as bug was fixed by https://github.com/linkedin/cruise-control/pull/1184