linkedin / cruise-control

Cruise-control is the first of its kind to fully automate the dynamic workload rebalance and self-healing of a Kafka cluster. It provides great value to Kafka users by simplifying the operation of Kafka clusters.
https://github.com/linkedin/cruise-control/tags
BSD 2-Clause "Simplified" License

Exception thrown after starting leadership movements for `add_broker` request #1975

Closed: kyguy closed this issue 1 year ago

kyguy commented 1 year ago

When requesting Cruise Control to balance partitions to new brokers, I have been running into the following exception:

2023-03-03 16:27:31 INFO  Executor:1522 - Starting 69 leadership movements.
2023-03-03 16:27:31 ERROR Executor:1288 - Executor got exception during execution
java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "java.util.Map.get(Object)" is null
        at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskPlanner.getLeadershipMovementTasks(ExecutionTaskPlanner.java:322) ~[cruise-control-2.5.113.jar:?]
        at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager.getLeadershipMovementTasks(ExecutionTaskManager.java:120) ~[cruise-control-2.5.113.jar:?]
        at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.moveLeadershipInBatch(Executor.java:1547) ~[cruise-control-2.5.113.jar:?]
        at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.moveLeaderships(Executor.java:1526) ~[cruise-control-2.5.113.jar:?]
        at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.execute(Executor.java:1284) ~[cruise-control-2.5.113.jar:?]
        at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.run(Executor.java:1179) ~[cruise-control-2.5.113.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-03-03 16:27:31 WARN  operationLogger:1323 - Task [6062ba95-fd9f-469a-805a-54f5729c0567] userPOST /kafkacruisecontrol/add_broker execution is interrupted with exception Cannot invoke "java.lang.Integer.intValue()" because the return value of "java.util.Map.get(Object)" is null.
2023-03-03 16:27:31 INFO  Executor:1180 - Execution finished.
2023-03-03 16:27:34 INFO  UserTaskManager:349 - UserTask 0844f106-3038-4e71-a1e6-e76a2b0a2f37 is completed and removed from active tasks list

From what I understand, the code at [1] needs the same key check that we already have at [2]:

for (int broker: replicas) {
  if (leadershipConcurrency.containsKey(broker) && leadershipConcurrency.get(broker) > 0) {   // <--- Added line
    leadershipConcurrency.put(broker, leadershipConcurrency.get(broker) - 1);
  }
}

This avoids situations where leadershipConcurrency.get(broker) returns null.
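For anyone who wants to see the failure mode in isolation, here is a minimal, self-contained sketch. It is not the Cruise Control source: the class name `LeadershipConcurrencyGuard` and both helper methods are hypothetical, and the map merely stands in for `leadershipConcurrency`. It shows that auto-unboxing the result of `Map.get` for a missing key throws the same kind of NullPointerException as in the log above, and that a null-checked lookup skips brokers without an entry.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the planner logic; not the actual Cruise Control class.
class LeadershipConcurrencyGuard {

    // Decrements the remaining leadership-movement slots of every replica broker,
    // skipping brokers that have no entry in the map (e.g. a newly added broker).
    static void reserveSlots(Map<Integer, Integer> leadershipConcurrency, List<Integer> replicas) {
        for (int broker : replicas) {
            Integer remaining = leadershipConcurrency.get(broker); // may be null
            if (remaining != null && remaining > 0) {
                leadershipConcurrency.put(broker, remaining - 1);
            }
        }
    }

    // Reproduces the unguarded pattern: unboxing a null Integer throws an NPE.
    static boolean unguardedLookupThrows(Map<Integer, Integer> leadershipConcurrency, int broker) {
        try {
            int remaining = leadershipConcurrency.get(broker); // auto-unboxing happens here
            return remaining < 0; // never true for valid data; only reached if the key exists
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> concurrency = new HashMap<>();
        concurrency.put(1, 2);
        concurrency.put(2, 0);
        // Broker 6 has no entry, as would be the case for a broker without replicas.
        reserveSlots(concurrency, List.of(1, 2, 6));
        System.out.println("After guarded update: " + concurrency);
        System.out.println("Unguarded lookup throws for broker 6: "
                + unguardedLookupThrows(concurrency, 6));
    }
}
```

Reading the value once into an `Integer` and null-checking it is equivalent to the `containsKey` check suggested above, and it avoids a second map lookup.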

Could someone confirm this is an issue? If so, I am happy to submit a patch for it!

Environment
Cruise Control version: 2.5.113

Additional context
[1] https://github.com/linkedin/cruise-control/blob/migrate_to_kafka_2_5/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java#L322
[2] https://github.com/linkedin/cruise-control/blob/migrate_to_kafka_2_5/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java#L314

timchenko-a commented 1 year ago

I got pretty much the same error while running a remove_broker operation, during the same leadership movement phase:

[2023-03-14 13:12:15,389] INFO 41/42 (97.62%) inter-broker partition movements completed. 26/26 (100.00%) MB have been moved. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:15,389] INFO Executor will execute 1 task(s) (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,480] INFO Finished tasks: [{EXE_ID: 41, INTER_BROKER_REPLICA_ACTION, {testing-topic-22-0, oldLeader: 1, [1, 2, 3] -> [1, 2, 6]}, COMPLETED}]. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,480] INFO 42/42 (100.00%) inter-broker partition movements completed. 26/26 (100.00%) MB have been moved. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,480] INFO Inter-broker partition movements finished. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,480] INFO Starting 0 intra-broker partition movements. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,480] INFO Intra-broker partition movements finished. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,480] INFO Starting 12 leadership movements. (com.linkedin.kafka.cruisecontrol.executor.Executor)
[2023-03-14 13:12:25,481] ERROR Executor got exception during execution (com.linkedin.kafka.cruisecontrol.executor.Executor)
java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "java.util.Map.get(Object)" is null
    at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskPlanner.getLeadershipMovementTasks(ExecutionTaskPlanner.java:322) ~[cruise-control-2.5.114-SNAPSHOT.jar:?]
    at com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager.getLeadershipMovementTasks(ExecutionTaskManager.java:120) ~[cruise-control-2.5.114-SNAPSHOT.jar:?]
    at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.moveLeadershipInBatch(Executor.java:1547) ~[cruise-control-2.5.114-SNAPSHOT.jar:?]
    at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.moveLeaderships(Executor.java:1526) ~[cruise-control-2.5.114-SNAPSHOT.jar:?]
    at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.execute(Executor.java:1284) ~[cruise-control-2.5.114-SNAPSHOT.jar:?]
    at com.linkedin.kafka.cruisecontrol.executor.Executor$ProposalExecutionRunnable.run(Executor.java:1179) ~[cruise-control-2.5.114-SNAPSHOT.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
[2023-03-14 13:12:25,482] WARN Task [575a0abc-9e11-4d07-9faa-6d7db6983bde] userPOST /kafkacruisecontrol/REMOVE_BROKER execution is interrupted with exception Cannot invoke "java.lang.Integer.intValue()" because the return value of "java.util.Map.get(Object)" is null. (operationLogger)
[2023-03-14 13:12:25,482] INFO Execution finished. (com.linkedin.kafka.cruisecontrol.executor.Executor)

kyguy commented 1 year ago

The changes suggested above appear to fix the issue. I am going to run a few more tests to better understand this area of the code, and will open a PR for this tomorrow.

kyguy commented 1 year ago

After further investigation, it appears that Cruise Control does not initialize the movement concurrency configuration for brokers that don't have any replicas, so we run into an NPE when trying to move partitions to new brokers and away from stale brokers.

I have just opened a PR to fix this by setting the leadership concurrency configuration for all brokers in the cluster, regardless of whether they have partitions on them yet. I have verified this fix against our reproducer and will add unit tests to the PR if necessary.
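As a rough illustration of the idea (not the code from the PR), the sketch below seeds a concurrency map with an entry for every broker in the cluster, so that later lookups never return null. The class name `ConcurrencyInitSketch`, the method `initLeadershipConcurrency`, and the `perBrokerLimit` parameter are all hypothetical names chosen for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names do not come from the Cruise Control code base.
class ConcurrencyInitSketch {

    // Builds a concurrency map with an entry for every broker in the cluster,
    // including brokers that do not host any replicas yet.
    static Map<Integer, Integer> initLeadershipConcurrency(Set<Integer> allBrokers, int perBrokerLimit) {
        Map<Integer, Integer> concurrency = new HashMap<>();
        for (int broker : allBrokers) {
            concurrency.put(broker, perBrokerLimit);
        }
        return concurrency;
    }

    public static void main(String[] args) {
        // Broker 6 is assumed to be newly added and to hold no partitions.
        Map<Integer, Integer> concurrency = initLeadershipConcurrency(Set.of(1, 2, 3, 6), 5);
        int remaining = concurrency.get(6); // safe to unbox: every broker has an entry
        System.out.println("Remaining leadership slots for broker 6: " + remaining);
    }
}
```

With every broker present in the map, the unboxing at the lookup site is safe for any broker that belongs to the cluster, whether or not it holds replicas.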