Open kmg28801 opened 1 year ago
gpt에게 물어봐서 실습코드에서 es = Elasticsearch('http://peter-kafka02.foo.bar:9200')
http
붙이라고 알려줌.
그런데 새로운 오류가 발생
(venv12) [ec2-user@ip-172-31-13-207 ~]$ python kafka2/chapter12/python/consumer_kafka-2_producer_es_v1.py
kafka2/chapter12/python/consumer_kafka-2_producer_es_v1.py:25: DeprecationWarning: AvroConsumer has been deprecated. Use AvroDeserializer instead.
'schema.registry.url': 'http://peter-kafka03.foo.bar:8081'},reader_value_schema=value_schema)
{'name': 'Ernest', 'class': 4}
Traceback (most recent call last):
File "kafka2/chapter12/python/consumer_kafka-2_producer_es_v1.py", line 53, in <module>
es.indices.create(index=index)
File "/home/ec2-user/venv12/lib64/python3.7/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
return api(*args, **kwargs)
File "/home/ec2-user/venv12/lib64/python3.7/site-packages/elasticsearch/_sync/client/indices.py", line 518, in create
"PUT", __path, params=__query, headers=__headers, body=__body
File "/home/ec2-user/venv12/lib64/python3.7/site-packages/elasticsearch/_sync/client/_base.py", line 390, in perform_request
method, path, params=params, headers=headers, body=body
File "/home/ec2-user/venv12/lib64/python3.7/site-packages/elasticsearch/_sync/client/_base.py", line 337, in perform_request
body=resp_body,
elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
엘라스틱 서치 상태를 확인하니 아까는 green
이었는데 이제 yellow
가 되었음
(venv12) [ec2-user@ip-172-31-13-207 ~]$ curl -X GET 'http://peter-kafka02.foo.bar:9200/_cat/health?v'
epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1694349165 12:32:45 docker-cluster yellow
[ec2-user@ip-172-31-13-207 ~]$ curl -X GET 'http://peter-kafka02.foo.bar:9200/_cat/indices?v'
이걸로 상태를 보니
yellow open students ITNfSd20S9Kv_dKBi47MXA 1 1 0 0 208b 208b
복사개수가 0개
yellow
에서 바뀌지 않음
2.로그를 확인하니 ERROR
로 따로 찍히는건 없음curl -X GET 'http://peter-kafka02.foo.bar:9200/_cat/shards?v'
를 통해 샤드를 확인index shard prirep state docs store ip node
.tasks 0 p STARTED 4 21.3kb 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
students 0 p STARTED 0 208b 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
students 0 r UNASSIGNED
.apm-agent-configuration 0 p STARTED 0 208b 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
.kibana-event-log-7.12.1-000001 0 p STARTED 4 21.8kb 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
.apm-custom-link 0 p STARTED 0 208b 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
.kibana_task_manager_7.12.1_001 0 p STARTED 9 188.8kb 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
.kibana_7.12.1_001 0 p STARTED 11 2.1mb 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
.ds-ilm-history-5-2023.09.01-000001 0 p STARTED 172.31.5.249 ip-172-31-5-249.us-east-2.compute.internal
students
가 2개 찍혀있고, UNASSIGNED
로 되어있음....
노드도 1개중 1개로 확인
[ec2-user@ip-172-31-13-207 ~]$ curl -X GET 'http://peter-kafka02.foo.bar:9200/_cat/nodes?v'
ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.31.5.249 62 97 10 0.08 0.07 0.08 cdfhilmrstw * ip-172-31-5-249.us-east-2.compute.internal
(venv12) [ec2-user@ip-172-31-13-207 ~]$ python kafka2/chapter12/python/consumer_kafka-2_producer_es_v1.py
kafka2/chapter12/python/consumer_kafka-2_producer_es_v1.py:25: DeprecationWarning: AvroConsumer has been deprecated. Use AvroDeserializer instead.
'schema.registry.url': 'http://peter-kafka03.foo.bar:8081'},reader_value_schema=value_schema)
AvroConsumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: src.peter-avro01-kafka1: Broker: Unknown topic or partition"}
^C^X^CTraceback (most recent call last):
File "kafka2/chapter12/python/consumer_kafka-2_producer_es_v1.py", line 34, in
12장 엔터프라이즈 카프카 아키텍처 구성 사례
12.1 엔터프라이즈용 카프카 아키텍처의 개요
엔터프라이즈 환경에서는 장애 복구 등을 위해 하나 이상의 데이터 센터를 운영, 관리하는 경우가 많음.
카프카 클러스터 역시 다수의 데이터 센터에 배치되어 있으므로, 데이터 리플리케이션은 필수.
카프카 간의 데이터 리플리에키션에는 미러 메이커를 사용한다.
왼쪽은 업스트림과 다운 스트림 사이에서 리플리케이션이 필요,
오른쪽은 여러 데이터센터마다 카프카가 존재하고, 중앙에 있는 카프카 클러스터로 합쳐진 데이터를 모으기 위해 리플리케이션을 사용하기도 함
중앙에 모인 데이터는 실시간 처리와 분석을 위해 다른 어플리케이션에 적재가 된다. (예시로 하둡, 엘라스틱서치, S3, HBase가 있음)
위 그림이 지금까지 진행한 내용을 종합한 엔터프라이즈 환경
카프카 클러스터는 총 2개로, 업스트림 클러스터, 다운스트림 카프카가 존재하고, 다운스트림 카프카는 업스트림 카프카를 미러링.
이걸 구현해보자....
12.2 엔터프라이즈용 카프카의 환경 구성
실습을 위해 준비된 서버는 총 6대
기존 peter-kafka01~03 에는 업스트림 카프카, zk01~03 에는 다운스트림 카프카를 구성
주키퍼는 2개의 앙상블을 구성하지 않고 1개의 앙상블만 구성하고, 지노드를 분리해 카프카 클러스터 2개가 하나의 주키퍼 앙상블을 바라보도록 구성
카프카 커넥트가 업스트림이 아닌 다운 스트림과 함께있음.
미러링에서 가장 중요한건 메시지 손실이 없어야 한다는 점
카프카 커넥트가 x 서버에 위치할때 해당 서버와는 로컬 네트워크를 통해 통신을 하므로 데이터 유실이 없음.
만약 업스트림 쪽에 있다면, 프로듀싱하는데에 로컬 통신이라 데이터 유실이 없지만 다운스트림으로 데이터를 보낼때 유실이 발생하면 복구 방법이 없음
하지만 다운스트림쪽에 있고, 데이터를 프로듀싱 할때 네트워크 에러가 난다면, 컨슈머의 오프셋을 장애 전 시점으로만 변경 한다면 일부 메시지의 중복 발행은 일어나지만, 메시지 유실은 없다.
peter-ansible01
서버에 접속하고, 아래 명령어를 실행하여 그람과 같은 환경으로 설치를 한다.ansible-playbook -i hosts site.yml
명령어 실행TASK [stop exporters]
이거 failed 나는데ignoring
처리됨(상관 없나?) 나머진 순조롭게 okansible-playbook -i hosts expoter.yml
TASK [exporterall : stop services]
failed 나는데 ignoring 처리됨ansible-playbook -i hosts monitoring.yml
12.3 엔터프라이즈용 카프카의 운영 실습
프로듀서1은 업스트림 카프카의 실습용 토픽(peter-avro01-kafka1)으로 메세지를 전송
카프카 커넥트의 미러 메이커는
peter-avro01-kafka1
토픽을 다운스트림 카프카로 미러링컨슈머1을 이용해 업스트림 카프카의
peter-avro01-kafka1
토픽을 컨슘컨슈머2를 이용해 다운스트림 카프카의 미러링된 토픽
src.peter-avro01-kafka1
에서 메세지를 읽고, 엘라스틱서지로 전송REST API와 키바나를 이용해 엘라스틱 서치에 메세지가 저장되었는지 확인
스키마를 변경한 후, 다시 메세지를 전송해 최종적으로 엘라스틱까지 변경되 스키마 내용이 적용되었는지 확인
이 과정에 대한 카프카 메트릭은 프로메테우스에 저장되며, 그라파나 대시보드를 이용해 모니터링
12.3.1 CMAK를 이용한 토픽 생성
Cluster
를 누르고,Add Cluster
를 선택하면 클러스터의 상세 정보를 입력하게끔 화면이 바뀜kafka-1
주키퍼 호스트에는peter-zk01.foo.bar:2181,peter-zk02.foo.bar:2181,peter-zk03.foo.bar:2181/kafka3
를 입력Enable JMX Polling
활성화 하고, 저장kafka-2
도 등록을 해주기, 호스트 주소는peter-zk01.foo.bar:2181,peter-zk02.foo.bar:2181,peter-zk03.foo.bar:2181/kafka4
를 입력peter-avro01-kafka1
, 파티션은 1, Replication Factor는 3으로 설정하고 생성12.3.2 카프카 커넥트 설정
curl http://peter-zk01.foo.bar:8083/connectors
를 요청해서 카프카 커넥트가 실행되고 있는지 확인[]
로 출력curl http://peter-zk01.foo.bar:8083/connectors/peter-mirrormaker2/status | python -m json.tool
를 이용해서 카프카 커넥트가 잘 등록되었는지 확인src.peter-avro01-kafka1
로 미러링 된것을 확인 할 수 있다.12.3.3 모니터링 환경 구성
peter-kafka01
서버에 그라파나, 프로메테우스가 설치되어 있으며, 3000번 포트를 이용해 접근할 수 있다.그라파나 대시보드 생성
을 참고해서 프로메테우스 연동Prometheus
, URL을http://peter-kafka01.foo.bar:9090
으로 입력https://github.com/onlybooks/kafka2/blob/main/chapter7/kafka_metrics.json
복붙)1860
load 얘는 에러남...)12.3.4 메세지 전송과 확인
git clone https://github.com/onlybooks/kafka2
sudo yum -y install python3
(kafka2는 이미 git clone 받아져있고, python3도 이미 설치되어있기때문에 저는 스킵했습니다.)python3 -m venv venv12
를 이용해 가상환경 생성하고, 진입confluent-kafka[avro]
,names
,elasticsearch
라이브러리를 설치한다peter-avro01-kafka1
토픽으로 전송curl -X GET 'http://peter-kafka02.foo.bar:9200/_cat/health?v'
(status green)확인src.peter-avro01-kafka1
에서 메세지를 읽고, 엘라스틱 서체로 전송(막힘)AvroConsumer
가 deprecated되었음...