strimzi / strimzi-kafka-operator

Apache Kafka® running on Kubernetes
https://strimzi.io/
Apache License 2.0

Feat: MirrorMaker 2.0 - replication without alias prefix on topics of target cluster #2546

Closed thpham closed 3 years ago

thpham commented 4 years ago

What? In the context of a disaster recovery plan (DRP), I'm trying to set up active-passive replication between 2 clusters (on different DC sites), but I don't want the topic names to change between clusters.

Why? We want hot standby consumers/producers to be able to start with the same configuration on the recovery site. That also makes it easier to reverse the direction and replicate back to the initial cluster.

At the moment, if I have a topic called "some_topic", it is automatically replicated as "cluster1.some_topic" on the target cluster.

With the current MM2 CRD:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mm2-cluster
spec:
  version: 2.4.0
  replicas: 1
  connectCluster: "kafkazk-2-target"
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  clusters:
  - alias: "kafkazk-1-source"
    bootstrapServers: my-cluster-kafka-bootstrap.kafkazk-1:9092
  - alias: "kafkazk-2-target"
    bootstrapServers: my-cluster-kafka-bootstrap.kafkazk-2:9092
  mirrors:
  - sourceCluster: "kafkazk-1-source"
    targetCluster: "kafkazk-2-target"
    sourceConnector:
      config:
        replication.factor: 1
        offset-syncs.topic.replication.factor: 1
        sync.topic.acls.enabled: "false"
    heartbeatConnector:
      config:
        heartbeats.topic.replication.factor: 1
    checkpointConnector:
      config:
        checkpoints.topic.replication.factor: 1
    topicsPattern: ".*"
    groupsPattern: ".*"

I don't see how it could be configured, because the current operator uses the aliases for connecting to the clusters.

I read here, and it has been tested, that the following MM2 attributes remove the prefix:

"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",
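
To illustrate why these attributes have that effect: Kafka's DefaultReplicationPolicy builds the remote topic name by concatenating the source cluster alias, the separator, and the original topic name. The sketch below is a simplified stand-in for that naming rule, not Kafka's own class; the name PrefixNamingSketch is hypothetical and chosen only for this illustration.

```java
// Simplified stand-in for the naming rule used by Kafka's DefaultReplicationPolicy.
// PrefixNamingSketch is a hypothetical name, not a Kafka or Strimzi class.
final class PrefixNamingSketch {
    private final String separator;

    PrefixNamingSketch(String separator) {
        this.separator = separator;
    }

    // Remote topic name = source cluster alias + separator + original topic name.
    String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + separator + topic;
    }
}
```

With the separator "." and the alias "cluster1", "some_topic" becomes "cluster1.some_topic". With an empty alias and an empty separator, the concatenation leaves the topic name unchanged.
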

scholzj commented 4 years ago

@ajborley Is this something you have been thinking about? Is it in your plans to add some support for this?

ajborley commented 4 years ago

@scholzj I ran a quick test this morning. With a small edit to the alias pattern (https://github.com/strimzi/strimzi-kafka-operator/blob/master/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2ClusterSpec.java#L47) that changes the regex to allow zero-length aliases, @Pattern("^[a-zA-Z0-9\\._\\-]{0,100}$"), it is possible to get this working using an empty source cluster alias and the replication.policy.separator attribute @thpham notes above.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mm2-cluster
spec:
  version: 2.4.0
  replicas: 1
  connectCluster: "kafkazk-2-target"
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  clusters:
  - alias: ""
    bootstrapServers: my-cluster-kafka-bootstrap.kafkazk-1:9092
  - alias: "kafkazk-2-target"
    bootstrapServers: my-cluster-kafka-bootstrap.kafkazk-2:9092
  mirrors:
  - sourceCluster: ""
    targetCluster: "kafkazk-2-target"
    sourceConnector:
      config:
        replication.factor: 1
        offset-syncs.topic.replication.factor: 1
        sync.topic.acls.enabled: "false"
        replication.policy.separator: ""
    heartbeatConnector:
      config:
        heartbeats.topic.replication.factor: 1
    checkpointConnector:
      config:
        checkpoints.topic.replication.factor: 1
    topicsPattern: ".*"
    groupsPattern: ".*"

Unfortunately, when TLS or client auth is switched on, the deployment fails to start because the cluster aliases are used when setting up mount paths, etc., so I don't think editing that regex is a long-term solution.

The architected solution is to add a class that implements org.apache.kafka.connect.mirror.ReplicationPolicy and add the jar to the /opt/kafka/libs dir by following the instructions here: https://strimzi.io/docs/latest/#creating-new-image-from-base-str. The DefaultReplicationPolicy that Kafka supplies (https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java) adds the source cluster alias to the target topic names, so it would be very easy to make a ReplicationPolicy that just returns the original topic name.
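
As a rough sketch of what such a policy could look like: the block below declares a local stand-in interface (ReplicationPolicySketch, a hypothetical name) that mirrors the three abstract methods of org.apache.kafka.connect.mirror.ReplicationPolicy, so the example compiles without Kafka on the classpath. A real policy would implement the Kafka interface directly. The null return values for topicSource and upstreamTopic are assumptions made for illustration and would need to be checked against how MM2 uses them.

```java
// Local stand-in mirroring the abstract methods of
// org.apache.kafka.connect.mirror.ReplicationPolicy. It exists only so this
// sketch is self-contained; a real policy implements the Kafka interface.
interface ReplicationPolicySketch {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
    String topicSource(String topic);
    String upstreamTopic(String topic);
}

// Hypothetical identity policy: topics keep their original names on the target.
final class IdentityReplicationPolicySketch implements ReplicationPolicySketch {

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        // Return the topic name as-is, without the source cluster alias prefix.
        return topic;
    }

    @Override
    public String topicSource(String topic) {
        // Assumption: with no prefix, the source cluster cannot be derived
        // from the topic name, so report no source.
        return null;
    }

    @Override
    public String upstreamTopic(String topic) {
        // Assumption: treat every topic as having no upstream topic.
        return null;
    }
}
```

One consequence of this design is that the topic name no longer carries the source cluster alias, so anything that relies on the prefix to tell replicated topics from local ones (for example, avoiding replication loops when mirroring in both directions) would need another mechanism.
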

For this issue, we could add a new IdentityReplicationPolicy class that we build into a jar and add to the Kafka image - then users could just add replication.policy.class: org.strimzi.kafka.connect.mirror.IdentityReplicationPolicy to their MM2 spec.mirrors[].sourceConnector.config. What do you think?

scholzj commented 4 years ago

I think that definitely sounds much better than having the alias as "".

thpham commented 4 years ago

I like the idea of being able to define a replication.policy.class, because it makes it very flexible.

ajborley commented 4 years ago

@thpham You can already do this, as it’s part of base Kafka. You just need to implement the interface, build a jar and then add the jar to an extended Strimzi Kafka image, by following the instructions here: https://strimzi.io/docs/latest/#creating-new-image-from-base-str

ajborley commented 4 years ago

@thpham - sure, you just need to ensure the jar containing your ReplicationPolicy class is built into the kafka image in the /opt/kafka/plugins directory, which will be automatically classloaded by Kafka. Then you can use the replication.policy.class config item in the connector config with your class name to use it when the connector is running. For example:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mm2-cluster
spec:
  version: 2.4.0
  image: my-custom-kafka-image
  replicas: 1
  connectCluster: "kafkazk-2-target"
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  clusters:
  - alias: "kafkazk-1-source"
    bootstrapServers: my-cluster-kafka-bootstrap.kafkazk-1:9092
  - alias: "kafkazk-2-target"
    bootstrapServers: my-cluster-kafka-bootstrap.kafkazk-2:9092
  mirrors:
  - sourceCluster: "kafkazk-1-source"
    targetCluster: "kafkazk-2-target"
    sourceConnector:
      config:
        replication.factor: 1
        offset-syncs.topic.replication.factor: 1
        sync.topic.acls.enabled: "false"
        replication.policy.class: org.thpham.kafka.connect.mirror.IdentityReplicationPolicy
    topicsPattern: ".*"
    groupsPattern: ".*"

thpham commented 4 years ago

@ajborley Thanks again for the explanations. I put together a quick project here: https://github.com/thpham/mm2-identity-replication and published the image on Docker Hub, but it looks like the target topics are not even created now. I can see that the jar is loaded in the MM2 pod, but I get the following stack trace in the operator logs: https://gist.github.com/thpham/bee39d064c96678ac452e68bab57212b

Do you have any idea what could be wrong?

ajborley commented 4 years ago

@thpham - apologies for the delay, I was on vacation last week. I have managed to get your scenario working on my system - see the PR I've opened on your repo: https://github.com/thpham/mm2-identity-replication/pull/1

I think the MirrorSourceConnector could not access the IdentityReplicationPolicy class due to Kafka's plugin classloading, which is designed to allow multiple connector implementations to be loaded as plugins without interfering with each other. In this case, the MirrorSourceConnector is loaded as one plugin, but the IdentityReplicationPolicy jar was loaded as a different plugin, meaning the MirrorSourceConnector could not find the specified replication policy class. Moving the jar to the /opt/kafka/libs directory in the image fixed the problem.

thpham commented 4 years ago

@ajborley No worries, I hope you had a relaxing time ;-)

Thank you again for taking the time to test it! I checked it again and now it looks good!

Cheers.

armisz commented 4 years ago

The use case sounds very common. It would be handy to have it as an optional built-in feature inside Strimzi. How do you feel about it, @ajborley, @scholzj, @thpham?

thpham commented 4 years ago

Hello, I would love to get this included in Strimzi, but I don't know the correct approach for this. I prepared a branch; before opening a PR, I'd like to get the maintainers' input on whether I'm doing it correctly.

Thank you.

scholzj commented 4 years ago

@thpham If you have the code already ready somewhere, you can always open the PR of course. If you don't have it ready yet, it might be good to either get in touch with us on Slack or share some proposal here. If you think the changes would be bigger you can also open the proposal here: https://github.com/strimzi/proposals ... but it might not be needed for some smaller changes.

scholzj commented 3 years ago

Closing as this is fixed by #3423