strimzi / strimzi-kafka-operator

Apache Kafka® running on Kubernetes
https://strimzi.io/
Apache License 2.0

Mirror between two external Kafka clusters using MirrorMaker2 #3210

Closed: eazhilan-nagarajan closed this issue 4 years ago

eazhilan-nagarajan commented 4 years ago

Hi Team,

I have some trouble getting the MirrorMaker2 working between two clusters running in two different OpenShift namespaces.

I have read through many of the issues in this project, which were quite informative, but I'm still stuck.

MirrorMaker2 YAML:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mirrormaker-1
spec:
  version: 2.4.0
  replicas: 3
  resources:
    requests:
      cpu: "500m"
      memory: 2Gi
    limits:
      cpu: "1.5"
      memory: 2Gi
  jvmOptions:
    "-Xmx": "1g"
    "-Xms": "1g"
  connectCluster: "my-cluster-1"
  clusters:
  - alias: "my-cluster-1"
    bootstrapServers: my-cluster-1-kafka-bootstrap-sandbox-ocp.com:443
    tls:
      trustedCertificates:
        - secretName: my-cluster-1-cluster-ca-cert
          certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: mm-user-source
        certificate: user.crt
        key: user.key
  - alias: "my-cluster-2"
    bootstrapServers: my-cluster-2-kafka-bootstrap-integration-ocp.com:443
    tls:
      trustedCertificates:
        - secretName: integration-my-cluster-2-cluster-ca-cert
          certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: mm-user-target
        certificate: user.crt
        key: user.key
    config:
      consumer.exclude.internal.topics: "false"
      producer.max.request.size: 15728640
      config.storage.replication.factor: 3
      offset.storage.replication.factor: 3
      status.storage.replication.factor: 3
  mirrors:
  - sourceCluster: "my-cluster-1"
    targetCluster: "my-cluster-2"
    sourceConnector:
      config:
        replication.factor: 3
        offset-syncs.topic.replication.factor: 3
        sync.topic.acls.enabled: "false"
    heartbeatConnector:
      config:
        heartbeats.topic.replication.factor: 3
    checkpointConnector:
      tasksMax: 1
      config:
        checkpoints.topic.replication.factor: 3
        exclude.internal.topics: "false"
    topicsPattern: ".*"
    groupsPattern: ".*"

Kafka cluster YAML (only a snippet of the config is copied here to show the listeners and authentication):

spec:
  kafka:
    version: 2.4.0
    replicas: 3
    listeners:
      external:
        type: route
        authentication:
          type: tls
      plain: {}
      tls: 
        authentication:
          type: tls
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 3

PROBLEM DESCRIPTION:

Now I want to mirror data from my-cluster-1 to my-cluster-2 using the external routes in OpenShift.

What did I try? I ran MirrorMaker in the sandbox namespace where my-cluster-1 is running, so the SSL authentication between MM and my-cluster-1 seems to be OK (both run in the same namespace).

The problem is when MirrorMaker tries to connect to my-cluster-2, which is running in a different namespace (the integration namespace). I see the errors below in the MM logs.

2020-06-17 22:26:20,986 WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager) [kafka-admin-client-thread | adminclient-1]
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence violation, 2
    at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1575)
    at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:545)
    at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:819)
    at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:783)
    at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:626)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:479)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1196)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence violation, 2
    at sun.security.ssl.HandshakeStateManager.check(HandshakeStateManager.java:362)
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:199)
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1082)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:1015)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:1012)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1504)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
    ... 8 more

So, in plain words: I'm seeking help with mirroring data across Kafka clusters running in different namespaces/data centers with TLS client authentication.

scholzj commented 4 years ago

I guess there might be several issues:

1) The connectCluster should be the target cluster, because the MirrorMaker 2 connectors are source connectors. So in your case it should be connectCluster: my-cluster-2, I guess.
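
For illustration, the only change needed in your KafkaMirrorMaker2 resource would be something like this (a minimal sketch reusing the aliases from your YAML):

spec:
  # Connect must run against the target cluster, since the MM2
  # connectors are source connectors writing into it
  connectCluster: "my-cluster-2"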

2) The external routes should work in general ... but they might introduce additional cost/latency if it is just between namespaces. So it is up to you whether you want to use them; just be aware of the possible consequences.

3) I think the rest looks good on paper. But the error suggests that the TLS authentication failed. Can you provide the full log? Maybe that helps. Also, how did you create the users / copy the secrets with the certificates? Could that be a problem?
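
For reference, a TLS user is normally created with a KafkaUser resource along these lines (a minimal sketch using the names from your YAML; the exact apiVersion depends on your Strimzi version):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: mm-user-target
  labels:
    # this label ties the user to the User Operator of the given cluster
    strimzi.io/cluster: my-cluster-2
spec:
  authentication:
    type: tls   # the operator generates a secret with user.crt and user.key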

eazhilan-nagarajan commented 4 years ago

I'm impressed! Thanks for the quick reply.

  1. I just read about running Connect against the target cluster, and I'll change that and give it a try as you suggested.

  2. I ideally wanted to mirror data across Kafka clusters running in two different data centers for high-availability purposes. As a "getting started" attempt, I'm trying to mirror between two namespaces within an OpenShift cluster, hence using external routes. So my understanding is that without using external routes I cannot bring traffic from external applications. Am I wrong with this understanding?

  3. Yes, I copied the secrets with the certificates and created the user. Is this how it's normally done in similar scenarios?

scholzj commented 4 years ago

I just read about running Connect against the target cluster, and I'll change that and give it a try as you suggested.

Ok, give this a try and see if it helps. If not, I will think about what can we try next.

I ideally wanted to mirror data across Kafka clusters running in two different data centers for high-availability purposes. As a "getting started" attempt, I'm trying to mirror between two namespaces within an OpenShift cluster, hence using external routes. So my understanding is that without using external routes I cannot bring traffic from external applications. Am I wrong with this understanding?

Right. For mirroring between two different OpenShift clusters you will need to use routes (or other external listeners). So I guess it should be fine.
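
By the way, since in your test both namespaces are in the same OpenShift cluster, MirrorMaker 2 could also reach the other Kafka cluster through its internal bootstrap service and skip the routes entirely (a sketch assuming the default Strimzi service names and the internal TLS listener on port 9093):

  clusters:
  - alias: "my-cluster-2"
    # <cluster>-kafka-bootstrap.<namespace>.svc resolves across namespaces
    bootstrapServers: my-cluster-2-kafka-bootstrap.integration.svc:9093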

Yes, I copied the secrets with the certificates and created the user. Is this how it's normally done in similar scenarios?

I think that sounds OK. The error can IMHO be caused by wrong certificates etc., so it might make sense to double-check that you copied the right user from the right cluster. Otherwise it should be fine, I guess.
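
One quick way to double-check is to look at who issued the certificate in the copied secret (a sketch; assumes the secret names from your YAML and the default Strimzi clients CA secret name <cluster>-clients-ca-cert):

  # subject/issuer of the client certificate MirrorMaker 2 presents
  oc get secret mm-user-target -o jsonpath='{.data.user\.crt}' | base64 -d \
    | openssl x509 -noout -subject -issuer -enddate

  # user certificates are signed by the clients CA of their cluster,
  # so the issuer above should match this CA's subject
  oc get secret my-cluster-2-clients-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d \
    | openssl x509 -noout -subject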

eazhilan-nagarajan commented 4 years ago

Before I try out your suggestions, I have question about copying user and secret with certificates.

In OpenShift cluster 1, the KafkaUser has metadata such as the label with the Kafka cluster name. So while copying the KafkaUser and secrets into the second OpenShift cluster, what should I do with the Kafka cluster name label and the other annotations?

What is the significance of those annotations? Should I remove them? Any sample around copying a user with its secrets for this purpose would be a great help.

Thanks!

scholzj commented 4 years ago

You should not really copy the KafkaUser at all. You should just copy the secret.
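
For example, something like this should do (a sketch; the sandbox and integration namespace names are taken from your description, adjust as needed):

  # dump the user secret from the namespace it was created in
  oc get secret mm-user-target -n integration -o yaml > mm-user-target.yaml

  # edit mm-user-target.yaml to strip the namespace-specific metadata:
  # metadata.namespace, metadata.uid, metadata.resourceVersion,
  # metadata.creationTimestamp and metadata.ownerReferences
  # (a dangling ownerReference would get the copy garbage-collected)

  # create the cleaned secret where MirrorMaker 2 runs
  oc apply -n sandbox -f mm-user-target.yaml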

eazhilan-nagarajan commented 4 years ago

You should not really copy the KafkaUser at all. You should just copy the secret.

Oh! Sorry, my bad. Got it! Thanks for clarifying. I'll update here after trying out your suggestion.

eazhilan-nagarajan commented 4 years ago

Hi,

I tried deploying MM again with the suggested changes and it's working now.

Thanks for all the suggestions and guidance.

scholzj commented 4 years ago

Glad it helped!