strimzi / strimzi-kafka-operator

Apache Kafka® running on Kubernetes
https://strimzi.io/
Apache License 2.0

KafkaMirrorMaker TLS #3887

Closed lanzhiwang closed 3 years ago

lanzhiwang commented 3 years ago

I have two k8s clusters, with Kafka deployed separately in each, and I am testing KafkaMirrorMaker between them.

In the source cluster:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.5.0
    replicas: 3
    jmxOptions: {}
    listeners:
      external:
        type: nodeport
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: '2.5'
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

---

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Read
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Describe
        host: '*'
      - resource:
          type: group
          name: my-group
          patternType: literal
        operation: Read
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Write
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Create
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Describe
        host: '*'

---

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824

In the destination cluster:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.5.0
    replicas: 3
    jmxOptions: {}
    listeners:
      external:
        type: nodeport
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: '2.5'
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

---

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Read
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Describe
        host: '*'
      - resource:
          type: group
          name: my-group
          patternType: literal
        operation: Read
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Write
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Create
        host: '*'
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Describe
        host: '*'

---

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824

From the destination cluster, I copy the secrets my-cluster-cluster-ca-cert and my-user to the source cluster (renamed with a destination- prefix):

apiVersion: v1
data:
  ca.crt: LS0tLS1CR...
  ca.p12: MIIEVgIB...
  ca.password: Z2w...
kind: Secret
metadata:
  name: destination-my-cluster-cluster-ca-cert
  namespace: kafka

---

apiVersion: v1
data:
  ca.crt: LS0tL...
  user.crt: LS0tLS1...
  user.key: LS0tL...
  user.p12: MIIJOA...
  user.password: U2l...
kind: Secret
metadata:
  name: destination-my-user
  namespace: kafka

In the source cluster, verify the secrets:

$ kubectl -n kafka get secret destination-my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt

$ kubectl -n kafka get secret destination-my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d

$ keytool -keystore user-truststore.jks -alias CARoot -import -file ca.crt

$ kubectl -n kafka get secret destination-my-user -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12

$ kubectl -n kafka get secret destination-my-user -o jsonpath='{.data.user\.password}' | base64 -d

$ keytool -importkeystore -srckeystore user.p12 -destkeystore user-keystore.jks -deststoretype pkcs12

$ cat << EOF > client-ssl.properties
security.protocol=SSL
ssl.truststore.location=/root/ssl/user-truststore.jks
ssl.truststore.password=g...
ssl.endpoint.identification.algorithm=

ssl.keystore.location=/root/ssl/user-keystore.jks
ssl.keystore.password=S...
ssl.key.password=S...
EOF

# 10.0.128.237 in destination cluster
$ kafka-topics.sh --bootstrap-server 10.0.128.237:31818 --command-config ./client-ssl.properties --list
my-topic

The secrets are OK!

In the source cluster, I create the KafkaMirrorMaker:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  replicas: 3
  version: 2.5.0
  consumer:
    bootstrapServers: 10.0.128.202:31455
    groupId: my-group
    numStreams: 2
    offsetCommitInterval: 120000
    tls:
      trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: my-user
        certificate: user.crt
        key: user.p12
    config:
      max.poll.records: 100
      receive.buffer.bytes: 32768
  producer:
    bootstrapServers: 10.0.128.237:31818
    abortOnSendFailure: false
    tls:
      trustedCertificates:
      - secretName: destination-my-cluster-cluster-ca-cert
        certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: destination-my-user
        certificate: user.crt
        key: user.p12
    config:
      compression.type: gzip
      batch.size: 8192
  whitelist: my-topic
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi

In the source cluster, the Mirror Maker pods fail:

[root@init-int ssl]# kubectl -n kafka get pods
NAME                                               READY   STATUS             RESTARTS   AGE
my-cluster-entity-operator-9d996575b-q8qhz         3/3     Running            0          82m
my-cluster-kafka-0                                 2/2     Running            0          83m
my-cluster-kafka-1                                 2/2     Running            0          83m
my-cluster-kafka-2                                 2/2     Running            0          83m
my-cluster-zookeeper-0                             1/1     Running            0          83m
my-cluster-zookeeper-1                             1/1     Running            0          83m
my-cluster-zookeeper-2                             1/1     Running            0          83m
my-mirror-maker-mirror-maker-8445c44cd6-2lj74      0/1     CrashLoopBackOff   9          25m
my-mirror-maker-mirror-maker-8445c44cd6-dz659      0/1     CrashLoopBackOff   9          25m
my-mirror-maker-mirror-maker-8445c44cd6-wwhhc      0/1     CrashLoopBackOff   9          25m
strimzi-cluster-operator-v0.18.0-797787dd9-4pltw   1/1     Running            0          89m
[root@init-int ssl]#
[root@init-int ssl]# kubectl -n kafka logs my-mirror-maker-mirror-maker-8445c44cd6-2lj74
Preparing truststore
Certificate was added to keystore
Preparing truststore is complete
Preparing keystore
unable to load private key
139865046890384:error:0906D06C:PEM routines:PEM_read_bio:no start line:pem_lib.c:707:Expecting: ANY PRIVATE KEY
Preparing keystore is complete
Preparing truststore
Certificate was added to keystore
Preparing truststore is complete
Preparing keystore
unable to load private key
140373671139216:error:0906D06C:PEM routines:PEM_read_bio:no start line:pem_lib.c:707:Expecting: ANY PRIVATE KEY
Preparing keystore is complete
Kafka Mirror Maker consumer configuration:
# Bootstrap servers
bootstrap.servers=10.0.128.202:31455
# Consumer group
group.id=my-group
# Provided configuration
max.poll.records=100
receive.buffer.bytes=32768

security.protocol=SSL
# TLS / SSL
ssl.truststore.location=/tmp/kafka/consumer.truststore.p12
ssl.truststore.password=[hidden]
ssl.truststore.type=PKCS12
ssl.keystore.location=/tmp/kafka/consumer.keystore.p12
ssl.keystore.password=[hidden]
ssl.keystore.type=PKCS12

Kafka Mirror Maker producer configuration:
# Bootstrap servers
bootstrap.servers=10.0.128.237:31818
# Provided configuration
batch.size=8192
compression.type=gzip

security.protocol=SSL
# TLS / SSL
ssl.truststore.location=/tmp/kafka/producer.truststore.p12
ssl.truststore.password=[hidden]
ssl.truststore.type=PKCS12
ssl.keystore.location=/tmp/kafka/producer.keystore.p12
ssl.keystore.password=[hidden]
ssl.keystore.type=PKCS12

2020-10-28 14:34:34,982 INFO Starting readiness poller (io.strimzi.mirrormaker.agent.MirrorMakerAgent) [main]
2020-10-28 14:34:35,046 INFO Starting liveness poller (io.strimzi.mirrormaker.agent.MirrorMakerAgent) [main]
2020-10-28 14:34:35,270 INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [main]
2020-10-28 14:34:35,471 INFO Starting mirror maker (kafka.tools.MirrorMaker$) [main]
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
2020-10-28 14:34:35,511 INFO ProducerConfig values:
    acks = -1
    batch.size = 8192
    bootstrap.servers = [10.0.128.237:31818]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = gzip
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 2147483647
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 9223372036854775807
    max.in.flight.requests.per.connection = 1
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /tmp/kafka/producer.keystore.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /tmp/kafka/producer.truststore.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig) [main]
2020-10-28 14:34:35,575 INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer) [main]
2020-10-28 14:34:35,577 ERROR Exception when starting mirror maker. (kafka.tools.MirrorMaker$) [main]
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:434)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at kafka.tools.MirrorMaker$MirrorMakerProducer.<init>(MirrorMaker.scala:368)
    at kafka.tools.MirrorMaker$MirrorMakerOptions.checkArgs(MirrorMaker.scala:532)
    at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:86)
    at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/kafka/producer.keystore.p12 of type PKCS12
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:442)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:423)
    ... 5 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/kafka/producer.keystore.p12 of type PKCS12
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
    ... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/kafka/producer.keystore.p12 of type PKCS12
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
    ... 13 more
Caused by: java.io.IOException: Short read of DER length
    at sun.security.util.DerInputStream.getLength(DerInputStream.java:582)
    at sun.security.util.DerValue.init(DerValue.java:391)
    at sun.security.util.DerValue.<init>(DerValue.java:332)
    at sun.security.util.DerValue.<init>(DerValue.java:345)
    at sun.security.pkcs12.PKCS12KeyStore.engineLoad(PKCS12KeyStore.java:1938)
    at java.security.KeyStore.load(KeyStore.java:1445)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:289)
    ... 14 more
Exception in thread "main" java.lang.NullPointerException
    at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:93)
    at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
2020-10-28 14:34:35,582 INFO Start clean shutdown. (kafka.tools.MirrorMaker$) [MirrorMakerShutdownHook]
2020-10-28 14:34:35,582 INFO Shutting down consumer threads. (kafka.tools.MirrorMaker$) [MirrorMakerShutdownHook]
2020-10-28 14:34:35,582 INFO Closing producer. (kafka.tools.MirrorMaker$) [MirrorMakerShutdownHook]
2020-10-28 14:34:35,582 ERROR Uncaught exception in thread 'MirrorMakerShutdownHook': (org.apache.kafka.common.utils.KafkaThread) [MirrorMakerShutdownHook]
java.lang.NullPointerException
    at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:171)
    at kafka.tools.MirrorMaker$MirrorMakerOptions.$anonfun$checkArgs$2(MirrorMaker.scala:518)
    at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
    at java.lang.Thread.run(Thread.java:748)
[root@init-int ssl]#
scholzj commented 3 years ago

The key in the mirror maker configuration is not the p12 file but the key file. So instead of

key: user.p12

you should use

key: user.key
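
The User Operator stores the PEM private key in the my-user secret under user.key (you can see it in the secret you pasted above), so the corrected block would look roughly like this:

    authentication:
      type: tls
      certificateAndKey:
        secretName: my-user
        certificate: user.crt
        key: user.key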
lanzhiwang commented 3 years ago

Thanks! Authentication is OK now.

But I have a new problem. I have two k8s clusters: a source cluster and a destination cluster. To access the destination cluster from a pod in the source cluster, I create a Service and Endpoints.

In the source cluster:

apiVersion: v1
kind: Service
metadata:
  name: destination-service
spec:
  ports:
  - port: 31818

---

apiVersion: v1
kind: Endpoints
metadata:
  name: destination-service
subsets:
  - addresses:
    - ip: 10.0.128.237
    ports:
    - port: 31818

Verify the service:

# in pod
[kafka@kafka-test ~]$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server destination-service:31818 --command-config ./client-ssl.properties --list
my-topic

The service is OK!

I create the mirror maker using destination-service:31818 as the producer bootstrap server:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  replicas: 3
  version: 2.5.0
  consumer:
    bootstrapServers: my-cluster-kafka-bootstrap:9093
    groupId: my-group
    numStreams: 2
    offsetCommitInterval: 120000
    tls:
      trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: my-user
        certificate: user.crt
        key: user.key
    config:
      max.poll.records: 100
      receive.buffer.bytes: 32768
  producer:
    bootstrapServers: destination-service:31818
    abortOnSendFailure: false
    tls:
      trustedCertificates:
      - secretName: destination-my-cluster-cluster-ca-cert
        certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: destination-my-user
        certificate: user.crt
        key: user.key
    config:
      compression.type: gzip
      batch.size: 8192
  whitelist: my-topic
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi

Verify the mirror maker in the source cluster:

[root@init-int ssl]# kubectl -n kafka get services
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
destination-service           ClusterIP   10.97.12.10      <none>        31818/TCP                    25m
my-cluster-kafka-bootstrap    ClusterIP   10.108.242.216   <none>        9091/TCP,9093/TCP            42m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9091/TCP,9093/TCP,9999/TCP   42m
my-cluster-zookeeper-client   ClusterIP   10.108.20.220    <none>        2181/TCP                     43m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   43m
[root@init-int ssl]#
[root@init-int ssl]# kubectl -n kafka get deploy
NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
my-cluster-entity-operator         1/1     1            1           42m
my-mirror-maker-mirror-maker       0/3     3            0           17m
strimzi-cluster-operator-v0.18.0   1/1     1            1           169m
[root@init-int ssl]#
[root@init-int ssl]# kubectl -n kafka get pod
NAME                                               READY   STATUS    RESTARTS   AGE
kafka-test                                         1/1     Running   0          23m
my-cluster-entity-operator-9d996575b-mppdg         3/3     Running   0          42m
my-cluster-kafka-0                                 2/2     Running   0          42m
my-cluster-kafka-1                                 2/2     Running   0          42m
my-cluster-kafka-2                                 2/2     Running   0          42m
my-cluster-zookeeper-0                             1/1     Running   0          43m
my-cluster-zookeeper-1                             1/1     Running   0          43m
my-cluster-zookeeper-2                             1/1     Running   0          43m
my-mirror-maker-mirror-maker-7fcf85989f-fhb7h      0/1     Running   0          17m
my-mirror-maker-mirror-maker-7fcf85989f-qsxk4      0/1     Running   0          17m
my-mirror-maker-mirror-maker-7fcf85989f-v9nl5      0/1     Running   0          17m
strimzi-cluster-operator-v0.18.0-797787dd9-4pltw   1/1     Running   0          169m
[root@init-int ssl]#

[root@init-int ssl]# kubectl -n kafka logs my-mirror-maker-mirror-maker-7fcf85989f-fhb7h | more
Preparing truststore
Certificate was added to keystore
Preparing truststore is complete
Preparing keystore
Preparing keystore is complete
Preparing truststore
Certificate was added to keystore
Preparing truststore is complete
Preparing keystore
Preparing keystore is complete
Kafka Mirror Maker consumer configuration:
# Bootstrap servers
bootstrap.servers=my-cluster-kafka-bootstrap:9093
# Consumer group
group.id=my-group
# Provided configuration
max.poll.records=100
receive.buffer.bytes=32768

security.protocol=SSL
# TLS / SSL
ssl.truststore.location=/tmp/kafka/consumer.truststore.p12
ssl.truststore.password=[hidden]
ssl.truststore.type=PKCS12
ssl.keystore.location=/tmp/kafka/consumer.keystore.p12
ssl.keystore.password=[hidden]
ssl.keystore.type=PKCS12

Kafka Mirror Maker producer configuration:
# Bootstrap servers
bootstrap.servers=destination-service:31818
# Provided configuration
batch.size=8192
compression.type=gzip

security.protocol=SSL
# TLS / SSL
ssl.truststore.location=/tmp/kafka/producer.truststore.p12
ssl.truststore.password=[hidden]
ssl.truststore.type=PKCS12
ssl.keystore.location=/tmp/kafka/producer.keystore.p12
ssl.keystore.password=[hidden]
ssl.keystore.type=PKCS12

2020-10-28 15:40:16,271 INFO Starting readiness poller (io.strimzi.mirrormaker.agent.MirrorMakerAgent) [main]
2020-10-28 15:40:16,391 INFO Starting liveness poller (io.strimzi.mirrormaker.agent.MirrorMakerAgent) [main]
2020-10-28 15:40:16,663 INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [main]
2020-10-28 15:40:16,873 INFO Starting mirror maker (kafka.tools.MirrorMaker$) [main]
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
2020-10-28 15:40:16,940 INFO ProducerConfig values:
    acks = -1
    batch.size = 8192
    bootstrap.servers = [destination-service:31818]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = gzip
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 2147483647
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 9223372036854775807
    max.in.flight.requests.per.connection = 1
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /tmp/kafka/producer.keystore.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /tmp/kafka/producer.truststore.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig) [main]
2020-10-28 15:40:17,201 INFO Kafka version: 2.5.0 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,201 INFO Kafka commitId: 66563e712b0b9f84 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,201 INFO Kafka startTimeMs: 1603899617187 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,212 INFO ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [my-cluster-kafka-bootstrap:9093]
    check.crcs = true
    client.dns.lookup = default
    client.id = my-group-0
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /tmp/kafka/consumer.keystore.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /tmp/kafka/consumer.truststore.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [main]
2020-10-28 15:40:17,359 INFO Kafka version: 2.5.0 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,359 INFO Kafka commitId: 66563e712b0b9f84 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,359 INFO Kafka startTimeMs: 1603899617358 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,359 INFO ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [my-cluster-kafka-bootstrap:9093]
    check.crcs = true
    client.dns.lookup = default
    client.id = my-group-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /tmp/kafka/consumer.keystore.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /tmp/kafka/consumer.truststore.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [main]
2020-10-28 15:40:17,454 INFO [Producer clientId=producer-1] Failed authentication with destination-service/10.97.12.10 (SSL handshake failed) (org.apache.kafka.common.network.Selector) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:17,456 ERROR [Producer clientId=producer-1] Connection to node -1 (destination-service/10.97.12.10:31818) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:17,456 WARN [Producer clientId=producer-1] Bootstrap broker destination-service:31818 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:17,554 INFO Kafka version: 2.5.0 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,554 INFO Kafka commitId: 66563e712b0b9f84 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,554 INFO Kafka startTimeMs: 1603899617554 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-10-28 15:40:17,581 INFO [mirrormaker-thread-0] Starting mirror maker thread mirrormaker-thread-0 (kafka.tools.MirrorMaker$MirrorMakerThread) [mirrormaker-thread-0]
2020-10-28 15:40:17,581 INFO [mirrormaker-thread-1] Starting mirror maker thread mirrormaker-thread-1 (kafka.tools.MirrorMaker$MirrorMakerThread) [mirrormaker-thread-1]
2020-10-28 15:40:17,583 INFO [Consumer clientId=my-group-0, groupId=my-group] Subscribed to pattern: 'my-topic' (org.apache.kafka.clients.consumer.KafkaConsumer) [mirrormaker-thread-0]
2020-10-28 15:40:17,583 INFO [Consumer clientId=my-group-1, groupId=my-group] Subscribed to pattern: 'my-topic' (org.apache.kafka.clients.consumer.KafkaConsumer) [mirrormaker-thread-1]
2020-10-28 15:40:17,612 INFO [Producer clientId=producer-1] Failed authentication with destination-service/10.97.12.10 (SSL handshake failed) (org.apache.kafka.common.network.Selector) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:17,613 ERROR [Producer clientId=producer-1] Connection to node -1 (destination-service/10.97.12.10:31818) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:17,613 WARN [Producer clientId=producer-1] Bootstrap broker destination-service:31818 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:17,870 INFO [Consumer clientId=my-group-1, groupId=my-group] Cluster ID: ABYqrAKRTH63zscunrkBzA (org.apache.kafka.clients.Metadata) [mirrormaker-thread-1]
2020-10-28 15:40:17,870 INFO [Consumer clientId=my-group-0, groupId=my-group] Cluster ID: ABYqrAKRTH63zscunrkBzA (org.apache.kafka.clients.Metadata) [mirrormaker-thread-0]
2020-10-28 15:40:17,877 INFO [Consumer clientId=my-group-0, groupId=my-group] Discovered group coordinator my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9093 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-10-28 15:40:17,944 INFO [Consumer clientId=my-group-0, groupId=my-group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-10-28 15:40:17,950 INFO [Consumer clientId=my-group-0, groupId=my-group] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-10-28 15:40:17,950 INFO [Consumer clientId=my-group-0, groupId=my-group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-10-28 15:40:18,022 INFO [Producer clientId=producer-1] Failed authentication with destination-service/10.97.12.10 (SSL handshake failed) (org.apache.kafka.common.network.Selector) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:18,022 ERROR [Producer clientId=producer-1] Connection to node -1 (destination-service/10.97.12.10:31818) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:18,022 WARN [Producer clientId=producer-1] Bootstrap broker destination-service:31818 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:18,162 INFO [Consumer clientId=my-group-1, groupId=my-group] Discovered group coordinator my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9093 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-1]
2020-10-28 15:40:18,166 INFO [Consumer clientId=my-group-1, groupId=my-group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-1]
2020-10-28 15:40:18,191 INFO [Consumer clientId=my-group-1, groupId=my-group] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-1]
2020-10-28 15:40:18,192 INFO [Consumer clientId=my-group-1, groupId=my-group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-1]
2020-10-28 15:40:18,931 INFO [Producer clientId=producer-1] Failed authentication with destination-service/10.97.12.10 (SSL handshake failed) (org.apache.kafka.common.network.Selector) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:18,931 ERROR [Producer clientId=producer-1] Connection to node -1 (destination-service/10.97.12.10:31818) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:18,931 WARN [Producer clientId=producer-1] Bootstrap broker destination-service:31818 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:20,041 INFO [Producer clientId=producer-1] Failed authentication with destination-service/10.97.12.10 (SSL handshake failed) (org.apache.kafka.common.network.Selector) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:20,041 ERROR [Producer clientId=producer-1] Connection to node -1 (destination-service/10.97.12.10:31818) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:20,041 WARN [Producer clientId=producer-1] Bootstrap broker destination-service:31818 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:21,205 INFO [Producer clientId=producer-1] Failed authentication with destination-service/10.97.12.10 (SSL handshake failed) (org.apache.kafka.common.network.Selector) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:21,206 ERROR [Producer clientId=producer-1] Connection to node -1 (destination-service/10.97.12.10:31818) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient) [kafka-producer-network-thread | producer-1]
2020-10-28 15:40:21,206 WARN [Producer clientId=producer-1] Bootstrap broker destination-service:31818 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [kafka-producer-
--More--
scholzj commented 3 years ago

Well, I'm not sure I understand why you would create the service. You can just specify 10.0.128.237:31818 directly in the bootstrapServers field. When using node ports, you will also need to disable TLS hostname verification. You also use my-cluster-kafka-bootstrap:9093 for the consumer, but in the first comment the cluster did not have the tls listener enabled.
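
As a rough sketch (assuming ssl.endpoint.identification.algorithm from the config section is passed through to the producer client, the same way it was set in your client-ssl.properties), the producer part could look something like:

  producer:
    bootstrapServers: 10.0.128.237:31818
    abortOnSendFailure: false
    tls:
      trustedCertificates:
      - secretName: destination-my-cluster-cluster-ca-cert
        certificate: ca.crt
    authentication:
      type: tls
      certificateAndKey:
        secretName: destination-my-user
        certificate: user.crt
        key: user.key
    config:
      # assumption: an empty value disables TLS hostname verification, as in client-ssl.properties
      ssl.endpoint.identification.algorithm: ""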

lanzhiwang commented 3 years ago

I found the problem. I had not disabled TLS hostname verification. I also fixed the source cluster so that the tls listener is enabled. Thanks! Closing the issue.
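
For reference, enabling the internal TLS listener on the source cluster Kafka looked roughly like this (a minimal sketch in the v1beta1 listener schema used above, so that my-cluster-kafka-bootstrap:9093 works for the consumer; the external listener stays unchanged):

spec:
  kafka:
    listeners:
      tls:
        authentication:
          type: tls
      external:
        type: nodeport
        tls: true
        authentication:
          type: tls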

scholzj commented 3 years ago

Great, glad it works now.