내가 사용하는 카프카의 버전 확인 /usr/local/kafka/bin/kafka-topics.sh --version
libs 하위 디렉토리에서 kafka의 jar 파일을 이용해 버전 하는 방법 ls -l /usr/local/kafka/libs/kafka_*
버전이 2.6.0 임을 확인
업그레이드 하기 전, 현재 사용중인 버전을 확인하고, 업그레이드하려는 버전을 정한 뒤, 카프나의 릴리즈 노트 등을 확인해보면서, 업그레이드 시 문제가 될 부분을 확인한다.
상위버전은 클라이언트들의 하위 호환성을 가지고 있어서, 대부분 클라이언트 이슈는 없음.
하지만 스칼라 컨슈머, 프로듀서처럼 서비스가 종료된 경우도 있어서 전체적으로 카프카의 릴리즈 노트를 확인해야함
현재 사용중인게 1.x 이거나 0.x 이고, 2.x 버전으로 업그레이드 하려는 경우, 메이저 버전의 변경이 필요한 업그레이드
메세지의 포멧 변경,ㅣ 브로커에서의 기본값 변경, 과거에 지원했던 명령어의 지원 종료, 일부 JMX 메트릭의 변화 등의 문제가 있을 수 있으므로 업그레이드로 인한 이슈 파악이 필요
아파치 카프카 공식 홈페이지 https://kafka.apache.org/20/documentation/#upgrade_200_notable 에서 확인 가능
하지만 같은 2.x 내에서의 업그레이드 경우 마이너 버전 업그레이드이므로 비교적 용이하게 업그레이드 가능
카프카의 버전 업그레이드 방법은 크게 다운타임을 가질수 있는경우, 가질 수 없는 경우로 나뉨.
다운타임을 가질 수 있다면, 현재 버ㅂ전의 카프카를 모두 종류하고, 최신버전의 카프카를 실행하면 끝
다운타임을 가질 수 없다면, 연결되어있는 수많은 파이프라인이나 카프카와 연결된 서비스에 영향을 최소화하도록 해야함.
한 대씩 롤링 업그레이드를 하는 방법이 있음.
8.2 주키퍼 의존성이 있는 카프카 롤링 업그레이드
주키퍼 버전은 올리지 않고 카프카만 2.1에서 2.6으로 올리는 실습을 진행해보겠음
일단 2.6으로 설치되어있는 카프카를 2.1로 낮춰야한다.
낮추기 전에 토픽들을 전부 삭제 /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server peter-kafka01.foo.bar:9092 로 토픽 리스트들을 확인후, __consumer_offsets 빼고 전부 삭제
[ec2-user@ip-172-31-0-207 ~]$ /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --topic peter-version2-1 --partitions 1 --replication-factor 3
Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:361)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
TASK [Gathering Facts] ***
[WARNING]: Platform linux on host peter-zk03.foo.bar is using the discovered Python interpreter at /usr/bin/python, but future installation of another Python interpreter could change this. See
https://docs.ansible.com/ansible/2.9/reference_appendices/interpreter_discovery.html for more information.
ok: [peter-zk03.foo.bar]
TASK [common : Set timezone to Asia/Seoul] ***
ok: [peter-zk03.foo.bar]
TASK [common : install Java and tools] ***
ok: [peter-zk03.foo.bar]
* 현재 설정된 파티션 배치를 먼저 보여주고, 제안하는 파티션 배치를 출력해줌, 그럼 뒤에 제안한 배치의 설정을 복사하고 새로운 move.json 파일을 생성
* `move.json` 파일을 적용
```shell
[ec2-user@ip-172-31-0-34 ~]$ /usr/local/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server peter-kafka01.foo.bar:9092 --reassignment-json-file move.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"peter-scaleout1","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"peter-scaleout1","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"peter-scaleout1","partition":2,"replicas":[3],"log_dirs":["any"]},{"topic":"peter-scaleout1","partition":3,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for peter-scaleout1-0,peter-scaleout1-1,peter-scaleout1-2,peter-scaleout1-3
8장
카프카 버전을 올리는 방법을 배운다.
8.1 카프카 버전 업그레이드를 위한 준비
/usr/local/kafka/bin/kafka-topics.sh --version
ls -l /usr/local/kafka/libs/kafka_*
https://kafka.apache.org/20/documentation/#upgrade_200_notable
에서 확인 가능8.2 주키퍼 의존성이 있는 카프카 롤링 업그레이드
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server peter-kafka01.foo.bar:9092
로 토픽 리스트들을 확인후,__consumer_offsets
빼고 전부 삭제/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --delete --topic 토픽명
sudo systemctl stop kafka-server
로 카프카를 모두 종료 (저는 다 dead 상태였어서 restart 해준뒤, stop 다시 해주었습니다.)ansible 서버로 접근 후, 설치
/usr/local/kafka/bin/kafka-topics.sh --zookeeper peter-zk01.foo.bar --create --topic peter-version2-1 --partitions 1 --replication-factor 3
)8.2.1 최신 버전의 카프카 다운로드와 설정
sudo cp kafka_2.12-2.1.0/config/server.properties kafka_2.12-2.6.0/config/server.properties
8.2.2 브로커 버전 업그레이드
브로커 버전 업그레이드는 한대씩 진행
peter-kafka01
에 접근후 브로커 종료종료된 브로커가 갖고 있던 파팃견의 리더들이 다른 브로커로 변경된다. 만약 카프카 클러스터가 운영중 이라면, 클라이언트는 일시적으로 리더를 찾지못하는 에러가 발생 or 타임아웃이 발생한다
이 장애는 당연한걸로, 카프카 클라이언트 내부적으로 재시도 로직이 있으므로, 모든 클라이언트들은 새로운 리더가 있는 브로커를 바라봄
첫번째 종료했지만, 리플리케이션 기능을 통해 장애는 안나고 정상작동남. 2.1버전으로 연결된 2.6 카프카 실행
연결이 2.6으로 변경된걸 확인, 이제 브로커를 실행하면
peter-kafka01
의 버전은 2.6으로 변경되었음. bootstrap-server를 실행해보면 잘 동작하는것을 확인 할 수 있다.2번,3번 도 동일하게 진행
현재 상태가, 1,2,3의 카프카 버전은 2.6으로 올려주었지만 브로커 프로토콜 버전과 메세지 포멧 버전은 2.1인 상태
8.2.3 브로커 설정 변경
위의 변경을 통해 카프카 버전은 2.6이지만 2.1 버전으로 통신하게끔 되어있는 상태이다.
이걸 다시 2.6으로 변경하게끔 vi를 통해 다시 변경해준다.
위 값을 지워도 되고, 2.6으로 변경해도 된다. 변경을 적용하기 위해서는 또 재시작이 필요
변경해주고, 검증단계 필요하므로 콘솔 프로듀서를 통해 메세지 전송
확인
업그레이드 이전의 메세지도 가져올 수 있는지 확인해보면
업그레이드 후의 검증까지 모두 완료했다.
8.2.4 업그레이드 작업시 주의사항
이번 실습은 간단하지만 운영에서 예상치 못한 문제가 생길수 있으니 충분한 테스트를 미리 해둬야 함
운영과 동일한 환경의 개발용 카프카를 미리 구상하고, 개발용에서 업그레이드 수행해보면 조금 더 꼼꼼히 볼수 있음
또한 작업 시간과 관련된 문제인데, 카프카 사용량이 적은 시간대를 골라 업그레이드 작업을 실시하는것을 권장
버전 업그레이드는 서비스 영향 없도록 한대씩 돌아가면서 종료, 업그레이드, 시작 작업을 진행하고, 브로커 종료, 시작이 되면 리더가 변경되고 팔로워들은 리플리케이션이 내부적으로 일어나게 된다.
프로듀서의 ack=1 옵션을 사용하는경우 일부 메세지가 손실될 수도 있음.
8.3 카프카의 확장
운영하며 카프카 사용량이 증가해서 카프카를 확장해야할 상황도 오게되는데 그 경우를 실습해보는 시간
우선 실습용 토픽 하나생성
총 4개의 파티션으로 구성되어있고, 브로커는 1,2,3 이렇게 3대로 구성되어있음 각 브로커마다 하나의 파티션이 배치되었지만 내꺼는 1브로커에 총 2개의 파티션이 배치되어있음(책에는 3번에 2개)
책 내용처럼 perter-zk03 서버를 4번째 브로커로 설정
PLAY [peter-zk03.foo.bar] ****
TASK [Gathering Facts] *** [WARNING]: Platform linux on host peter-zk03.foo.bar is using the discovered Python interpreter at /usr/bin/python, but future installation of another Python interpreter could change this. See https://docs.ansible.com/ansible/2.9/reference_appendices/interpreter_discovery.html for more information. ok: [peter-zk03.foo.bar]
TASK [common : Set timezone to Asia/Seoul] *** ok: [peter-zk03.foo.bar]
TASK [common : install Java and tools] *** ok: [peter-zk03.foo.bar]
TASK [common : copy krb5 conf] *** ok: [peter-zk03.foo.bar]
TASK [stop kafka-server] ***** fatal: [peter-zk03.foo.bar]: FAILED! => {"changed": false, "msg": "Could not find the requested service kafka-server: host"} ...ignoring
TASK [remove directory kafka] **** ok: [peter-zk03.foo.bar]
TASK [make dir kafka] **** changed: [peter-zk03.foo.bar]
TASK [download kafka from web] *** changed: [peter-zk03.foo.bar]
TASK [unarchive kafka] *** changed: [peter-zk03.foo.bar]
TASK [setup link kafka] ** changed: [peter-zk03.foo.bar]
TASK [copy kafka server conf files] ** changed: [peter-zk03.foo.bar]
TASK [copy kafka conf file] ** changed: [peter-zk03.foo.bar] => (item=jmx) changed: [peter-zk03.foo.bar] => (item=connect-distributed.properties)
TASK [copy kafka server in systemd] ** changed: [peter-zk03.foo.bar] => (item=kafka-server.service) changed: [peter-zk03.foo.bar] => (item=kafka-connect.service)
TASK [kafka : just force systemd to reload configs] ** ok: [peter-zk03.foo.bar]
TASK [kafka : make sure a service is running] **** changed: [peter-zk03.foo.bar]
PLAY RECAP *** peter-zk03.foo.bar : ok=15 changed=8 unreachable=0 failed=0 skipped=0 rescued=0 ignored=1
peter-scaleout1
은 이동하지 않은상태. 이걸 이동하기위해서는 브로커 부하 분산을 해줘야 한다8.3.1 브로커 부하 분산
kafka-reassign-partitions.sh
도구를 이용하면 파티션을 이동시킬 수 있다. 이걸 이용해서 실습용 토픽의 파티션을 분산topics
에 추가해주면 가능하다reassign-partitions-topic.json
로 저장해두고kafka-reassign-partitions.sh
명령어를 이용해서 분산시킬 브로커리스트를 지정Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"peter-scaleout1","partition":0,"replicas":[4],"log_dirs":["any"]},{"topic":"peter-scaleout1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"peter-scaleout1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"peter-scaleout1","partition":3,"replicas":[3],"log_dirs":["any"]}]}
describe
를 이용해 잘 배치되었는지 확인하면8.3.2 분산 배치 작업 시 주의사항