I think this would be a good enhancement. But I'm not completely sure about the idea of going rack by rack. It is true that the replicas should always be in different racks ... but as the partition replicas would sync independently, I'm not sure it gives you any real advantage compared to just looking over the whole cluster and restarting as many brokers as you can without breaking availability. I guess there could also be some configurable upper limit on the number of nodes being restarted at once, as I think there would be people who would prefer a long rollout over too many changes at the same time. So having a limit would allow you to control the degree of parallelization.
@tombentley would be the main rolling update guru, I guess.
Thank you @scholzj for your quick reply! "Rack by rack" is a special case from our experience. The number of racks is the same as the replication factor of all topics in our clusters, so we can upgrade the Kafka cluster one rack at a time. It is actually quite convenient for us, as we know that shutting down one rack won't cause any under-min-ISR partitions :)
What you proposed is more generic and applicable to most use cases, so it will be beneficial to all users. Rack by rack works well for our setup, but I'm not sure it will help other users.
The problem with rack by rack is that it assumes the ideal case where everything is in-sync. Then it would be fine. But you cannot rely on it if you want to make it reliable.
Yes, that's correct. We need to wait for everything to be in sync before we move on to the next rack.
Therefore the ideal solution is to provide parallelism for the Kafka broker restarts, as you mentioned.
@tombentley could you share your thoughts on this?
@yyang48 thanks for opening this! I thought a bit about it after you mentioned it at Kafka Summit, and I think there's definitely scope to do parallel rolling. Jakub is right that we can't assume that all replicas are in sync. So we'd need to validate that before restarting a rack's worth of brokers.
More generally, as you scale up the number of brokers the number of possible groupings of brokers to be rolled together grows rather quickly. So we probably don't want to search the whole space to figure out the "best" (under some cost model) partitioning of the brokers into groups. But using existing partitionings such as racks (or possibly node pools, I've not thought too hard about it) with the necessary safety checks seems like a good compromise.
There are some details to think about: if a rack can't be rolled, should we wait (for how long, and what should we do if it never becomes rollable?), or should we find the individual brokers within it that can be rolled (which would give more time for the troublesome brokers to catch up)?
Another consideration is that having a rack's worth of brokers all catching up after rolling might mean we need to pay more attention to throttling. But I think we'd only really understand whether that's a real problem once we've got the ability to do parallel rolling.
@tombentley, thanks for thinking about this after our chat at Kafka Summit! I agree with you, we definitely need to think about the details and list the different options we have for this.
Do we need a proposal for this enhancement in Strimzi? If needed, I can draft one so that we can lay the groundwork for the discussion.
At the same time, let me give you an example to show why this is an issue for us.
We have an AWS EKS cluster with 3 node groups, and the Kafka brokers are evenly distributed across the three node groups. The replication factor for all partitions is 3. We have a total of 120 brokers in this EKS cluster, and each broker runs on a dedicated EC2 instance.
We got a request to upgrade the EKS cluster to a new version, which requires node rotations for all the EKS worker nodes.
We adopted the Strimzi Drain Cleaner and set the PodDisruptionBudget to 0 for the Kafka cluster.
The EKS in-place upgrade failed almost every time, but only for the big clusters, because of the PodDisruptionBudget.
The reason for this is the timeout from AWS EKS:
Quote from AWS EKS: https://docs.aws.amazon.com/eks/latest/userguide/managed-node-update-behavior.html
"It drains the Pods from the node. If the Pods don't leave the node within 15 minutes and there's no force flag, the upgrade phase fails with a PodEvictionFailure error. For this scenario, you can apply the force flag with the update-nodegroup-version request to delete the Pods"
Even if we change the PodDisruptionBudget to 1 (which breaks the assumption of the Strimzi Drain Cleaner), we still hit the EKS upgrade timeout failures.
Fortunately, our setup has #racks == #nodegroups, so our current workaround is: 1) change the PodDisruptionBudget to the number of brokers in that rack, 2) ask EKS to upgrade all the nodes in that rack, 3) repeat until all 3 node groups are done. With some careful monitoring it can work, but it's less than ideal and operationally heavy.
I think a proposal would be needed to discuss the lower-level details of how this would actually be implemented, such as how it will take care of the racks, check rollability, etc.
Any proposal would need to specify exactly what the algorithm should be and what properties it needs to provide.
At its most general, what we need for parallel rolling is an algorithm which partitions the set of brokers to be restarted into cells of brokers that can be safely restarted together. A partitioning of a set is equivalent to an equivalence relation on the elements of the set of brokers. The equivalence relation "has no partitions in common" is sufficient to guarantee safety, and rack-aware assignment means no two brokers in the same rack will have replicas of the same partition, so the cells of the partitioning are the racks of brokers.
"Has no partitions in common" is actually a stricter condition than is really required. For example, even when two brokers both replicate some partition we might be able to safely restart in parallel them if $|isr| - min\_isr \ge 2$, or if neither broker is in the ISR but $|isr| \ge min\_isr$. However, I think if we tried to loosen the condition to use this fact it makes the partitioning much harder.
However, we need to consider that nothing in the broker, controller or Cruise Control can actually guarantee that all partitions are assigned in a rack-aware way. It's all best effort. For example, in the absence of a topic creation policy which enforced it, a new topic could be created which violated this requirement. Nor does Kafka have a policy on reassignments. So even if CC is being used with rack awareness as a hard goal, we can't rule out that there's been a manual rebalance since the last CC rebalance. The rolling algorithm needs to work correctly even in all the edge cases where replica placement nearly, but doesn't quite, satisfy the rack-awareness property.
So I've been experimenting with an algorithm which is oblivious to the racks themselves and simply uses the "has no partitions in common" relation to construct the equivalence classes (cells). This means it still satisfies the safety property even if rack awareness is accidentally violated. It just happens that when replicas are assigned in a rack-aware way we'll naturally get a smaller number of larger cells (fast rolling restarts), and if rack awareness is not being used the cells are much more likely to be singletons (slower rolling restarts).
It needs a lot more testing for correctness, but it seems to scale well enough on simulated large clusters (~120 brokers and ~500,000 partitions), needing only a few seconds to compute the cells in that case.
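For concreteness, here is a minimal sketch of one way to compute cells from the "has no partitions in common" relation (this is only an illustration, not the experimental implementation mentioned above; the partition-to-replicas map and broker ids are assumed to be available from cluster metadata):

```java
import java.util.*;

class CellPartitioner {
    // Greedily group brokers into cells such that no two brokers in the same cell
    // replicate any partition in common.
    static List<Set<Integer>> computeCells(Map<String, List<Integer>> replicasByPartition,
                                           Set<Integer> brokersToRestart) {
        // For each broker, collect the brokers it shares at least one partition with.
        Map<Integer, Set<Integer>> sharesPartitionWith = new HashMap<>();
        for (int broker : brokersToRestart) {
            sharesPartitionWith.put(broker, new HashSet<>());
        }
        for (List<Integer> replicas : replicasByPartition.values()) {
            for (int a : replicas) {
                for (int b : replicas) {
                    if (a != b && brokersToRestart.contains(a) && brokersToRestart.contains(b)) {
                        sharesPartitionWith.get(a).add(b);
                    }
                }
            }
        }
        // Place each broker into the first cell where it conflicts with no existing member.
        List<Set<Integer>> cells = new ArrayList<>();
        for (int broker : brokersToRestart) {
            Set<Integer> conflicts = sharesPartitionWith.get(broker);
            Set<Integer> target = cells.stream()
                    .filter(cell -> Collections.disjoint(cell, conflicts))
                    .findFirst()
                    .orElseGet(() -> {
                        Set<Integer> fresh = new HashSet<>();
                        cells.add(fresh);
                        return fresh;
                    });
            target.add(broker);
        }
        return cells;
    }
}
```

With a rack-aware assignment the conflict sets line up with the racks, so this naturally produces a few large cells; without rack awareness most cells degenerate to singletons, which matches the behaviour described above.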
Another example where the "has no partitions in common" relation is "too strong": a partition being reassigned between brokers in the same rack would result in two cells rather than a single one for the whole rack. In practice I don't think this is a problem. There are also other practical reasons why people might prefer smaller than rack-sized cells: the larger the number of brokers restarted at the same time, the larger the impact on the rest of the cluster, and the larger the consequences of any problems with the restarted brokers or with the others.
An additional constraint to consider for the overall partitioning is process.roles=controller and process.roles=controller,broker. Since we'd expect #controllers = 1, 3, 5 or just possibly 9, it should be fine to ensure that each cell contains a single controller, and to ensure the cell containing the active controller is rolled last to avoid unnecessary controller elections (as we currently do on a per-broker basis).
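A hedged sketch of how that controller constraint could be layered on top of the cells (hypothetical helper, assuming we already know the controller ids and the active controller):

```java
import java.util.*;

class ControllerAwareOrdering {
    // Split cells so that no cell contains more than one controller, then order the
    // cells so that the one containing the active controller is rolled last.
    static List<Set<Integer>> orderForControllers(List<Set<Integer>> cells,
                                                  Set<Integer> controllers,
                                                  int activeController) {
        List<Set<Integer>> result = new ArrayList<>();
        for (Set<Integer> cell : cells) {
            Set<Integer> remaining = new HashSet<>(cell);
            // Peel off extra controllers into singleton cells until at most one is left.
            for (int controller : controllers) {
                if (remaining.contains(controller)
                        && remaining.stream().filter(controllers::contains).count() > 1) {
                    remaining.remove(controller);
                    result.add(Set.of(controller));
                }
            }
            result.add(remaining);
        }
        // Stable sort: cells without the active controller first, its cell last.
        result.sort(Comparator.comparingInt((Set<Integer> cell) ->
                cell.contains(activeController) ? 1 : 0));
        return result;
    }
}
```

Since #controllers is small (1, 3, 5, maybe 9), any extra singleton cells this creates add at most a handful of additional restart rounds.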
Partitioning brokers based on the "has no partitions in common" principle sounds like a very good idea.
Do we still want to keep a max threshold to allow users to control the max size of a broker cell? By default, I think it can be unlimited, which is ok in our case.
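If such a threshold is added, one simple (hypothetical) way to apply it would be to split any oversized cell into smaller chunks after the cells have been computed; any subset of a safe cell is still safe to restart together:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class CellSizeCap {
    // Split each cell into chunks of at most maxCellSize brokers.
    static List<List<Integer>> capCellSize(List<Set<Integer>> cells, int maxCellSize) {
        List<List<Integer>> capped = new ArrayList<>();
        for (Set<Integer> cell : cells) {
            List<Integer> brokers = new ArrayList<>(cell);
            for (int i = 0; i < brokers.size(); i += maxCellSize) {
                capped.add(new ArrayList<>(
                        brokers.subList(i, Math.min(i + maxCellSize, brokers.size()))));
            }
        }
        return capped;
    }
}
```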
I agree that partitioning based on the min-ISR requirement is harder, since the ISR changes dynamically over time. In production, the ISR changes more often than partition reassignments happen, so it will be much easier to derive the broker cells from the partition placement.
When you have a PR for the experiment, please let me know. I'm happy to help and write more tests for it.
Triaged on 1.6.2023: This looks like it would be a worthy improvement. It should be investigated a bit more. A proposal would be needed to proceed with this and to review the algorithm properly.
Related problem
The current KafkaRoller algorithm is designed to restart Kafka pods one by one. It waits for the current pod to become ready and then finds another pod to restart. This works well in small Kafka clusters, e.g. with fewer than 10 Kafka pods.
In big Kafka clusters with 50+ Kafka brokers, it takes a long time to finish the upgrade. The reason is that, by default, the Strimzi Cluster Operator waits about 2 minutes for every single Kafka pod, so the total time needed to upgrade such a Kafka cluster is about 2 hours or even more.
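As a rough back-of-the-envelope estimate, assuming ~2 minutes per broker as above:

$$50 \times 2\,\text{min} = 100\,\text{min} \approx 1.7\,\text{h}, \qquad 120 \times 2\,\text{min} = 240\,\text{min} = 4\,\text{h}$$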
Waiting longer may be OK for some use cases. However, in other cases the longer wait creates problems. For instance, in an AWS EKS cluster, when the EKS worker nodes are upgraded there is a maximum timeout for restarting each EKS worker node. If the Kafka roll takes too long, the EKS upgrade fails with a timeout error.
Suggested solution
Kafka clusters have a rack definition built in, and it's probably OK to restart many Kafka brokers from the same rack in parallel. It will cause a lot of under-replicated partitions, but no under-min-ISR partitions. Doing so can significantly reduce the Kafka cluster upgrade time.
In the implementation, we probably need to adjust the threading model of the KafkaRoller so that it can restart multiple Kafka broker nodes at once (a rough sketch follows below).
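A minimal sketch of what that could look like (not the actual KafkaRoller API; restartAndAwaitReady() is a hypothetical helper that deletes one broker pod and waits for it to become ready and back in sync):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ParallelRollSketch {
    // Restart all brokers of one rack concurrently and wait for the whole rack to
    // recover before moving on to the next rack.
    static void rollRackByRack(List<List<Integer>> brokersByRack) {
        for (List<Integer> rack : brokersByRack) {
            CompletableFuture<?>[] restarts = rack.stream()
                    .map(brokerId -> CompletableFuture.runAsync(() -> restartAndAwaitReady(brokerId)))
                    .toArray(CompletableFuture[]::new);
            // Block until every broker in this rack is ready before starting the next rack.
            CompletableFuture.allOf(restarts).join();
        }
    }

    static void restartAndAwaitReady(int brokerId) {
        // Hypothetical placeholder: delete the broker pod and wait for pod readiness
        // and for its replicas to rejoin the ISR.
    }
}
```

This keeps the existing per-pod restart-and-wait logic intact and only changes how many of those restarts are in flight at the same time.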
Alternatives
No response
Additional context
No response