strimzi / strimzi-kafka-operator

Apache Kafka® running on Kubernetes
https://strimzi.io/
Apache License 2.0

Topic Operator: Changing replication factor #191

Closed tombentley closed 5 months ago

tombentley commented 6 years ago

Currently the topic controller supports ConfigMaps having a replicas key, which declares the replication factor of a topic. This is only partially supported: changes to the replicas in the CM and changes to the partition assignment in Kafka don't get mirrored.

Kafka doesn't really support "replication factor" as a first-class concept. It's basically used only when creating topics, and thereafter the APIs deal only with lists of assigned brokers. Indeed I think it's possible for different partitions of the same topic to have different replication factors. By exposing a readable replicas we end up having to synthesize something which doesn't really exist in Kafka. This is actually not trivial, because during reassignment the assigned replicas will be (a subset of) the union of the old and new replicas, which could lead to the reported replication factor being higher during the reassignment than it would be either before or after it. It would be possible to do, however, if we only updated the replicas each time a partition reassignment for the topic finished (strictly, when reassignment of partition 0 finished, since this is what we base our fiction of replication factor on), but that involves knowing when such reassignments finish. Today that involves parsing the JSON in the reassign_partitions znode, and in the future KIP-240 might change that.
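To make the over-reporting concrete, here is a minimal Python sketch, not Topic Operator code. The function names are hypothetical, and it assumes the worst case in which a partition's replica list is the full union of the old and new replicas while the reassignment is in flight.

```python
from typing import Dict, List


def reported_replication_factor(assignment: Dict[int, List[int]]) -> int:
    """Synthesize an RF from the replica list of partition 0."""
    return len(assignment[0])


def replicas_during_reassignment(old: List[int], new: List[int]) -> List[int]:
    """Union of new and old replicas, in order and without duplicates.

    This is the upper bound; Kafka may report only a subset of it.
    """
    union: List[int] = []
    for broker in new + old:
        if broker not in union:
            union.append(broker)
    return union


old_replicas = [1, 2, 3]
new_replicas = [4, 5, 6]
in_flight = replicas_during_reassignment(old_replicas, new_replicas)

before = reported_replication_factor({0: old_replicas})
during = reported_replication_factor({0: in_flight})
after = reported_replication_factor({0: new_replicas})
```

With disjoint old and new replica sets the synthesized value doubles while data is being moved, even though the intended RF never changed.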

An alternative to this might be to support setting replicas, but then "consuming it" and removing it from the CM as part of the topic controller's processing.

Another alternative might be to not support replicas as a replication factor at all, and instead report the assigned replicas. This would still have the problem of reporting more replicas during a reassignment, but in this respect we're no different from Kafka.

scholzj commented 6 years ago

I think that configuring the replication is important. We need to have it in some form. If an integer replicas doesn't work, can we use the replica assignment instead? I.e. have an object / map in the config map which would describe the desired replication?

An alternative to this might be to support setting replicas, but then "consuming it" and removing it from the CM as part of the topic controller's processing.

I don't think this would work well. We should keep overwriting of the config maps to a minimum. It can lead to issues such as overwriting the config map again and again every time the user does 'kubectl apply ...'. It might also cause problems for people using GitOps.

scholzj commented 6 years ago

Is this a duplicate of #142?

tombentley commented 5 years ago

Replication factor change is a special case of partition reassignment

As mentioned, Kafka doesn't support Replication Factor (RF) as a first class concept. It's just a convenience at the time a topic is created. Thereafter each partition can, in theory, have its own number of replicas, and changing those is done through partition reassignment.

Challenges due to Kafka's reassignment API

The only supported way to reassign partition replicas in Apache Kafka is via the CLI tool kafka-reassign-partitions.sh. This effectively writes some JSON to the /admin/reassign_partitions Zookeeper znode, which the Kafka Controller then actions. The Controller will update the znode as each partition's reassignment completes and delete it once all the partitions in the JSON have been reassigned. Writing directly to Zookeeper is not a publicly supported API and might change without warning in a future version of Kafka.
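For reference, the JSON involved has a simple shape. The Python sketch below builds a document in the format accepted by kafka-reassign-partitions.sh via --reassignment-json-file; the function name and the example topic are hypothetical, and nothing here talks to Kafka or Zookeeper.

```python
import json
from typing import Dict, List


def build_reassignment_json(topic: str, assignments: Dict[int, List[int]]) -> str:
    """Render {partition id: replica broker ids} as a reassignment document."""
    partitions = [
        {"topic": topic, "partition": partition, "replicas": list(replicas)}
        for partition, replicas in sorted(assignments.items())
    ]
    return json.dumps({"version": 1, "partitions": partitions})


# Raising my-topic from RF 2 to RF 3 means listing three brokers per partition.
document = build_reassignment_json("my-topic", {0: [1, 2, 3], 1: [2, 3, 1]})
```

An RF change is expressed purely as a longer or shorter replica list per partition, which is why it is a special case of reassignment.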

A consequence of this design is that reassignment must be done in batches. This poses two problems:

  1. There's no natural mechanism for the Topic Operator to be able to batch reassignments from multiple KafkaTopics each modified at a slightly different time.
  2. There must be some mechanism for queuing reassignments which can't be processed right now because of a pre-existing in-flight reassignment (which could take a very long time to complete).

Long term vision

In the bigger picture, users with more than a handful of partitions will use some kind of partition balancer tool to perform a heuristic optimization. In this case being able to reassign partitions via Kubernetes is not really a requirement. Furthermore such tools are written to operate directly on Kafka (and in practice Zookeeper).

Putting these facts together suggests we should not aim to solve the whole reassignment problem in the Topic Operator, but should instead support use of a dedicated balancer (either an existing one or one we write ourselves).

There are two possible ways that could work:

  1. Another component, distinct from the Topic Operator and part of the dedicated balancer, takes responsibility for watching KafkaTopics for RF changes. The TO could still provide one-way sync from Kafka to K8S of the RF. This means the TO doesn't have to know anything about this other component, other than knowing to ignore K8s-side changes to RF.
  2. The TO takes responsibility for watching for RF changes, and informs another component, which does the actual work.

The problem with this bigger picture is we're not ready yet to start on that work.

Short term solution

The reality is that we should support RF changes even today. To do that we would need to make the TO configurable to use a built-in approach just supporting RF change, not reassignment. In the future the TO would be configured for the external balancer, but in the short term it would be configured for the internal RF-changer. We would provide only the functionality for changing RF in the absence of a true balancer, with the expectation that we will provide a full balancer in the future.

Fixed throttles

We would configure the TO with a fixed set of throttles to be used when it is performing reassignment.

Delayed initiation of reassignment

We would delay kicking off a reassignment for some period of time (rfChangeLingerSeconds) to allow a primitive way for users to batch changes. For example, when a modification changes RF we'd wait up to 1 minute for changes in other topics' RF. Once the time was up (or the number of changed partitions reached maxPartitionsInBatch) we'd start a batch of reassignments.

When many RF changes had got queued up (because a reconciliation was already happening) we would use the same maxPartitionsInBatch configuration when creating the next batch. In this case we would need a way to pick the maxPartitionsInBatch from the set of eligible partitions. Queueing them based on the order of the observation of their need for RF change would be possible, but this queue cannot persist across TO restarts. It would therefore be impossible to guarantee this order. Since the user knows the batch size they can always impose their own ordering simply by when they make the RF changes to the KafkaTopics. So perhaps a simpler way would be to batch partitions randomly in this case.
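The linger-then-batch behaviour could be sketched roughly as follows. This is an illustrative Python sketch only: the class and method names are hypothetical, time is passed in explicitly to keep it deterministic, and it assumes in-memory first-in-first-out ordering (which, as noted above, would not survive a TO restart) rather than random selection.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Partition = Tuple[str, int]  # (topic name, partition id)


@dataclass
class RfChangeBatcher:
    linger_seconds: float          # corresponds to rfChangeLingerSeconds
    max_partitions_in_batch: int   # corresponds to maxPartitionsInBatch
    pending: List[Partition] = field(default_factory=list)
    first_seen: Optional[float] = None

    def add(self, partition: Partition, now: float) -> None:
        """Record that a partition needs an RF change."""
        if partition in self.pending:
            return
        if not self.pending:
            self.first_seen = now  # the linger window starts with the first change
        self.pending.append(partition)

    def poll(self, now: float, reassignment_in_flight: bool) -> List[Partition]:
        """Return the next batch, or [] if nothing should be started yet."""
        if reassignment_in_flight or not self.pending:
            return []
        full = len(self.pending) >= self.max_partitions_in_batch
        lingered = now - self.first_seen >= self.linger_seconds
        if not (full or lingered):
            return []
        batch = self.pending[: self.max_partitions_in_batch]
        self.pending = self.pending[self.max_partitions_in_batch:]
        if not self.pending:
            self.first_seen = None
        return batch
```

Leftover partitions keep their original first-seen time, so they become eligible as soon as the in-flight reassignment completes.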

Checking for reassignment completion

We could use a ZK watch on /admin/reassign_partitions to know that a partition had completed reassignment. This requires maintaining in memory the set of partitions being reassigned so that we can deduce which has completed when /admin/reassign_partitions changes.

Because we cannot completely trust the watch mechanism we would also regularly poll for changes (completionPollInterval).
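The deduction step can be sketched as a set difference between what we believe is in flight and what the znode still lists. This is a minimal Python sketch under assumptions: the function name is hypothetical, the znode content is passed in as a string (None when the znode has been deleted), and its JSON is assumed to have the same {"version", "partitions"} shape as the reassignment file.

```python
import json
from typing import Optional, Set, Tuple

Partition = Tuple[str, int]  # (topic name, partition id)


def completed_partitions(in_flight: Set[Partition], znode_data: Optional[str]) -> Set[Partition]:
    """Return the in-flight partitions that no longer appear in the znode."""
    if znode_data is None:
        # The Controller deletes the znode once every partition is done.
        return set(in_flight)
    still_moving = {
        (entry["topic"], entry["partition"])
        for entry in json.loads(znode_data)["partitions"]
    }
    return in_flight - still_moving
```

The same function serves both the watch callback and the periodic poll, since each just supplies the current znode content.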

Example configuration for internal reassignment

kind: Kafka
spec:
  # ...
  entityOperator:
    topicOperator:
      reassignment: 
        type: internal # in the future we'd support external
        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000
        throttledReplicas:
          my-topic:
            leader:
              0: 101
              1: 102
            follower:
              0: 102
              1: 101
        rfChangeLingerSeconds: 60
        maxPartitionsInBatch: 20
        completionPollInterval: 60s

In the case of internal reassignment we would use env vars to pass this configuration to the TO.

Maintaining state

We would use a KafkaTopic.status.assignmentStatuses object to make explicit the assigned -> queued -> reassigning -> assigned status changes for each partition.

kind: KafkaTopic
metadata:
  name: my-topic
spec:
  partitions: 2
  replicas: 3
  config:
    # ...
status:
  assignmentStatuses:
    # Map or list associating partition id with a state
    - 0: assigned
    - 1: queued

A partition is in the queued state when the TO has observed that the KafkaTopic.spec.replicas differs from the Kafka topic's current RF and the partition is not currently being reassigned. A partition is in the reassigning state when it is currently being reassigned. A partition is in the assigned state when the TO has observed that the KafkaTopic.spec.replicas is the same as the Kafka topic's current RF and the partition is not currently being reassigned.

The transition assigned -> queued would happen when the TO is notified of a change to KafkaTopic (or on periodic reconciliation) which changed the RF. The transition queued -> reassigning would happen when the TO adds the partition to a batch and starts the reassignment. The transition reassigning -> assigned would happen when the TO is notified via a ZK watch on /admin/reassign_partitions (or periodically) that the partition is no longer being reassigned. To do this it will need to keep in memory the partitions being reassigned.
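Read literally, the three state definitions make the status a pure function of what the TO observes. A minimal Python sketch of that reading follows; the enum and function names are hypothetical.

```python
from enum import Enum


class AssignmentStatus(Enum):
    ASSIGNED = "assigned"
    QUEUED = "queued"
    REASSIGNING = "reassigning"


def assignment_status(desired_rf: int, actual_rf: int, being_reassigned: bool) -> AssignmentStatus:
    """Derive a partition's status from the observations described above."""
    if being_reassigned:
        return AssignmentStatus.REASSIGNING
    if desired_rf != actual_rf:
        return AssignmentStatus.QUEUED
    return AssignmentStatus.ASSIGNED
```

Because nothing here depends on the previous status, the TO could recompute it after a restart, as long as it can find out which partitions are currently being reassigned.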

Selection of brokers for new replicas

When RF is increased the additional replicas need to be assigned to brokers. When rack awareness is enabled in the Kafka cluster that assignment should avoid assigning the replicas of the partition to brokers in the same rack. To do this the TO will need to know the rack, which it can obtain via the AdminClient.

scholzj commented 5 years ago

OT, but how do you model this in the api?

        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000

We could have used it for the JBOD storage and we can use it for the exposing improvements. Can you do it just as a Map<Integer, Object>?


In this example:

kind: Kafka
spec:
  # ...
  entityOperator:
    topicOperator:
      reassignment: 
        type: internal # in the future we'd support external
        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000
        throttledReplicas:
          my-topic:
            leader:
              0: 101
              1: 102
            follower:
              0: 102
              1: 101
        rfChangeLingerSeconds: 60
        maxPartitionsInBatch: 20
        completionPollInterval: 60s

I understand the three options at the bottom. I understand the general fixed throttles configured for the reassignments. I'm not sure I understand the point of the throttledReplicas object TBH. Also, if it is topic specific, isn't the better place for this in the KafkaTopic object?

Would it be complicated to configure the throttles per topic in KafkaTopic instead of having only general configuration on TO level?


Using status for reporting / tracking progress seems the obvious choice, but it means support for OpenShift 3.11+, right?


When RF is increased the additional replicas need to be assigned to brokers. When rack awareness is enabled in the Kafka cluster that assignment should avoid assigning the replicas of the partition to brokers in the same rack. To do this the TO will need to know the rack, which it can obtain via the AdminClient.

Can we as an alternative let the user specify the map with the assignments in KafkaTopics?

tombentley commented 5 years ago

OT, but how do you model this in the api?

        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000

We could have used it for the JBOD storage and we can use it for the exposing improvements. Can you do it just as a Map<Integer, Object>?

TBH that might not be possible, I'll have to check. We do something similar with Map<String, Object> for the config. We might be able to use a Map<String, Object> but constrain the key to have the decimal integer syntax in the schema.

In this example:

kind: Kafka
spec:
  # ...
  entityOperator:
    topicOperator:
      reassignment: 
        type: internal # in the future we'd support external
        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000
        throttledReplicas:
          my-topic:
            leader:
              0: 101
              1: 102
            follower:
              0: 102
              1: 101
        rfChangeLingerSeconds: 60
        maxPartitionsInBatch: 20
        completionPollInterval: 60s

I understand the three options at the bottom. I understand the general fixed throttles configured for the reassignments. I'm not sure I understand the point of the throttledReplicas object TBH. Also, if it is topic specific, isn't the better place for this in the KafkaTopic object?

throttledReplicas is listing which brokers, when in the given roles, throttle replication of the given topic partition. For example, when broker 101 is the leader for my-topic partition 0 the throttle given in throttledRates will apply. (It didn't help that I used 101 for broker id in the throttledReplicas but 1 in the throttledRates, sorry about that.)

Would it be complicated to configure the throttles per topic in KafkaTopic instead of having only general configuration on TO level?

I don't think it would work putting it in the KafkaTopic because it's quite specific to the brokers where the partition(s) are now. It would need to be changed every time the topic was reassigned (and/or it would be stale when the topic was not being reassigned). I'm also keen on not adding extra properties to KafkaTopic related to reassignment until we've thought more about the longer term plan for balancing. Putting throttling info in the KafkaTopic now might make it difficult to support balancer features later. For example having the balancer automatically adjusting throttles is more complicated if they're present in the KafkaTopic than if it's just something the balancer could do directly.

Finally, the throttling configuration is more an aspect of a reconciliation than it is of a topic, imho.

Using status for reporting / tracking progress seems the obvious choice, but it means support for OpenShift 3.11+, right?

I don't think pre-3.11 will prevent you from having a status object in your CR, it just won't be seen as a status subresource. I will experiment with this to confirm.

When RF is increased the additional replicas need to be assigned to brokers. When rack awareness is enabled in the Kafka cluster that assignment should avoid assigning the replicas of the partition to brokers in the same rack. To do this the TO will need to know the rack, which it can obtain via the AdminClient.

Can we as an alternative let the user specify the map with the assignments in KafkaTopics?

The problem is again whether this makes our lives harder in the future when we do cluster balancing. If there's a writeable replicaAssignment in something like

kind: KafkaTopic
metadata:
  name: my-topic
spec:
  partitions: 2
  replicaAssignment:
    0: [101, 102, 103]
    1: [104, 101, 102]
  config:
    # ...

and the topic is being balanced automatically then who is in control? There are issues with things like GitOps applying an old assignment. This is still a problem with replicas, but it's a smaller problem because RF doesn't really change much whereas the assignments could be quite dynamic.

That said, maybe it's just a matter of making it explicit in the KafkaTopic resource what kind of assignment is expected:

kind: KafkaTopic
metadata:
  name: my-topic
spec:
  partitions: 2
  replicaAssignment:
    type: manual
    assignments:
      0: [101, 102, 103]
      1: [104, 101, 102]
  config:
    # ...

And a type: automatic would simply not expose any assignments to be tinkered with. Ultimately I expect we would use the status to expose the current assignment, but (in 3.11+) read-only.

scholzj commented 5 years ago

Putting throttling info in the KafkaTopic now might be difficult to support balancer features later. For example having the balancer automatically adjusting throttles is more complicated if they're present in the KafkaTopic than if it's just something the balancer could do directly.

That is probably a fair point. But storing it in the Kafka CR seems weird. I also wonder if it would cause issues with access rights. You might want someone to manage their topics but not the Kafka cluster, yet for this the user would need full rights to the Kafka resource. I wonder if holding this in a config map or something could be a better solution until we have the balancer.

I don't think pre-3.11 will prevent you from having a status object in your CR, it just won't be seen as a status subresource. I will experiment with this to confirm.

Yeah, I know ... but I wonder whether it wouldn't cause more issues that way (e.g. with users rewriting the status again and again from some Git repo etc.)

And a type: automatic would simply not expose any assignments to be tinkered with. Ultimately I expect we would use the status to expose the current assignment, but (in 3.11+) read-only.

This sounds good to me ... with the type, the users can choose and there is no problem with the rewriting of the resource. So basically:

That would probably work with the balancer. It might still be a bit awkward before we have the balancer. How much time do you think we will need for that? :-o

tombentley commented 5 years ago

OT, but how do you model this in the api?

        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000

We could have used it for the JBOD storage and we can use it for the exposing improvements. Can you do it just as a Map<Integer, Object>?

TBH that might not be possible, I'll have to check. We do something similar with Map<String, Object> for the config. We might be able to use a Map<String, Object> but constrain the key to have the decimal integer syntax in the schema.

Annoyingly it turns out that Kube doesn't support patternProperties, so we can't constrain String keys at all. That doesn't stop us using a Map<String, ...> like this, but it means the map wouldn't be validated by the schema. So it would probably be better to use something like this:

throttledRates:
 - broker: 1
   leader: 70000000
   follower: 70000000
 - broker: 2
   leader: 70000000
   follower: 70000000

Similarly the KafkaTopic.status would look like

status:
  assignmentStatuses:
    - partition: 0
      assignmentStatus: assigned
    - partition: 1
      assignmentStatus: queued

scholzj commented 5 years ago

@tombentley Ok, fair enough. Thanks for investigating.

tombentley commented 5 years ago

I don't think pre-3.11 will prevent you from having a status object in your CR, it just won't be seen as a status subresource. I will experiment with this to confirm.

Yeah, I know ... but I wonder whether it wouldn't cause more issues that way (e.g. with users rewriting the status again and again from some Git repo etc.)

In a Kubernetes without support for CRD subresources we can't avoid the possibility of people updating the status, but because we're maintaining a watch on the resource anyway, the TO could be made to update the status whenever it was found to be wrong (i.e. as part of the reconciliation). This means that the status isn't completely reliable because it can be momentarily wrong, but it would be correct almost all of the time while the TO was running. I don't think the TO changes needed for this are very much work.

In a Kubernetes with support for CRD subresources we would need to register that the status should be treated as a subresource. Then we would have to update it via the subresource path. It seems that this is not directly supported in fabric8. We would still be able to use this API but would have to resort to adapting the fabric8 client to an EasyHTTP client.

Further complication comes if we want to make use of subresource support when it's available. In this case we would need to conditionally apply changes to the status in the reconciliation code and also conditionally update the status using the subresource path.

On the one hand the design I sketched above does not rely on having a completely trustworthy status (the TO should work fine without it). But having changes which are not immediately synced to the underlying topic and no way to inform the user of the status of those changes (except logging) is horrible. So I think we should consider status as a requirement. The question then is whether we try to make use of subresource support when it's available, or just live with slightly rubbish status until the time that Strimzi no longer supports pre-1.11 (by which time fabric8 might have caught up a bit).

tombentley commented 5 years ago

Oh, about the fabric8 support, see https://github.com/fabric8io/kubernetes-client/issues/417

scholzj commented 5 years ago

It seems that this is not directly supported in fabric8. We would still be able to use this API but would have to resort to adapting the fabric8 client to an EasyHTTP client.

That is indeed a bit unfortunate. We would have to see if this can be easily worked around.

In a Kubernetes without support for CRD subresources we can't avoid the possibility of people updating the status, but because we're maintaining a watch on the resource anyway, the TO could be made to update the status whenever it was found to be wrong (i.e. as part of the reconciliation).

That works fine if it is a single update done manually by the user. It might cause problems if the update is done every minute by some GitOps framework.

tombentley commented 5 years ago

Further complication comes if we want to make use of subresource support when it's available.

So the question there is: how can we discover whether the k8s supports custom resource subresources? On my kube (and with a CRD which declares use of the status subresource):

$ curl 127.0.0.1:45321/apis/kafka.strimzi.io/v1alpha1/
{
  "kind": "APIResourceList",
  "apiVersion": "v1",
  "groupVersion": "kafka.strimzi.io/v1alpha1",
  "resources": [
    {
      "name": "kafkatopics",
      "singularName": "kafkatopic",
      "namespaced": true,
      "kind": "KafkaTopic",
      "verbs": [
        "delete",
        "deletecollection",
        "get",
        "list",
        "patch",
        "create",
        "update",
        "watch"
      ],
      "shortNames": [
        "kt"
      ]
    },
    {
      "name": "kafkatopics/status",
      "singularName": "",
      "namespaced": true,
      "kind": "KafkaTopic",
      "verbs": [
        "get",
        "patch",
        "update"
      ]
    },
...

So it seems clear that the kafkatopics/status object will only be present when status subresource is supported.
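A check along those lines could be sketched as follows. This is a minimal Python sketch, not fabric8 code: the function name is hypothetical, and it simply inspects an APIResourceList body like the one returned by the curl call above.

```python
import json


def supports_status_subresource(api_resource_list: str, plural: str = "kafkatopics") -> bool:
    """True if the APIResourceList contains a '<plural>/status' resource."""
    resources = json.loads(api_resource_list).get("resources", [])
    return any(r.get("name") == plural + "/status" for r in resources)


# Trimmed-down example bodies for illustration.
with_status = json.dumps({
    "kind": "APIResourceList",
    "resources": [{"name": "kafkatopics"}, {"name": "kafkatopics/status"}],
})
without_status = json.dumps({
    "kind": "APIResourceList",
    "resources": [{"name": "kafkatopics"}],
})
```

The TO could run such a check once at startup and then choose between updating status through the subresource path and writing it as part of the main resource.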

tombentley commented 5 years ago

Constraints for decreasing the replication factor

In most-important-first order:

  1. The preferred leader should not be changed.
  2. The retained replicas should be from different racks (where possible)
  3. The current leader should be retained if possible (i.e. avoid leader election & CG rebalances where possible)
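One way to apply these three constraints in priority order is a greedy pick, sketched below in Python. The function name and inputs are hypothetical; it assumes the first entry of the replica list is the preferred leader and that every broker has a known rack.

```python
from typing import Dict, List


def retain_replicas(replicas: List[int], current_leader: int,
                    racks: Dict[int, str], target_rf: int) -> List[int]:
    """Choose which replicas to keep when decreasing RF to target_rf."""
    if not 1 <= target_rf <= len(replicas):
        raise ValueError("target_rf must be between 1 and the current replica count")
    retained = [replicas[0]]  # constraint 1: the preferred leader always stays
    candidates = list(replicas[1:])
    while len(retained) < target_rf:
        used_racks = {racks[b] for b in retained}
        best = min(
            candidates,
            key=lambda b: (
                racks[b] in used_racks,  # constraint 2: prefer a rack not yet used
                b != current_leader,     # constraint 3: prefer the current leader
                replicas.index(b),       # tie-break: keep the original order
            ),
        )
        retained.append(best)
        candidates.remove(best)
    # Keep the original relative order so the preferred leader remains first.
    return sorted(retained, key=replicas.index)
```

Being greedy, this does not search for the globally most rack-diverse subset; it only ranks each pick by the constraints in the listed order.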

Constraints for increasing the replication factor

In most-important-first order:

  1. The preferred leader should not be changed.
  2. The new brokers should be in different racks from already-assigned brokers (where possible)

There actually needs to be a 3rd constraint, but it's not clear what it should be. We basically need to avoid always putting new replicas on the same broker(s). We could do that by picking a broker from the pool of candidates at random, or by picking the broker with the fewest assigned partitions. I don't want to try to base this on metric data (because that would be getting into cluster balancing). Either of those choices could be a poor choice in some circumstances. Any preference?
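For comparison, here is a minimal Python sketch of the "fewest assigned partitions" option. It is only one of the two candidates mentioned above, not a decision. The function name and inputs are hypothetical, and rack and partition-count data are assumed to be available for every broker.

```python
from typing import Dict, List


def add_replicas(replicas: List[int], brokers: List[int], racks: Dict[int, str],
                 partition_counts: Dict[int, int], target_rf: int) -> List[int]:
    """Append brokers to a partition's replica list until it has target_rf entries."""
    result = list(replicas)          # existing order kept: preferred leader unchanged
    counts = dict(partition_counts)  # local copy, updated as brokers are picked
    candidates = [b for b in brokers if b not in result]
    if target_rf - len(result) > len(candidates):
        raise ValueError("not enough brokers for the requested replication factor")
    while len(result) < target_rf:
        used_racks = {racks[b] for b in result}
        best = min(
            candidates,
            key=lambda b: (
                racks[b] in used_racks,  # prefer a rack with no replica yet
                counts.get(b, 0),        # then the least-loaded broker
                b,                       # then the lowest id, for determinism
            ),
        )
        result.append(best)
        candidates.remove(best)
        counts[best] = counts.get(best, 0) + 1
    return result
```

Swapping the second key component for a random number would give the other option; rack diversity would still take priority either way.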

scholzj commented 4 years ago

Reviewed on 10th September 2020: This is still needed. Should be done in cooperation with Cruise Control.

Abhik1998 commented 3 years ago

Hey!! I want to work on this project idea.

fibersel commented 3 years ago

Hi @tombentley , can I take this task for GSoC 2021? I already have successful experience with open source projects: https://github.com/ClickHouse/ClickHouse/pull/16578 https://github.com/ClickHouse/ClickHouse/pull/17144 https://github.com/catboost/catboost/pull/1598

tombentley commented 3 years ago

@fibersel you're welcome to have a look into it. There are at least a couple of other people who are also interested. I'll tell you what I've told them:

You're welcome to start poking around in the Topic Operator code if you want. And bug fixes are always welcome. But it's also worth thinking about how this is going to work overall. Increasing the replication factor of a topic requires Kafka to move data around between brokers, which can take a long time, so think about the consequences of that for the Topic Operator. Think about what happens if any of the TO, CC or Kafka brokers get restarted during this time. Some of those will be problematic and others not. Getting a sound overall understanding is really important for deciding exactly how the TO should handle this. So learning a bit about how partition reassignment works in Kafka, and how CC controls it, is all very useful, and IMO at least as important as familiarity with the code at this stage.

Feel free to ask questions here or on the Strimzi channel on the CNCF slack.

sudip004 commented 2 years ago

Hey, I am a beginner. Can anyone please help me with what I can do?

scholzj commented 2 years ago

@sudip004 You mean to implement this issue? I guess @tombentley @tomncooper @kyguy have some thoughts about this. But TBH it is not exactly an issue for a beginner.

Fattouh92 commented 2 years ago

Why is this part of the Topic Operator and not Cruise Control?

scholzj commented 2 years ago

Triaged on 7.6.2022: Still desired. Work in progress with proposal https://github.com/strimzi/proposals/pull/50

scholzj commented 5 months ago

Done by @fvaleri. Will be released in 0.41.