strimzi / strimzi-kafka-operator

Apache Kafka® running on Kubernetes
https://strimzi.io/
Apache License 2.0

[Kafka MirrorMaker2 missing a few topics as part of auto replication] #3338

Closed vperi1730 closed 4 years ago

vperi1730 commented 4 years ago

Hi Team,

I have set up a source and a target cluster, and MM2 is deployed in the target cluster. Below is the configuration.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: brcm-mm2
spec:
  version: 2.4.0 
  replicas: 1
  connectCluster: "mm-backup-cluster3"
  clusters:
  - alias: "mm-src-cluster"
    authentication:
      certificateAndKey:
        certificate: user.crt
        key: user.key
        secretName: mm-consumer-user
      type: tls
    bootstrapServers: mm-src-cluster-kafka-bootstrap.kafka-mirror-src.svc:9093
    tls:
      trustedCertificates:
      - certificate: ca.crt
        secretName: mm-src-cluster-cluster-ca-cert
  - alias: "mm-backup-cluster3"
    authentication:
      certificateAndKey:
        certificate: user.crt
        key: user.key
        secretName: mm02-prod-user-tgt1
      type: tls
    bootstrapServers: mm-backup-cluster3-kafka-bootstrap.kafka-mirror-tgt1.svc:9093
    config:
      config.storage.replication.factor: 1
      offset.storage.replication.factor: 1
      status.storage.replication.factor: 1

    tls:
      trustedCertificates:
      - certificate: ca.crt
        secretName: mm-backup-cluster3-cluster-ca-cert
  mirrors:
  - sourceCluster: "mm-src-cluster"
    targetCluster: "mm-backup-cluster3"
    sourceConnector:
      config:
        replication.factor: 1
        offset-syncs.topic.replication.factor: 1
        sync.topic.acls.enabled: "true"
        refresh.groups.enabled: "true"
        refresh.topics.enabled: "true"

    heartbeatConnector: 
      config:
        heartbeats.topic.replication.factor: 1 
    checkpointConnector: 
      config:
        checkpoints.topic.replication.factor: 1 
    topicsPattern: ".*"
    groupsPattern: ".*" 

Topics in Src Cluster:

kubectl get kafkatopics -n kafka-mirror-src1
NAME                                        PARTITIONS   REPLICATION FACTOR
mm2-july19-topic                            4            2
mm2-offset-syncs.my-trgt-cluster.internal   1            1
my-topic-mm2                                6            2
my-topic1                                   6            2
my-topic2                                   6            2
test-topic                                  3            1
test-topic-r2                               3            2

With MM2 up and running in the target cluster, I noticed that some topics did not get replicated and others have a replication factor mismatch. I have never seen this behavior before and am not sure why it is happening. One pattern I can see is that topics with a replication factor greater than 1 take a long time and still fail to replicate.

Is this in any way connected to the Kafka CR configuration, or is there another reason?

Also, test-topic-r2 and mm2-july19-topic, which were replicated from source to target, ended up with a replication factor of 1 instead of 2.

kubectl get kafkatopics -n kafka-mirror-trgt1
NAME                                                          PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   50           1
heartbeats                                                    1            1
mirrormaker2-cluster-configs                                  1            1
mirrormaker2-cluster-offsets                                  25           1
mirrormaker2-cluster-status                                   5            1
my-src-cluster.checkpoints.internal                           1            1
my-src-cluster.mm2-july19-topic                               4            1
my-src-cluster.test-topic                                     3            1
my-src-cluster.test-topic-r2                                  3            1

My source cluster Kafka CR:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1beta1","kind":"Kafka","metadata":{"annotations":{},"name":"my-src-cluster","namespace":"kafka-mirror-src1"},"spec":{"clientsCa":{"generateCertificateAuthority":true},"clusterCa":{"generateCertificateAuthority":true},"entityOperator":{"topicOperator":{},"userOperator":{}},"kafka":{"authorization":{"type":"simple"},"config":{"log.message.format.version":"2.4","offsets.topic.replication.factor":1,"transaction.state.log.min.isr":1,"transaction.state.log.replication.factor":1},"jmxOptions":{},"jvmOptions":{"gcLoggingEnabled":true},"listeners":{"external":{"authentication":{"type":"tls"},"tls":true,"type":"loadbalancer"},"plain":{},"tls":{"authentication":{"type":"tls"}}},"logging":{"loggers":{"kafka.root.logger.level":"DEBUG"},"type":"inline"},"metrics":{"lowercaseOutputName":true,"rules":[{"labels":{"clientId":"$3","partition":"$5","topic":"$4"},"name":"kafka_server_$1_$2","pattern":"kafka.server\u003ctype=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)\u003e\u003c\u003eValue","type":"GAUGE"}]},"replicas":2,"storage":{"deleteClaim":false,"size":"10Gi","type":"persistent-claim"},"version":"2.4.0"},"zookeeper":{"logging":{"loggers":{"zookeeper.root.logger":"DEBUG"},"type":"inline"},"replicas":3,"storage":{"deleteClaim":false,"size":"10Gi","type":"persistent-claim"}}}}
  creationTimestamp: "2020-07-17T14:34:39Z"
  generation: 3
  name: my-src-cluster
  namespace: kafka-mirror-src1
  resourceVersion: "168789980"
  selfLink: /apis/kafka.strimzi.io/v1beta1/namespaces/kafka-mirror-src1/kafkas/my-src-cluster
  uid: 7e71e666-c785-437b-b9ab-8aaccf580408
spec:
  clientsCa:
    generateCertificateAuthority: true
  clusterCa:
    generateCertificateAuthority: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    authorization:
      superUsers:
      - CN=user-src
      - CN=root
      type: simple
    config:
      log.message.format.version: "2.4"
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    jmxOptions: {}
    jvmOptions:
      gcLoggingEnabled: true
    listeners:
      external:
        authentication:
          type: tls
        tls: true
        type: loadbalancer
      plain: {}
      tls:
        authentication:
          type: tls
    logging:
      loggers:
        kafka.root.logger.level: DEBUG
      type: inline
    metrics:
      lowercaseOutputName: true
      rules:
      - labels:
          clientId: $3
          partition: $5
          topic: $4
        name: kafka_server_$1_$2
        pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
        type: GAUGE
    replicas: 3
    storage:
      deleteClaim: false
      size: 10Gi
      type: persistent-claim
    version: 2.4.0
  zookeeper:
    logging:
      loggers:
        zookeeper.root.logger: DEBUG
      type: inline
    replicas: 3
    storage:
      deleteClaim: false
      size: 10Gi
      type: persistent-claim
status:
  conditions:
  - lastTransitionTime: 2020-07-18T02:27:15+0000
    status: "True"
    type: Ready
  listeners:
  - addresses:
    - host: my-src-cluster-kafka-bootstrap.kafka-mirror-src1.svc
      port: 9092
    type: plain
  - addresses:
    - host: my-src-cluster-kafka-bootstrap.kafka-mirror-src1.svc
      port: 9093
    certificates:
    - |
      -----BEGIN CERTIFICATE-----
      MIIDLTCCAhWgAwIBAgIJAKmGATKr+WpLMA0GCSqGSIb3DQEBCwUAMC0xEzARBgNV
      BAoMCmlvLnN0cmltemkxFjAUBgNVBAMMDWNsdXN0ZXItY2EgdjAwHhcNMjAwNzE3
      MTQzNDQwWhcNMjEwNzE3MTQzNDQwWjAtMRMwEQYDVQQKDAppby5zdHJpbXppMRYw
      FAYDVQQDDA1jbHVzdGVyLWNhIHYwMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
      CgKCAQEA0vXECx8jMjPcHpbm/FSXuMogCm/WzGsG9Nu77JouvVypFkTgW6ZElJK1
      sTGdGHijwjpT6yHJVjNxmLuaumWMox5Q1IP4gRqBki1T1p9EH3gfPBIidN4fmBXE
      HjClLOuYOCKHBF0xsulW4+0rv9zlrXf27nEHZx0Q25q2gbko0oI+3vVT7FLRJ4NQ
      1hgDUCsHNdiJMkyxB/lf1UhnQBEEHqXpZZW+TiKD7ZOVYtE/+NGi22K0vNFOHV0N
      XtbP1QzvxIe1F9JCYeTtkuiVchABBEwmrLg2D1zShcn/Sj62spIRLKXFse2wPuai
      T1ln9y34EfKCI/5jCVoA7ZGuGS/8FwIDAQABo1AwTjAdBgNVHQ4EFgQU/O9zRhsc
      DMJil5Yi+rp50LkEVhgwHwYDVR0jBBgwFoAU/O9zRhscDMJil5Yi+rp50LkEVhgw
      DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAodS+QzH0d0agztzAM3JK
      HE2L1i64WlyToA1XvVeuGx61T4Mkok+ADLntsZWsfkWpSjO42sLzT1jj/OYfWCh3
      xcgk1A4pLTpN8jbbnSW19YvN4mIIkzod4bNmJJLzDYF23+ow6sfrk/KbUOaQGFBQ
      29j+17aRaEj/eSOkULB0ij9Rw3eHNC8KNvLpjFfRAcj/yznuaRmUJ7WtpUwpIlN8
      3rk1i2wqO+vVHJFP8mm2CLXqGQxMYb//uOKKQxyjML2TcOMUWYrcNE59NP94ukrh
      UaUTxGJMw9seubQhOzDv4apPMK7SyHS7V5pLsPPyZ6gGsk3nSGv5BjDeSlovGdte
      Sg==
      -----END CERTIFICATE-----
    type: tls
  - addresses:
    - host: 34.86.21.173
      port: 9094
    certificates:
    - |
      -----BEGIN CERTIFICATE-----
      MIIDLTCCAhWgAwIBAgIJAKmGATKr+WpLMA0GCSqGSIb3DQEBCwUAMC0xEzARBgNV
      BAoMCmlvLnN0cmltemkxFjAUBgNVBAMMDWNsdXN0ZXItY2EgdjAwHhcNMjAwNzE3
      MTQzNDQwWhcNMjEwNzE3MTQzNDQwWjAtMRMwEQYDVQQKDAppby5zdHJpbXppMRYw
      FAYDVQQDDA1jbHVzdGVyLWNhIHYwMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
      CgKCAQEA0vXECx8jMjPcHpbm/FSXuMogCm/WzGsG9Nu77JouvVypFkTgW6ZElJK1
      sTGdGHijwjpT6yHJVjNxmLuaumWMox5Q1IP4gRqBki1T1p9EH3gfPBIidN4fmBXE
      HjClLOuYOCKHBF0xsulW4+0rv9zlrXf27nEHZx0Q25q2gbko0oI+3vVT7FLRJ4NQ
      1hgDUCsHNdiJMkyxB/lf1UhnQBEEHqXpZZW+TiKD7ZOVYtE/+NGi22K0vNFOHV0N
      XtbP1QzvxIe1F9JCYeTtkuiVchABBEwmrLg2D1zShcn/Sj62spIRLKXFse2wPuai
      T1ln9y34EfKCI/5jCVoA7ZGuGS/8FwIDAQABo1AwTjAdBgNVHQ4EFgQU/O9zRhsc
      DMJil5Yi+rp50LkEVhgwHwYDVR0jBBgwFoAU/O9zRhscDMJil5Yi+rp50LkEVhgw
      DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAodS+QzH0d0agztzAM3JK
      HE2L1i64WlyToA1XvVeuGx61T4Mkok+ADLntsZWsfkWpSjO42sLzT1jj/OYfWCh3
      xcgk1A4pLTpN8jbbnSW19YvN4mIIkzod4bNmJJLzDYF23+ow6sfrk/KbUOaQGFBQ
      29j+17aRaEj/eSOkULB0ij9Rw3eHNC8KNvLpjFfRAcj/yznuaRmUJ7WtpUwpIlN8
      3rk1i2wqO+vVHJFP8mm2CLXqGQxMYb//uOKKQxyjML2TcOMUWYrcNE59NP94ukrh
      UaUTxGJMw9seubQhOzDv4apPMK7SyHS7V5pLsPPyZ6gGsk3nSGv5BjDeSlovGdte
      Sg==
      -----END CERTIFICATE-----
    type: external
  observedGeneration: 3

I need help resolving this.

scholzj commented 4 years ago

I think that the connectCluster in your MM2 configuration needs to be the target cluster and not the source cluster. Not sure whether it is or isn't related to the other issues you describe.
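
For reference, the relevant field looks like this (a minimal sketch using the cluster aliases from the CR above; connectCluster must name the alias of the target cluster, which is where Connect keeps its internal topics):

spec:
  connectCluster: "mm-backup-cluster3"   # alias of the target cluster, not the source
  clusters:
  - alias: "mm-src-cluster"              # source cluster
    # bootstrapServers, tls, authentication as in the full CR above
  - alias: "mm-backup-cluster3"          # target cluster
    # bootstrapServers, tls, authentication as in the full CR above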

vperi1730 commented 4 years ago

Here is the configuration I am using; connectCluster points to the target cluster only.

#Kafka Mirror Maker
kafkammYaml: |-
  apiVersion: kafka.strimzi.io/v1alpha1
  kind: KafkaMirrorMaker2
  metadata:
    name: mirrormaker2-test2
  spec:
    version: 2.4.0 
    replicas: 1
    connectCluster: "my-trgt-cluster"
    clusters:
    - alias: "my-src-cluster"
      authentication:
        certificateAndKey:
          certificate: user.crt
          key: user.key
          secretName: user-src
        type: tls
      bootstrapServers: my-src-cluster-kafka-bootstrap.kafka-mirror-src1.svc:9093
      tls:
        trustedCertificates:
        - certificate: ca.crt
          secretName: my-src-cluster-cluster-ca-cert
    - alias: "my-trgt-cluster"
      authentication:
        certificateAndKey:
          certificate: user.crt
          key: user.key
          secretName: user-target
        type: tls
      bootstrapServers: my-trgt-cluster-kafka-bootstrap.kafka-mirror-trgt1.svc:9093
      config:
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1

      tls:
        trustedCertificates:
        - certificate: ca.crt
          secretName: my-trgt-cluster-cluster-ca-cert
    mirrors:
    - sourceCluster: "my-src-cluster"
      targetCluster: "my-trgt-cluster"
      sourceConnector:
        config:
          replication.factor: 1
          offset-syncs.topic.replication.factor: 1
          sync.topic.acls.enabled: "true"
          refresh.groups.enabled: "true"
          refresh.topics.enabled: "true"

      heartbeatConnector: 
        config:
          heartbeats.topic.replication.factor: 1 
      checkpointConnector: 
        config:
          checkpoints.topic.replication.factor: 1 
      topicsPattern: ".*"
      groupsPattern: ".*" 
scholzj commented 4 years ago

This MM2 CR is completely different from the previous one. I do not think we can provide any help this way.

vperi1730 commented 4 years ago

Sorry, we were deploying this on 4 different clusters, hence the CRs are different. Let me check the documentation to see if there is anything I can do before I come back to you.

Thanks Scholzj for your help.

vperi1730 commented 4 years ago

Hi @scholzj, the issue is resolved with the following configuration change. One question I have: if I have topics in the source with replication factor 1/2/3, do these two parameters need to be updated in MM2 every time?

I set parameter (18) to 2, and now all my topics get auto-replicated properly; earlier they were being replicated with a replication factor of 1 because of the configuration below. My question is: if I have multiple topics with different replication factors in the source, does this setting support a wildcard like * or any other value?

sourceConnector: (17)
      config:
        replication.factor: 1 (18)
        offset-syncs.topic.replication.factor: 1 (19)
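
For completeness, this is roughly the change that resolved it (a sketch only, keeping the parameter markers above); note that replication.factor applies a single value to every topic the connector creates, so it cannot vary per source topic:

sourceConnector: (17)
      config:
        replication.factor: 2 (18)
        offset-syncs.topic.replication.factor: 1 (19)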
scholzj commented 4 years ago

I have no idea how they work, sorry.

ajborley commented 4 years ago

@vperi1730 The replication.factor config property defines the initial replication factor for topics that are created by the MM2 MirrorSourceConnector (1), which is useful if you have a different number of brokers at the target cluster compared to the source cluster. If the topics you want to mirror have various replication factors, you can update them after they have been created by the MirrorSourceConnector (2), or you can create the topics with matching replication factors before you start mirroring (remember that the mirrored topics include the source cluster alias as a prefix on the topic name, e.g. my-src-cluster.topic1).
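
If you go the pre-creation route, one way to do it in Strimzi is with a KafkaTopic CR. The sketch below is only an illustration using names from this issue (the my-topic1 source topic and the my-trgt-cluster target cluster); adjust the namespace, cluster label, partitions, and replicas to your own setup:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-src-cluster.my-topic1          # mirrored name = source alias + "." + source topic name
  namespace: kafka-mirror-trgt1
  labels:
    strimzi.io/cluster: my-trgt-cluster   # target Kafka cluster managed by the Topic Operator
spec:
  partitions: 6
  replicas: 2                             # desired replication factor on the target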

The offset-syncs.topic.replication.factor config property sets the replication factor for the mm2-offset-syncs.<target-cluster-alias>.internal topic that is created by the MM2 MirrorSourceConnector (3) on your source cluster. That topic is used to store the offset position that has been reached by the consumer in the MirrorSourceConnector for each mirrored topic-partition. As the offset-syncs topic is created on the source cluster, the offset-syncs.topic.replication.factor property cannot be larger than the number of brokers on the source cluster.
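
If you want to double-check what was actually created, you can describe that internal topic on the source cluster. This is only a sketch, assuming the plain listener on port 9092 is reachable from inside the broker pod and not blocked by ACLs:

kubectl exec -n kafka-mirror-src1 my-src-cluster-kafka-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic mm2-offset-syncs.my-trgt-cluster.internal

The output lists ReplicationFactor and the replica assignment for each partition.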

The MM2 MirrorSourceConnector mirrors topic-level configuration settings across from the source cluster to the target cluster (with a default deny list - see (4)). However, replication factor is not a topic-level configuration setting - see the list at (5).

(1) https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L293
(2) You can do this via the kafka-reassign-partitions.sh tool, and (I think) Cruise Control can do this for you too.
(3) https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L281
(4) https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java#L33-L38
(5) https://kafka.apache.org/documentation.html#topicconfigs
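
To illustrate (2): increasing the replication factor of an already-mirrored topic means submitting a reassignment plan to the target cluster. The example below is purely hypothetical; it assumes the target cluster has brokers 0 and 1 and reuses the my-src-cluster.test-topic-r2 topic from this issue (older Kafka versions take --zookeeper instead of --bootstrap-server):

# increase-rf.json
{
  "version": 1,
  "partitions": [
    { "topic": "my-src-cluster.test-topic-r2", "partition": 0, "replicas": [0, 1] },
    { "topic": "my-src-cluster.test-topic-r2", "partition": 1, "replicas": [1, 0] },
    { "topic": "my-src-cluster.test-topic-r2", "partition": 2, "replicas": [0, 1] }
  ]
}

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file increase-rf.json --execute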

vperi1730 commented 4 years ago

Thanks for the information, this helped.