uber / uReplicator

Improvement of Apache Kafka MirrorMaker
Apache License 2.0

Problem in replication from a local kafka to a remote one (with Kerberos auth) #336

Open FirasEclipse opened 3 years ago

FirasEclipse commented 3 years ago

Hello,

I'm trying to use uReplicator between a local Kafka and a remote one. I'm running both the local Kafka and the replicator on the same CentOS machine, which has the following OS info.

CentOS Linux 7 (Core)
Linux 3.10.0-862.el7.x86_64

The local Kafka has no authentication (PLAINTEXT), but the remote one uses Kerberos authentication (SASL_PLAINTEXT). I configured both the producer and the consumer properties files, which are located in config.

Producer config

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
# bootstrap.servers is populated via WorkerInstance
#bootstrap.servers=localhost:9094
bootstrap.servers=10.10.10.1:3000
#client.id=kloak-mirrormaker-test
client.id=clientID

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=async

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.type=none

# message encoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

batch.size=262144
linger.ms=1000
buffer.memory=167772160
max.request.size=31457280
send.buffer.bytes=62914560
max.in.flight.requests.per.connection=5
delivery.timeout.ms=600000
request.timeout.ms=30000

# Kerberos SASL configuration
# ---------------------------
sasl.mechanism=GSSAPI
# Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kira@KIRA.COM";
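
One way to sanity-check the keytab/principal pair outside of Kafka is a manual kinit; a sketch, with the keytab path and principal copied verbatim from the JAAS line above:

# Obtain a TGT from the keytab, then list the resulting ticket
kinit -kt /etc/security/keytabs/kafka_server.keytab kira@KIRA.COM
klist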

Consumer config

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181
# bootstrap.servers is populated via WorkerInstance
bootstrap.servers=127.0.0.1:9092
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000

#consumer group id
group.id=kloak-mirrormaker-test

consumer.id=kloakmms01-sjc1
socket.receive.buffer.bytes=1048576
fetch.message.max.bytes=8388608
queued.max.message.chunks=5
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
#consumer timeout
#consumer.timeout.ms=5000

auto.offset.reset=earliest

uReplicator config and commands

I'm using only one controller with a single worker, and I tried to run the replicator using the following commands:

Start uReplicator Controller

java -Dlog4j.configuration=file:config/tools-log4j.properties -Xms3g -Xmx3g -Xmn512m -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+PrintCommandLineFlags -XX:CMSInitiatingOccupancyFraction=80 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -Xloggc:/tmp/ureplicator-controller/gc-ureplicator-controller.log -server -cp uReplicator-Controller/target/uReplicator-Controller-2.0.1-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.kafka.mirrormaker.controller.ControllerStarter -enableFederated false -mode auto -backUpToGit false -enableAutoTopicExpansion false -port 9000 -refreshTimeInSeconds 10 -srcKafkaZkPath localhost:2181 -zookeeper localhost:2181 -destKafkaZkPath localhost:2181 -helixClusterName testMirrorMaker

Start uReplicator Worker

java -Dlog4j.configuration=file:config/test-log4j.properties -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=45 -verbose:gc -Xmx1g -Xms1g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc-ureplicator-worker.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9089 -Dcom.sun.management.jmxremote.rmi.port=9089 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -cp uReplicator-Worker-3.0/target/uReplicator-Worker-3.0-2.0.1-SNAPSHOT-jar-with-dependencies.jar com.uber.stream.ureplicator.worker.WorkerStarter -federated_enabled false -consumer_config config/consumer.properties -producer_config config/producer.properties -helix_config config/helix.properties -topic_mappings config/topicmapping.properties

Note

I also tested the commands from the user manual without any change.

Configuration of remote Kerberos

The remote kafka which has Kerberos authentication uses the following configuration:

[libdefaults]
 dns_lookup_kdc = false
 ticket_lifetime = 24h

[pam]
 debug = false
 ticket_lifetime = 36000
 renew_lifetime = 36000
 forwardable = true
 krb4_convert = false

Results and problem:

After about 40-60 minutes, the producer fires an error and stops the replicator worker!

Log of the worker:

You can find the log file at the attached link: Error in uReplicator Worker.txt

Error is:

[2021-03-31 10:48:21,122] ERROR [firas-null-0] Closing producer due to send failure. topic: firas.data.topic, message: Expiring 1 record(s) for firas.data.topic-6: 30009 ms has passed since last attempt plus backoff time (com.uber.stream.ureplicator.worker.DefaultProducer)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for firas.data.topic-6: 30009 ms has passed since last attempt plus backoff time
[2021-03-31 10:48:21,127] INFO [Producer clientId=firas-null-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-31 10:48:21,127] WARN [Producer clientId=firas-null-0] Overriding close timeout 9223372036854775807 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-31 10:48:21,127] INFO [Producer clientId=firas-null-0] Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)

Could it be that the producer fails to renew the Kerberos TGT? How can I make it do so without problems?
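
For reference, the stock Kafka client exposes a few Kerberos re-login settings that can be placed in producer.properties; a sketch showing the client defaults (not recommended overrides):

# Attempt renewal after this fraction of the ticket lifetime has elapsed
sasl.kerberos.ticket.renew.window.factor=0.80
# Random jitter added to the renewal time
sasl.kerberos.ticket.renew.jitter=0.05
# Minimum wait between re-login attempts, in milliseconds
sasl.kerberos.min.time.before.relogin=60000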

yangy0000 commented 3 years ago

As the error log suggests, it looks like uReplicator is having a problem connecting to the remote Kafka. Can you try to write a Kafka producer with the same producer config and produce to the remote Kafka, just to verify whether there is a connectivity problem?
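
Such a standalone check could look roughly like this (a minimal sketch: the class name is made up, the topic comes from the error log above, and the path assumes the config layout shown earlier):

import java.io.FileInputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerConnectivityTest {
    public static void main(String[] args) throws Exception {
        // Load the exact same producer config uReplicator uses.
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("config/producer.properties")) {
            props.load(in);
        }
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>("firas.data.topic", "ping".getBytes());
            // Block on the future so auth/connectivity failures surface immediately.
            producer.send(record).get();
            System.out.println("Produced one test record.");
        }
    }
}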

FirasEclipse commented 3 years ago

Hello @yangy0000

I already tested that, and now again I copied the producer configuration and tested producing to the remote Kafka using kafka-console-producer.sh; it worked without problems. I got only the warnings below (which I also got from uReplicator).
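
The invocation was along these lines (a sketch: the broker address comes from the producer config above and the topic name from the error log):

kafka-console-producer.sh --broker-list 10.10.10.1:3000 --topic firas.data.topic --producer.config config/producer.properties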

[2021-03-31 21:06:26,659] WARN The configuration 'delivery.timeout.ms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-31 21:06:26,660] WARN The configuration 'producer.type' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)

But I don't think they're related to the error!

Questions:

  1. If the Kerberos TGT expires, does the replicator try to reconnect by getting a new TGT?
  2. Is there an API with which we can monitor the count of messages written to a specific topic (one already defined in topicmapping.properties)?

yangy0000 commented 3 years ago

1: No, I thought the Kafka producer client should handle it? FYI: the worker process will shut itself down if a produce request fails.

2: We don't have an API to monitor the count. There are two ways that might get you what you want: (1) check the uReplicator consumer's committed offset in the source cluster (it's in ZooKeeper); uReplicator only commits an offset once the messages have been successfully produced to the destination cluster. (2) check the messages in the destination cluster.
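
For the offset check, assuming the old-consumer offset layout under /consumers in ZooKeeper, something along these lines could work (the group id and topic are taken from the configs and logs above; the partition number is illustrative):

zookeeper-shell.sh 127.0.0.1:2181 get /consumers/kloak-mirrormaker-test/offsets/firas.data.topic/0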

FirasEclipse commented 3 years ago

Is there a retry configuration we can change to make the producer try again when it fails?

yangy0000 commented 3 years ago

Since urep is using kafka-clients 1.1.1, it doesn't have retry. Maybe you could try to override the kafka-clients version to 2.2 in uReplicator-Worker?

FirasEclipse commented 3 years ago

Can you please guide me on how to switch to a newer kafka-clients version in uReplicator-worker? I mean which configuration should I change?

yangy0000 commented 3 years ago

try to add the below dependency into https://github.com/uber/uReplicator/blob/master/uReplicator-Worker-3.0/pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1.4</version>
</dependency>
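
With a 2.1+ client in place, the retry behaviour could then be tuned in config/producer.properties. A sketch with illustrative values (these are standard Kafka producer settings, not uReplicator-specific ones; delivery.timeout.ms only takes effect on 2.1+ clients):

# How many times a failed send is retried (2.x clients default to Integer.MAX_VALUE)
retries=2147483647
# Upper bound on the total time to deliver a record, including retries
delivery.timeout.ms=600000
# Pause between retry attempts, in milliseconds
retry.backoff.ms=100
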
FirasEclipse commented 3 years ago

I upgraded the kafka-clients in use to 2.7 and I still get the same error! I kept a console producer (using the same client) open for more than a day and faced none of the disconnections that I'm facing with uReplicator.

yangy0000 commented 3 years ago

What's your throughput? And what error message do you get before the producer fails?