Closed zking2000 closed 1 month ago
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: "kafka"
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
initContainers:
- name: init-plaintext-external
image: busybox
command: ['/bin/sh', '-c']
args:
- |
HOSTNAME=$(hostname)
case $HOSTNAME in
"kafka-0")
PLAINTEXT_EXTERNAL="192.168.64.28"
;;
"kafka-1")
PLAINTEXT_EXTERNAL="192.168.64.29"
;;
"kafka-2")
PLAINTEXT_EXTERNAL="192.168.64.30"
;;
*)
echo "未知的主机名: $HOSTNAME"
exit 1
;;
esac
echo $PLAINTEXT_EXTERNAL > /config/plaintext_external
volumeMounts:
- name: config
mountPath: /config
containers:
- name: kafka
image: your-kafka-image:tag
env:
- name: PLAINTEXT_EXTERNAL
valueFrom:
configMapKeyRef:
name: kafka-config
key: plaintext_external
volumeMounts:
- name: config
mountPath: /config
volumes:
- name: config
emptyDir: {}
volumeClaimTemplates:
- metadata:
name: kafka-storage
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
global
log /dev/log local0 info
log /dev/log local1 notice
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners
stats timeout 30s
user haproxy
group haproxy
daemon
defaults
log global
option httplog
option dontlognull
log-format "%ci:%cp [%t] %ft %b/%s %Tq/%Tw/%Tc/%Tr/%Tt %ST %B %CC %CS %tsc %ac/%fc/%bc/%sc/%rc %sq/%bq %hr %hs %{+Q}r"
mode tcp
timeout connect 5s
timeout client 30s
timeout server 30s
frontend kafka_frontend
bind *:9093
default_backend kafka_backend
option tcplog
backend kafka_backend
balance roundrobin
server kafka1 192.168.64.28:9093
server kafka2 192.168.64.29:9093
server kafka3 192.168.64.30:9093
option tcplog
listen stats
bind *:8404
stats enable
stats uri /stats
stats refresh 10s
global
log /dev/log local0 debug
log /dev/log local1 notice
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners
stats timeout 30s
user haproxy
group haproxy
daemon
defaults
log global
option httplog
option dontlognull
option tcplog
log-format "%ci:%cp [%t] %ft %b/%s %Tw/%Tc/%Tt %B %ts %ac/%fc/%bc/%sc/%rc %sq/%bq"
mode tcp
timeout connect 5s
timeout client 30s
timeout server 30s
frontend kafka_frontend
bind *:9093
default_backend kafka_backend
option tcplog
option logasap
option dontlognull
option log-separate-errors
backend kafka_backend
balance roundrobin
server kafka1 192.168.64.28:9093
server kafka2 192.168.64.29:9093
server kafka3 192.168.64.30:9093
option tcplog
option logasap
option dontlognull
option log-health-checks
listen stats
bind *:8404
stats enable
stats uri /stats
stats refresh 10s
要确保Kafka允许来自HAProxy的连接,您需要在Kafka的配置中进行一些调整。这主要涉及网络设置和安全配置。以下是一些步骤和建议:
listeners
和 advertised.listeners
配置正确。将 your_kafka_ip
替换为Kafka broker的实际IP地址。
防火墙设置: 确保Kafka服务器的防火墙允许来自HAProxy IP的9092端口(或您配置的其他端口)连接。
对于UFW(Ubuntu防火墙),可以使用如下命令:
sudo ufw allow from HAProxy_IP to any port 9092
网络安全组/ACL: 如果您在云环境中运行Kafka,确保网络安全组或访问控制列表(ACL)允许从HAProxy到Kafka的流量。
Kafka ACL:
如果您启用了Kafka的安全功能,需要配置适当的ACL。例如,使用 kafka-acls
工具:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:* --allow-host 'HAProxy_IP' \
--operation Read --operation Write --topic '*'
这将允许来自HAProxy IP的所有操作。根据需要调整权限。
JAAS配置: 如果使用SASL认证,确保HAProxy能够使用正确的凭证连接到Kafka。可能需要在HAProxy端配置适当的SASL机制。
SSL/TLS配置: 如果使用SSL/TLS加密,确保:
测试连接: 在配置完成后,可以使用Kafka命令行工具从HAProxy服务器测试连接:
bin/kafka-console-producer.sh --broker-list HAProxy_IP:9093 --topic test
日志检查:
检查Kafka的服务器日志(通常在 /var/log/kafka/server.log
)以查看连接尝试和可能的错误信息。
网络诊断:
使用 netstat
或 ss
命令检查Kafka是否正在监听正确的接口和端口:
ss -tulnp | grep 9092
考虑内部/外部监听器: 如果HAProxy和Kafka在同一内部网络,可以考虑设置内部和外部监听器,以优化性能和安全性。
确保这些设置正确配置后,HAProxy应该能够成功连接到Kafka集群。如果仍然遇到问题,检查Kafka和HAProxy的日志可能会提供更多线索。同时,确保网络中的所有中间设备(如路由器或交换机)也允许这些连接。