
Personal site: 灵梦缘
https://xuanyuanaosheng.github.io/

Middleware: Kafka #30


xuanyuanaosheng commented 3 hours ago

Architecture

(architecture diagram: screenshot not included)

Installation

A Kafka deployment usually consists of a ZooKeeper ensemble plus the Kafka brokers. Each typically runs on 3 nodes (an odd number lets ZooKeeper keep quorum).
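The examples below address the nodes as server1–server3. If those names are not in DNS, one option (an assumption, not part of the original setup) is to map them in /etc/hosts on every node, using the same 10.xx addresses that appear in zoo.cfg:

```shell
# Assumption: server1/2/3 resolve via /etc/hosts; substitute the real addresses.
cat >> /etc/hosts <<'EOF'
10.xx.xx.xx server1
10.xx.xx.xx server2
10.xx.xx.xx server3
EOF
```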

Install the ZooKeeper cluster

## Create the installation directory
# mkdir /opt/zookeeper

## Download and extract the release
# wget http://mirror.metrocast.net/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5.tar.gz
# tar zxf apache-zookeeper-3.5.5.tar.gz
# mv apache-zookeeper-3.5.5 /opt/zookeeper/zookeeper-3.5.5

## Edit the configuration
# cd /opt/zookeeper/zookeeper-3.5.5/conf
# cp zoo_sample.cfg zoo.cfg
# vim zoo.cfg
--------------------------------------------------
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/home/zkdata
dataLogDir=/opt/zookeeper/home/zkdatalog
clientPort=2181

server.1=10.xx.xx.xx:2888:3888
server.2=10.xx.xx.xx:2888:3888
server.3=10.xx.xx.xx:2888:3888
--------------------------------------------------

## Create the data directories and set a unique myid on each node
### server1 (run on node 1)
# mkdir -p /opt/zookeeper/home/zkdata /opt/zookeeper/home/zkdatalog
# echo "1" > /opt/zookeeper/home/zkdata/myid

### server2 (run on node 2)
# mkdir -p /opt/zookeeper/home/zkdata /opt/zookeeper/home/zkdatalog
# echo "2" > /opt/zookeeper/home/zkdata/myid

### server3 (run on node 3)
# mkdir -p /opt/zookeeper/home/zkdata /opt/zookeeper/home/zkdatalog
# echo "3" > /opt/zookeeper/home/zkdata/myid

### Install a JDK; the default yum package is usually fine
# yum install -y java-1.8.0-openjdk
# echo $JAVA_HOME

### Fix ownership (assumes a zookeeper user exists)
# chown -R zookeeper:zookeeper /opt/zookeeper

### Start the ZooKeeper cluster (zk-start is a site-local wrapper; the stock script is bin/zkServer.sh start)
# cd /opt/zookeeper/bin
# ./zk-start

### Check the zk cluster status (wrapper for bin/zkServer.sh status)
# ./zk-status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
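Besides the status script, ZooKeeper's four-letter words give a quick liveness probe. Note that in 3.5.x they must first be whitelisted; the whitelist line below is an addition, not part of the zoo.cfg shown above:

```shell
# Requires in zoo.cfg (3.5.x): 4lw.commands.whitelist=ruok,srvr,stat
echo ruok | nc localhost 2181   # a healthy server answers "imok"
echo srvr | nc localhost 2181   # prints version, latency, and Mode (leader/follower)
```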

Install the Kafka cluster

## Create the installation directory
# mkdir /opt/kafka

## Download and extract the release
# wget http://apache.mirrors.tds.net/kafka/2.3.0/kafka_2.12-2.3.0.tgz
# tar zxf kafka_2.12-2.3.0.tgz
# mv kafka_2.12-2.3.0 /opt/kafka

##  Configure kafka: /opt/kafka/kafka_2.12-2.3.0/config/server.properties
------------------------------------------------------------------------
# unique per broker: 0, 1, 2 (in a .properties file, comments must sit on their own line, not after a value)
broker.id=0
delete.topic.enable=true
# this broker's own ip/hostname and port
listeners=PLAINTEXT://server1:9092
advertised.listeners=PLAINTEXT://server1:9092
queued.max.requests=1000
num.network.threads=16
num.io.threads=16
message.max.bytes=5000000
replica.fetch.max.bytes=10000000
# fetch.message.max.bytes is an (old) consumer setting; the broker ignores it here
fetch.message.max.bytes=10000000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/home
num.partitions=12
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# the ZooKeeper ensemble
zookeeper.connect=server1:2181,server2:2181,server3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
------------------------------------------------------------------------

### On the other two nodes, change these three parameters:
-----------------------------------------------------------------------
broker.id                              # set to 1 and 2 respectively

listeners=PLAINTEXT://...              # that node's own hostname/IP

advertised.listeners=PLAINTEXT://...
----------------------------------------------------------------------
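Those per-node edits can be scripted; a sketch for node 2, assuming the paths and hostnames used above:

```shell
# Node 2: broker.id=1 and advertise its own hostname.
CONF=/opt/kafka/kafka_2.12-2.3.0/config/server.properties
sed -i 's/^broker\.id=.*/broker.id=1/' "$CONF"
sed -i 's#^listeners=.*#listeners=PLAINTEXT://server2:9092#' "$CONF"
sed -i 's#^advertised\.listeners=.*#advertised.listeners=PLAINTEXT://server2:9092#' "$CONF"
```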

### To use ACL authentication, change the settings below; before that, the admin account must be created:
#  kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=xxxxxxxx],SCRAM-SHA-512=[password=xxxxxxxx]' --entity-type users --entity-name admin

# Then update the corresponding settings
---------------------------------------------------------------------
# this broker's own ip/hostname and port; note the scheme changes to SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://server1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://server1:9092
allow.everyone.if.no.acl.found=false
# the ACL superuser
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
---------------------------------------------------------------------

### Start the kafka cluster (kafka-start is a site-local wrapper; the stock command is bin/kafka-server-start.sh -daemon config/server.properties)
# cd /opt/kafka/bin
# ./kafka-start

### Check the kafka cluster status
# jps
31651 QuorumPeerMain
3747 Jps
23511 Kafka
1243 ProdServerStart 

Install the management UI

With the steps above, the Kafka cluster is installed; next, install a management tool as appropriate.

## Create the installation directory
# mkdir /opt/kafka-manager

## Download and extract the release
# cd /opt/kafka-manager
# wget https://github.com/yahoo/CMAK/releases/download/3.0.0.6/cmak-3.0.0.6.zip
# unzip cmak-3.0.0.6.zip

## Configure kafka-manager: /opt/kafka-manager/conf/application.conf
-------------------------------------------------------------------------------------------
play.crypto.secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
play.crypto.secret=${?APPLICATION_SECRET}
play.http.session.maxAge="1h"
play.i18n.langs=["en"]
play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader
kafka-manager.zkhosts="server1:2181,server2:2181,server3:2181"
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}
akka.logger-startup-timeout = 60s
basicAuthentication.enabled=false   #enable basic authentication
basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}
basicAuthentication.ldap.enabled=false
basicAuthentication.ldap.enabled=${?KAFKA_MANAGER_LDAP_ENABLED}
basicAuthentication.ldap.server=""
basicAuthentication.ldap.server=${?KAFKA_MANAGER_LDAP_SERVER}
basicAuthentication.ldap.port=389
basicAuthentication.ldap.port=${?KAFKA_MANAGER_LDAP_PORT}
basicAuthentication.ldap.username=""
basicAuthentication.ldap.username=${?KAFKA_MANAGER_LDAP_USERNAME}
basicAuthentication.ldap.password=""
basicAuthentication.ldap.password=${?KAFKA_MANAGER_LDAP_PASSWORD}
basicAuthentication.ldap.search-base-dn=""
basicAuthentication.ldap.search-base-dn=${?KAFKA_MANAGER_LDAP_SEARCH_BASE_DN}
basicAuthentication.ldap.search-filter="(uid=$capturedLogin$)"
basicAuthentication.ldap.search-filter=${?KAFKA_MANAGER_LDAP_SEARCH_FILTER}
basicAuthentication.ldap.connection-pool-size=10
basicAuthentication.ldap.connection-pool-size=${?KAFKA_MANAGER_LDAP_CONNECTION_POOL_SIZE}
basicAuthentication.ldap.ssl=false
basicAuthentication.ldap.ssl=${?KAFKA_MANAGER_LDAP_SSL}
basicAuthentication.username="xxxxxxxx"
basicAuthentication.username=${?KAFKA_MANAGER_USERNAME}
basicAuthentication.password="xxxxxxxx"
basicAuthentication.password=${?KAFKA_MANAGER_PASSWORD}
basicAuthentication.realm="Kafka-Manager"
kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}
-------------------------------------------------------------------------------------------

### To enable basic authentication, set basicAuthentication.enabled=true in conf/application.conf and change the username and password below it.

### Configure the kafka-manager start script: /opt/kafka-manager/bin/kafka-manager-start
----------------------------------------------------------------------------------------------
nohup /opt/kafka-manager/bin/kafka-manager -Dhttp.port=xxxx > /opt/kafka-manager/logs/nohup.log 2>&1 &
----------------------------------------------------------------------------------------------

### Start it
# ./kafka-manager-start

### Test
## Browse to http://server1:port/
## Try adding the cluster and creating a topic
## Keep the number of partitions an integer multiple of the broker count
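A quick end-to-end smoke test from any broker, using Kafka's bundled console tools (the topic name smoke-test is illustrative):

```shell
cd /opt/kafka/kafka_2.12-2.3.0/bin
# create a test topic
./kafka-topics.sh --create --zookeeper server1:2181 --replication-factor 2 --partitions 3 --topic smoke-test
# type a few lines, then Ctrl-C
./kafka-console-producer.sh --broker-list server1:9092 --topic smoke-test
# read them back; Ctrl-C to stop
./kafka-console-consumer.sh --bootstrap-server server1:9092 --topic smoke-test --from-beginning
```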

Common commands for the Kafka cluster

### For the Zookeeper component
### start/stop service
# /opt/zookeeper/bin/zk-start    #start service
# /opt/zookeeper/bin/zk-stop     #stop service

### health check
# ps -ef | grep zookeeper        #process check
# /opt/zookeeper/bin/zk-status   #service check

--------------------------------------------------------------------------------

### For the Kafka component
### start/stop service
# /opt/kafka/bin/kafka-start    #start service
# /opt/kafka/bin/kafka-stop    #stop service

### health check
# ps -ef | grep kafka                  #process check

## login to kafka manager url to check kafka status

----------------------------------------------------------------------------------

### For the Kafka manager component
### start/stop service
# /opt/kafka-manager/bin/kafka-manager-start    #start service
# /opt/kafka-manager/bin/kafka-manager-stop    #stop service

### health check
### login to kafka manager url to check

---------------------------------------------------------------------------------

### Kafka with ACL
### Describe resource settings in the cluster
# kafka-configs.sh --zookeeper [zookeeper_ip|zookeeper_hostname]:zookeeper_port --describe --entity-type [users|topics|clients|brokers]

### Create a new user
# ./kafka-configs.sh --zookeeper [zookeeper_ip|zookeeper_hostname]:zookeeper_port --alter --add-config 'SCRAM-SHA-256=[password=passwd],SCRAM-SHA-512=[password=passwd]' --entity-type users --entity-name user_name

### Grant both permissions below and the user can create topics dynamically
## Grant Create on the cluster
# ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=[zookeeper_ip|zookeeper_hostname]:zookeeper_port --add --allow-principal User:user_name --operation Create --cluster

## Grant the user read/write on all topics; prefer naming a specific topic, but use * if the application must create topics dynamically
# kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=[zookeeper_ip|zookeeper_hostname]:zookeeper_port --add --allow-principal User:user_name --operation ALL --topic "*"

## Revoke the user's read and write permissions
# kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=[zookeeper_ip|zookeeper_hostname]:zookeeper_port --remove --allow-principal User:user_name --operation Read --operation Write --topic topic_name

## Grant the user's consumer groups all permissions
# kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=[zookeeper_ip|zookeeper_hostname]:zookeeper_port --add --allow-principal User:user_name --operation All --group '*'

## List all ACLs
# kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=[zookeeper_ip|zookeeper_hostname]:zookeeper_port --list

## Create a topic
### replication-factor: replica count; partitions: partition count; retention.ms: data retention time (delete policy); cleanup.policy: compact or delete; delete.retention.ms: retention time for compacted logs (compact policy)
# kafka-topics.sh --create --zookeeper [zookeeper_ip|zookeeper_hostname]:zookeeper_port --replication-factor 2 --partitions 8 --config retention.ms=172800000 --config cleanup.policy=delete --config delete.retention.ms=43200000 --topic topic_name

### Delete a topic
# kafka-topics.sh --delete --zookeeper [zookeeper_ip|zookeeper_hostname]:zookeeper_port --topic topic_name
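Two related kafka-topics.sh calls that are often needed alongside the above (same placeholders):

```shell
## List all topics
# kafka-topics.sh --list --zookeeper [zookeeper_ip|zookeeper_hostname]:zookeeper_port

## Show the partition/replica layout of one topic
# kafka-topics.sh --describe --zookeeper [zookeeper_ip|zookeeper_hostname]:zookeeper_port --topic topic_name
```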

Monitoring Kafka

Upgrading the Kafka cluster

Upgrading Kafka only requires replacing the binaries; the configuration needs no changes. Restart the brokers one at a time so the cluster stays available during the upgrade.
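A per-broker sketch of that rolling replacement, under the paths assumed above (the version string is a placeholder; kafka-stop/kafka-start are the site wrapper scripts):

```shell
# Repeat on one broker at a time; wait for under-replicated partitions to clear between brokers.
/opt/kafka/bin/kafka-stop
cd /opt/kafka
tar zxf kafka_2.12-<new_version>.tgz                # new release, downloaded beforehand
cp kafka_2.12-2.3.0/config/server.properties kafka_2.12-<new_version>/config/
# point the kafka-start/kafka-stop wrappers at the new directory, then:
/opt/kafka/bin/kafka-start
```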

Backing up Kafka

Since this Kafka deployment does not store data with strict integrity requirements, it is enough to back up the Kafka and Zookeeper configuration files.
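One way to snapshot those files, assuming a /backup directory (an assumption; adjust to your backup location):

```shell
BACKUP_DIR=/backup            # assumption: adjust to taste
mkdir -p "$BACKUP_DIR"
# date-stamped archive of both config directories
tar czf "$BACKUP_DIR/kafka-zk-config-$(date +%F).tar.gz" \
    /opt/kafka/kafka_2.12-2.3.0/config \
    /opt/zookeeper/zookeeper-3.5.5/conf
```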

DC failover for the Kafka cluster


Both Kafka clusters query the configuration center for DCLeader and DCFollower status and handle changes of leader and follower roles.

  1. Producers

    • Can write only to the DCLeader.
    • Writes to the DCFollower return an error.
    • After the configuration center switches the DCLeader, connections to the old Kafka cluster are dropped and clients bootstrap against the new DCLeader cluster.
  2. Consumers

    • Access Kafka in order of priority.
    • Offsets are stored in a single place.
    • Can choose to consume from either site, allowing flexible migration.
xuanyuanaosheng commented 2 hours ago

Kafka ACL Authorizer using SASL/SCRAM

Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512, which can be used together with TLS to perform secure authentication. The username serves as the authenticated principal when configuring ACLs and the like. Kafka's default SCRAM implementation stores the credentials in Zookeeper, which is suitable for installations where Zookeeper sits on a private network.

  1. Create credentials. Kafka's SCRAM implementation uses Zookeeper as the credential store, and credentials can be created there with kafka-configs.sh. For each enabled SCRAM mechanism, a credential must be created by adding a config entry named after the mechanism. Credentials for inter-broker communication must be created before the Kafka brokers start. Client credentials can be created and updated dynamically; updated credentials are used to authenticate new connections.
    
    ## Create broker user (or admin user)
    # kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=xxxxxxxx],SCRAM-SHA-512=[password=xxxxxxxx]' --entity-type users --entity-name admin

    ## Create client user apptest
    # kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=xxxxxxxx],SCRAM-SHA-512=[password=xxxxxxxx]' --entity-type users --entity-name apptest

    ## Check SCRAM credentials
    # kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name apptest

    ## Delete SCRAM credentials
    # kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name apptest


2.  Configure the Kafka brokers

1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; call it kafka_server_jaas.conf for this example:

# cd /opt/kafka/kafka_2.12-2.3.0/config
# vim kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="xxxxxxxx";
};

2. Modify bin/kafka-server-start.sh

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
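Patching kafka-server-start.sh works; an alternative that leaves the shipped scripts untouched is the KAFKA_OPTS environment variable, which kafka-run-class.sh appends to the JVM arguments:

```shell
# Equivalent effect without editing the start script:
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
/opt/kafka/kafka_2.12-2.3.0/bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.3.0/config/server.properties
```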

3. Modify config/server.properties

listeners=SASL_PLAINTEXT://server1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://server1:9092
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

4. Restart zookeeper and kafka cluster


### Configure ACLs

Add ACL permissions

Grant apptest user write access to topic testkafka

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:apptest --operation Write --topic testkafka

Grant apptest user read access to topic testkafka

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:apptest --operation Read --topic testkafka
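With write and read granted, a client can authenticate as apptest over SASL/SCRAM. A sketch, where client.properties is an assumed filename and the password placeholder matches the credential created earlier:

```shell
# client-side SASL/SCRAM settings (filename is illustrative)
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="apptest" password="xxxxxxxx";
EOF

# produce to and consume from testkafka as apptest
kafka-console-producer.sh --broker-list server1:9092 --topic testkafka --producer.config client.properties
kafka-console-consumer.sh --bootstrap-server server1:9092 --topic testkafka --from-beginning --consumer.config client.properties
```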


List ACLs

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list


Delete ACLs

Delete write access

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:apptest --operation Write --topic testkafka


3.  Inspect and delete users

List users

/opt/zookeeper/zookeeper-3.5.5/bin/zkCli.sh ls /config/users

Delete a user

/opt/zookeeper/zookeeper-3.5.5/bin/zkCli.sh delete /config/users/apptest

xuanyuanaosheng commented 1 hour ago

Useful tools

  1. https://github.com/Bronya0/Kafka-King
  2. https://github.com/provectus/kafka-ui