arterhuo / blog

Kafka Migration Approaches #14

Open huoarter opened 3 years ago

huoarter commented 3 years ago

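Official command-line tool

- Write the file listing the topics to move
--topics-to-move-json-file specifies the JSON configuration file, topics.json. It is used with --generate and --broker-list to produce the proposed plan (topic-reassignment.json in the commands that follow).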

{"topics":
    [{"topic":"dialogue"}],
    "version": 1
}
  - Execute the partition reassignment plan. The command can be re-run with a different throttle value to adjust the rate limit.

kafka-reassign-partitions --zookeeper prd-zbka-001:2181/kafka/ka --reassignment-json-file topic-reassignment.json --execute --throttle 50000000 (throttle is in bytes/s; 50000000 is 50 MB/s)

  - Verify the partition reassignment plan

kafka-reassign-partitions --zookeeper prd-zbka-001:2181/kafka/ka --reassignment-json-file topic-reassignment.json --verify

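  - Pros and cons of this approach
    - Pros: simple and low-effort. It suits small data volumes, and throttling is available, so it is fairly flexible.
    - Cons: with a large data volume, data is migrated between brokers redundantly, which makes the task larger and longer.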
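
The execute and verify commands above differ only in their final flags, and the throttle is given in bytes per second. The following is a minimal sketch of a helper that assembles those command lines. `reassign_cmd` and its parameters are hypothetical names introduced here; only the flags already shown above are used. It builds the argument list and does not run anything.

```python
def reassign_cmd(zookeeper, plan_file, action, throttle_mb_per_s=None):
    """Build the argv for kafka-reassign-partitions (hypothetical helper).

    action is "execute" or "verify". throttle_mb_per_s is converted to
    bytes/s using 1 MB = 1,000,000 bytes, so 50 becomes 50000000.
    """
    if action not in ("execute", "verify"):
        raise ValueError("action must be 'execute' or 'verify'")
    cmd = [
        "kafka-reassign-partitions",
        "--zookeeper", zookeeper,
        "--reassignment-json-file", plan_file,
        "--" + action,
    ]
    if throttle_mb_per_s is not None:
        if action != "execute":
            raise ValueError("a throttle is only passed together with --execute")
        cmd += ["--throttle", str(int(throttle_mb_per_s * 1000000))]
    return cmd


if __name__ == "__main__":
    zk = "prd-zbka-001:2181/kafka/ka"
    print(" ".join(reassign_cmd(zk, "topic-reassignment.json", "execute", 50)))
    print(" ".join(reassign_cmd(zk, "topic-reassignment.json", "verify")))
```

Calling the helper again with a different `throttle_mb_per_s` produces the command for re-running the execute step at a new rate limit.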
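##Kafka with Cruise Control, operated through a UI
  - With Cruise Control added, brokers can be decommissioned and partitions rebalanced from the UI.
  - Pros and cons of this approach:
    - Pros: there is a UI that also shows how partitions are distributed across brokers, and partitions can be balanced between brokers.
    - Cons:
         - Kafka versions before 1.x do not support JBOD, so on a broker with several disks the partitions can be balanced while disk usage is not, with large differences between disks.
         - Rebalancing from the UI lets you cap the number of partitions moved, but not the transfer rate. When a single partition is very large, moving it can saturate disk I/O.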
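
The multi-disk drawback can be illustrated with a small calculation: two disks on one broker can hold the same number of partitions while storing very different amounts of data. The partition sizes below are made-up numbers for illustration only, and `disk_usage` is a hypothetical helper.

```python
def disk_usage(placement):
    """Return {disk: (partition_count, total_gb)} for a {disk: [sizes_gb]} map."""
    return {disk: (len(sizes), sum(sizes)) for disk, sizes in placement.items()}


# Hypothetical layout: one broker, two disks, three partitions per disk.
placement = {
    "/data1": [400, 350, 300],  # large partitions, sizes in GB
    "/data2": [20, 15, 10],     # small partitions, sizes in GB
}

if __name__ == "__main__":
    for disk, (count, total_gb) in sorted(disk_usage(placement).items()):
        print("%s: %d partitions, %d GB" % (disk, count, total_gb))
```

Both disks report three partitions, yet the totals differ by more than an order of magnitude, which is the kind of imbalance a count-based rebalance does not see.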

##Scripts on top of the official command-line tool, computing the partition distribution precisely
  - kafka-topics --zookeeper pre-zbka-003:2181/kafka/ka --describe |grep -i "leader: " > topics_leader.txt
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 73   Leader: 14  Replicas: 14,10 Isr: 10,14
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 74   Leader: 5   Replicas: 5,11  Isr: 11,5
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 75   Leader: 8   Replicas: 8,19  Isr: 19,8
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 76   Leader: 19  Replicas: 19,15 Isr: 15,19
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 77   Leader: 15  Replicas: 15,1  Isr: 1,15
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 78   Leader: 1   Replicas: 1,2   Isr: 1,2
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 79   Leader: 2   Replicas: 2,3   Isr: 2,3
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 80   Leader: 13  Replicas: 13,4  Isr: 4,13
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 81   Leader: 4   Replicas: 4,16  Isr: 4,16
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 82   Leader: 5   Replicas: 5,14  Isr: 14,5
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 83   Leader: 6   Replicas: 6,8   Isr: 8,6
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 84   Leader: 7   Replicas: 7,17  Isr: 7,17
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 85   Leader: 17  Replicas: 17,9  Isr: 9,17
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 86   Leader: 9   Replicas: 9,10  Isr: 9,10
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 87   Leader: 10  Replicas: 10,11 Isr: 10,11
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 88   Leader: 11  Replicas: 11,12 Isr: 12,11
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 89   Leader: 13  Replicas: 13,18 Isr: 18,13
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 90   Leader: 5   Replicas: 5,15  Isr: 15,5
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 91   Leader: 19  Replicas: 19,1  Isr: 1,19
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 92   Leader: 8   Replicas: 8,2   Isr: 8,2
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 93   Leader: 1   Replicas: 1,3   Isr: 1,3
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 94   Leader: 2   Replicas: 2,4   Isr: 4,2
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 95   Leader: 3   Replicas: 3,16  Isr: 3,16
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 96   Leader: 4   Replicas: 4,6   Isr: 4,6
Topic: zb-dialogue-grpc-session-all-in-one  Partition: 97   Leader: 16  Replicas: 16,7  Isr: 7,16
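
To see how replicas are currently spread, lines in this format can be parsed and tallied per broker. This is a minimal sketch, assuming every line follows the `Topic: ... Partition: ... Leader: ... Replicas: ... Isr: ...` layout shown above. `parse_line` and `replicas_per_broker` are hypothetical helper names.

```python
from collections import Counter


def parse_line(line):
    """Parse one 'kafka-topics --describe' partition line into a dict."""
    fields = line.split()
    # fields: Topic: <t> Partition: <p> Leader: <l> Replicas: <r> Isr: <i>
    return {
        "topic": fields[1],
        "partition": int(fields[3]),
        "leader": int(fields[5]),
        "replicas": [int(b) for b in fields[7].split(",")],
    }


def replicas_per_broker(lines):
    """Count how many partition replicas each broker id hosts."""
    counts = Counter()
    for line in lines:
        if line.strip():
            counts.update(parse_line(line)["replicas"])
    return counts


# Two lines copied from the sample output above.
sample = [
    "Topic: zb-dialogue-grpc-session-all-in-one  Partition: 73   Leader: 14  Replicas: 14,10 Isr: 10,14",
    "Topic: zb-dialogue-grpc-session-all-in-one  Partition: 87   Leader: 10  Replicas: 10,11 Isr: 10,11",
]

if __name__ == "__main__":
    for broker, n in sorted(replicas_per_broker(sample).items()):
        print("broker %d: %d replicas" % (broker, n))
```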
  - Use remove_node.py to replace the brokers being decommissioned (remove_nodes) with brokers that will remain

#!/usr/bin/env python
# remove_node.py
import json
import random

temp = {"version": 1, "partitions": []}
remove_nodes = [19, 18, 17, 16]  # brokers being decommissioned


def gen_random_node(slist):
    # pick a random broker id in 1..15 that is not already in slist
    node = random.randint(1, 15)
    while node in slist:
        node = random.randint(1, 15)
    return node


result = {}  # replica count per broker after replacement
with open('topics_leader.txt', "r") as f:
    for line in f:
        line_sp = line.strip().split()
        topic, partition, broker, reps = line_sp[1], line_sp[3], line_sp[5], line_sp[7]
        reps_l = [int(j) for j in reps.split(",")]
        for i in range(len(reps_l)):
            if reps_l[i] in remove_nodes:
                reps_l[i] = gen_random_node(reps_l)
            result[reps_l[i]] = result.get(reps_l[i], 0) + 1
        partitions = {"topic": topic, "partition": int(partition), "replicas": reps_l}
        temp['partitions'].append(partitions)

print(json.dumps(temp))
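
remove_node.py picks each replacement broker at random, so the resulting load can end up uneven. One possible variant, sketched here as an assumption and not as the author's method, picks the remaining broker that currently hosts the fewest replicas. `replace_removed`, `keep_nodes` and the sample data are hypothetical.

```python
import json


def replace_removed(assignments, remove_nodes, keep_nodes):
    """Swap brokers in remove_nodes for the least-loaded broker in keep_nodes.

    assignments: list of {"topic", "partition", "replicas"} dicts.
    Returns a new list; the input is not modified.
    """
    remove = set(remove_nodes)
    # Start from the current replica count on the brokers that stay.
    load = {b: 0 for b in keep_nodes}
    for a in assignments:
        for b in a["replicas"]:
            if b in load:
                load[b] += 1

    result = []
    for a in assignments:
        replicas = list(a["replicas"])
        for i, b in enumerate(replicas):
            if b in remove:
                # A candidate must not already hold a replica of this partition.
                candidates = [k for k in keep_nodes if k not in replicas]
                if not candidates:
                    raise ValueError("not enough remaining brokers")
                # Fewest replicas first; ties broken by the lower broker id.
                target = min(candidates, key=lambda k: (load[k], k))
                replicas[i] = target
                load[target] += 1
        result.append({"topic": a["topic"], "partition": a["partition"], "replicas": replicas})
    return result


# Hypothetical input: three partitions, brokers 1-3 stay, 16-19 are removed.
current = [
    {"topic": "t", "partition": 0, "replicas": [19, 1]},
    {"topic": "t", "partition": 1, "replicas": [2, 16]},
    {"topic": "t", "partition": 2, "replicas": [1, 2]},
]

if __name__ == "__main__":
    plan = {"version": 1, "partitions": replace_removed(current, [19, 18, 17, 16], [1, 2, 3])}
    print(json.dumps(plan))
```

Partitions whose replicas are all on remaining brokers pass through unchanged, so only the replicas that must move are reassigned.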

  - Redirect the script's output to the file balance_topics.json, for example:

{"version": 1, "partitions": [{"topic": "ItemEvent", "partition": 0, "replicas": [2, 4]}, {"topic": "ItemEvent", "partition": 1, "replicas": [9, 11]}, {"topic": "ItemEvent", "partition": 2, "replicas": [10, 6]}, {"topic": "ItemEvent", "partition": 3, "replicas": [11, 7]}, {"topic": "ItemEvent", "partition": 4, "replicas": [12, 5]}, {"topic": "ItemEvent", "partition": 5, "replicas": [12, 9]}, {"topic": "ItemEvent", "partition": 6, "replicas": [9, 10]}, {"topic": "ItemEvent", "partition": 7, "replicas": [15, 5]}, {"topic": "ItemEvent", "partition": 8, "replicas": [8, 13]}, {"topic": "ItemEventTest", "partition": 0, "replicas": [1]}, {"topic": "ItemEventTest", "partition": 1, "replicas": [3]}, {"topic": "ItemEventTest", "partition": 2, "replicas": [4]}, {"topic": "ItemEventTest", "partition": 3, "replicas": [5]}, {"topic": "ItemEventTest", "partition": 4, "replicas": [6]}, {"topic": "ItemEventTest", "partition": 5, "replicas": [7]}, {"topic": "ItemEventTest", "partition": 6, "replicas": [8]}, {"topic": "ItemEventTest", "partition": 7, "replicas": [9]}, {"topic": "ItemEventTest", "partition": 8, "replicas": [10]}, {"topic": "ItemEventTest", "partition": 9, "replicas": [11]}, {"topic": "ItemEventTest", "partition": 10, "replicas": [12]}, {"topic": "ItemEventTest", "partition": 11, "replicas": [13]}, {"topic": "ItemEventTest", "partition": 12, "replicas": [14]}, {"topic": "ItemEventTest", "partition": 13, "replicas": [15]}, {"topic": "ItemEventTest", "partition": 14, "replicas": [6]}, {"topic": "ItemEventTest", "partition": 15, "replicas": [3]}, {"topic": "ItemEventTest", "partition": 16, "replicas": [5]}]}

  - Submit the reassignment task using balance_topics.json

kafka-reassign-partitions --zookeeper prd-zbka-003:2181/kafka/ka --reassignment-json-file balance_topics.json --execute --throttle 50000000
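
Before a plan is submitted, it can be worth checking it for obvious mistakes. This is a minimal sketch of such a check. `check_plan` is a hypothetical helper, and the sample plan mixes one entry from the example above with two deliberately broken, made-up entries.

```python
def check_plan(plan, remove_nodes):
    """Return a list of problems found in a reassignment plan dict."""
    problems = []
    remove = set(remove_nodes)
    seen = set()
    for p in plan["partitions"]:
        key = (p["topic"], p["partition"])
        replicas = p["replicas"]
        if key in seen:
            problems.append("%s-%d listed more than once" % key)
        seen.add(key)
        if len(set(replicas)) != len(replicas):
            problems.append("%s-%d has duplicate replicas %s" % (key[0], key[1], replicas))
        leftover = sorted(remove.intersection(replicas))
        if leftover:
            problems.append("%s-%d still uses brokers %s" % (key[0], key[1], leftover))
    return problems


sample_plan = {"version": 1, "partitions": [
    {"topic": "ItemEvent", "partition": 0, "replicas": [2, 4]},
    {"topic": "ItemEvent", "partition": 1, "replicas": [9, 9]},     # made-up: duplicate broker
    {"topic": "ItemEventTest", "partition": 0, "replicas": [16]},   # made-up: removed broker
]}

if __name__ == "__main__":
    for problem in check_plan(sample_plan, remove_nodes=[19, 18, 17, 16]):
        print(problem)
```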


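  - Pros and cons of this approach:
    - Pros: partition placement is flexible and controllable; moves are computed precisely, which cuts unnecessary data movement; throttling is still available.
    - Cons: with many partitions and no cap on how many move at once, submitting one large task still puts heavy pressure on Kafka and hurts performance, even with throttling. A submitted task also cannot be aborted.
  - Improvements:
   - When many partitions need to move, submit them in small batches.
   - Monitor the ZooKeeper path /kafka/ka/admin/reassign_partitions with a periodic check, and start the next batch once the current one has finished.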
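
The batching idea can be sketched as follows. This is an illustration under stated assumptions, not a tested production script: `split_plan` and `run_batches` are hypothetical names, and the `submit` and `in_progress` callables are left to the caller. In practice `submit` would write the batch to a file and run kafka-reassign-partitions with --execute, and `in_progress` would check with a ZooKeeper client whether /kafka/ka/admin/reassign_partitions still exists, assuming the znode disappears once the running reassignment completes.

```python
import time


def split_plan(plan, batch_size):
    """Split one reassignment plan into plans of at most batch_size partitions."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    parts = plan["partitions"]
    return [
        {"version": plan["version"], "partitions": parts[i:i + batch_size]}
        for i in range(0, len(parts), batch_size)
    ]


def run_batches(batches, submit, in_progress, poll_seconds=30, sleep=time.sleep):
    """Submit batches one at a time, waiting for each to finish.

    submit(batch) and in_progress() are supplied by the caller (assumptions,
    see the text above); sleep is injectable so the loop can be tested.
    """
    for batch in batches:
        submit(batch)
        while in_progress():
            sleep(poll_seconds)


if __name__ == "__main__":
    # Hypothetical plan with five partitions, split into batches of two.
    plan = {"version": 1, "partitions": [
        {"topic": "t", "partition": i, "replicas": [1, 2]} for i in range(5)
    ]}
    for batch in split_plan(plan, 2):
        print(len(batch["partitions"]))
```

Keeping the ZooKeeper check behind a callable means the batching logic can be exercised without a cluster, and the polling interval bounds how long a finished batch waits before the next one starts.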
huoarter commented 3 years ago
#!/usr/bin/env python
# kurong.py: give partitions that are not yet replicated a second replica
import json

temp = {"version": 1, "partitions": []}
result = {}  # replica count per broker for partitions left unchanged
with open('topics_leader.txt', "r") as f:
    for line in f:
        line_sp = line.strip().split()
        topic, partition, broker, rep = line_sp[1], line_sp[3], line_sp[5], line_sp[7]
        brokers = rep.split(',')
        if len(brokers) in (2, 3):
            # already replicated: only tally replicas per broker
            for b in brokers:
                result[b] = result.get(b, 0) + 1
        else:
            # add the next broker id as a second replica, wrapping 15 -> 1
            second = 1 if int(broker) == 15 else int(broker) + 1
            partitions = {"topic": topic, "partition": int(partition), "replicas": [int(broker), second]}
            # only partitions that get a new replica list go into the plan
            temp['partitions'].append(partitions)

print(json.dumps(temp))