kmg28801 / kafka-study

1 stars 0 forks source link

[실전 카프카 개발부터 운영까지] 11장. 카프카 커넥트 #21

Open kmg28801 opened 1 year ago

kmg28801 commented 12 months ago

카프카 커넥트 주요 장점

카프카 커넥트의 핵심 개념

image image

카프카 커넥트의 내부 동작

image

단독 모드 카프카 커넥트

PLAY [kafkahosts] **

TASK [Gathering Facts] ***** [WARNING]: Platform linux on host peter-kafka03.foo.bar is using the discovered Python interpreter at /usr/bin/python, but future installation of another Python interpreter could change this. See https://docs.ansible.com/ansible/2.9/reference_appendices/interpreter_discovery.html for more information. ok: [peter-kafka03.foo.bar] [WARNING]: Platform linux on host peter-kafka02.foo.bar is using the discovered Python interpreter at /usr/bin/python, but future installation of another Python interpreter could change this. See https://docs.ansible.com/ansible/2.9/reference_appendices/interpreter_discovery.html for more information. ok: [peter-kafka02.foo.bar] [WARNING]: Platform linux on host peter-kafka01.foo.bar is using the discovered Python interpreter at /usr/bin/python, but future installation of another Python interpreter could change this. See https://docs.ansible.com/ansible/2.9/reference_appendices/interpreter_discovery.html for more information. ok: [peter-kafka01.foo.bar]

TASK [common : Set timezone to Asia/Seoul] ***** ok: [peter-kafka03.foo.bar] ok: [peter-kafka01.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [common : install Java and tools] *****

changed: [peter-kafka02.foo.bar] changed: [peter-kafka03.foo.bar] changed: [peter-kafka01.foo.bar]

TASK [common : copy krb5 conf] ***** ok: [peter-kafka03.foo.bar] ok: [peter-kafka01.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [stop kafka-server] *** ok: [peter-kafka01.foo.bar] ok: [peter-kafka03.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [remove directory kafka] ** ok: [peter-kafka01.foo.bar] ok: [peter-kafka03.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [make dir kafka] ** changed: [peter-kafka01.foo.bar] changed: [peter-kafka02.foo.bar] changed: [peter-kafka03.foo.bar]

TASK [download kafka from web] ***** ok: [peter-kafka03.foo.bar] ok: [peter-kafka01.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [unarchive kafka] ***** changed: [peter-kafka02.foo.bar] changed: [peter-kafka03.foo.bar] changed: [peter-kafka01.foo.bar]

TASK [setup link kafka] **** ok: [peter-kafka01.foo.bar] ok: [peter-kafka03.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [copy kafka server conf files] **** changed: [peter-kafka01.foo.bar] changed: [peter-kafka03.foo.bar] changed: [peter-kafka02.foo.bar]

TASK [copy kafka conf file] **** ok: [peter-kafka01.foo.bar] => (item=jmx) ok: [peter-kafka03.foo.bar] => (item=jmx) ok: [peter-kafka02.foo.bar] => (item=jmx) changed: [peter-kafka01.foo.bar] => (item=connect-distributed.properties) changed: [peter-kafka03.foo.bar] => (item=connect-distributed.properties) changed: [peter-kafka02.foo.bar] => (item=connect-distributed.properties)

TASK [copy kafka server in systemd] **** ok: [peter-kafka01.foo.bar] => (item=kafka-server.service) ok: [peter-kafka02.foo.bar] => (item=kafka-server.service) ok: [peter-kafka03.foo.bar] => (item=kafka-server.service) ok: [peter-kafka01.foo.bar] => (item=kafka-connect.service) ok: [peter-kafka02.foo.bar] => (item=kafka-connect.service) ok: [peter-kafka03.foo.bar] => (item=kafka-connect.service)

TASK [kafka : just force systemd to reload configs] **** ok: [peter-kafka01.foo.bar] ok: [peter-kafka03.foo.bar] ok: [peter-kafka02.foo.bar]

TASK [kafka : make sure a service is running] ** changed: [peter-kafka01.foo.bar] changed: [peter-kafka03.foo.bar] changed: [peter-kafka02.foo.bar]

PLAY RECAP ***** peter-kafka01.foo.bar : ok=15 changed=6 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 peter-kafka02.foo.bar : ok=15 changed=6 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 peter-kafka03.foo.bar : ok=15 changed=6 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0


<img width="1123" alt="image" src="https://github.com/kmg28801/kafka-study/assets/53685313/b8c093d3-d7c7-40f1-9847-f88bc5fde0ca">

- 파일 소스 커넥터와 싱크 커넥터를 실행해 단독 모드 파일 커넥터 구성도
- 카프카 커넥트 시나리오
  1. 로컬 디렉토리에 `test.txt` 파일 생성 후 `파일 소스 커넥터를 실행`하여, 로컬의 `test.txt` 파일 내용을 읽은 다음 카프카의 `connect-test` 토픽으로 메시지 전송
  2. 파일 싱크 커넥터는 `connect-test` 토픽에서 메시지를 읽은 후 해당 내용을 로컬의 `test.sink.txt` 파일로 저장

- 이 모든 과정은 교재에서 ansible에서 진행되었지만, 카프카 서버에서 해야됨
```cmd
[ec2-user@ip-172-31-2-254 ansible_playbook]$ echo "hello-1" > test.txt
[ec2-user@ip-172-31-2-254 ansible_playbook]$ echo "hello-2" >> test.txt
[ec2-user@ip-172-31-2-254 ansible_playbook]$ echo "hello-3" >> test.txt
[ec2-user@ip-172-31-2-254 ansible_playbook]$ cat test.txt
hello-1
hello-2
hello-3
name=local-file-source # 커넥터에서 식별하는 이름 지정
connector.class=FileStreamSource # 커넥터에서 사용하는 클래스 지정
tasks.max=1 # 실제 작업을 처리하는 태스크의 최대 수 지정
file=/home/ec2-user/test.txt # 파일 소스 커넥터가 읽을 파일 지정
topic=connect-test # 파일 소스 커넥터가 읽은 내용을 전송할 카프카 토픽 지정
bootstrap.servers=localhost:9092 # 브로커 주소 지정
key.converter=org.apache.kafka.connect.json.JsonConverter # 카프카로 데이터를 보내거나 가져올 때 사용할 포맷 지정, 이때 키와 밸류는 각각 지정
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false # 스키마가 포함된 구조 사용 여부 지정
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets # 재처리 등을 목적으로 오프셋 파일 저장할 경로 지정
offset.flush.interval.ms=10000 # 오프셋 플러시 주기 설정 (ms)

참고 : 카프카 커넥트의 컨버터는 소스에서 카프카로 전송할 때의 직렬화와 카프카에서 싱크로 전송할 때의 역직렬화를 담당 싱크 커넥터의 경우 데이터는 역순으로 처리되며, 카프카의 메시지 형태는 키와 밸류 형태로 이뤄지므로, 컨버터도 키 컨버터와 밸류 컨버터로 나뉨 커넥트에서는 이러한 컨버터를 통해 카프카 데이터를 전송하며, 카프카 내부에서는 데이터 포맷을 표준화된 상태로 처리할 수 있으므로 불필요하게 컨버팅하는 코드를 작성할 필요가 없다.

[ec2-user@ip-172-31-4-136 bin]$ curl http://localhost:8083/connectors/local-file-source | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   229  100   229    0     0   1243      0 --:--:-- --:--:-- --:--:--  1237
{
    "config": {
        "connector.class": "FileStreamSource",
        "file": "test.txt",
        "name": "local-file-source",
        "tasks.max": "1",
        "topic": "connect-test"
    },
    "name": "local-file-source",
    "tasks": [
        {
            "connector": "local-file-source",
            "task": 0
        }
    ],
    "type": "source"
}
[ec2-user@ip-172-31-4-136 ~]$ ls
kafka2  node_exporter-1.0.1.linux-386.tar.gz  test.sink.txt  test.txt
[ec2-user@ip-172-31-4-136 ~]$ cat test.sink.txt
hello-1
hello-2
hello-3
hello-4
[ec2-user@ip-172-31-4-136 ~]$ echo "hello-5" >> test.txt
[ec2-user@ip-172-31-4-136 ~]$ cat test.sink.txt
hello-1
hello-2
hello-3
hello-4
hello-5

## 분산 모드 카프카 커넥트

- 운영 환경에서는 단독 모드보다는 분산 모드로 사용하는 것이 안정적인 운영에 도움을 준다.
- 단독 모드와 분산 모드의 가장 중요한 차이점은 메타 정보의 저장소 위치
  - 분산 모드는 메타 정보의 저장소로 `카프카 내부 토픽` 이용
  - 카프카 내부 토픽을 이용하는 바업ㅂ은 컨슈머 그룹들의 오프셋 정보를 `__consumer_offsets` 토픽에 저장하는 방법과 유사
- 카프카 내부 토픽에 저장하게 되면 워커 장애 시에도 유연하게 대응 가능하다.
  - 하나의 워커에 장애가 발생하더라도 남아 있는 모든 워커가 카프카의 내부 토픽으로부터 메타 정보를 얻어갈 수 있다.
  - 카프카 커넥트에서 사용하는 토픽들은 커넥트 운영에서 중요한 정보가 저장되므로, 리플리케이션 팩터 수는 반드시 3으로 설정
  - 카프카 커넥트는 안전한 메타 저장소를 바탕으로 확장성, 장애 허용, 자동 리밸런싱 등 운영에 필요한 필수 기능을 제공

<img width="999" alt="image" src="https://github.com/kmg28801/kafka-study/assets/53685313/dc2a3b1f-be2d-4695-9b63-d191082566db">

- 기존에는 워커3까지 있었는데, 워커4가 추가되었다면, 카프카 커넥트에서는 내부적으로 자동 리밸런싱 동작에 의해 워커2에 있던 커넥터B가 워커4로 이동한다.
- 자동 리밸런싱은 워커들 안에서 태스크와 커넥터가 최대한 균등하게 배치될 수 있게 한다.

<img width="938" alt="image" src="https://github.com/kmg28801/kafka-study/assets/53685313/da20f03b-4b4c-48e8-9cb7-e74900a77212">

- 워커 3이 장애가 발생해 종료하면서, 워커3ㅇ에서 동작 중이던 태스크B1은 워커2로 이동해서 다시 본래의 작업을 처리
- 자동 리밸런싱, 장애 허용 동작을 통한 안정성 확보와 용이한 스케일 아웃을 위해 운영 환경의 카프카 커넥트는 반드시 분산 모드로 구성해야 한다.
- 분산 모드 설정 파일 옵션 확인 명령어

```cmd
[ec2-user@ip-172-31-4-136 ~]$ sudo cat /usr/local/kafka/config/connect-distributed.properties
bootstrap.servers=peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
group.id=peter-connect-cluster # 분산 모드 그룹 아이디를 지정, 컨슈머 그룹의 그룹 아이디와 동일한 개념
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets # 커넥터들의 오프셋 추적을 위해 저장하는 카프카 내부 토픽명 지정
offset.storage.replication.factor=3 # 커넥터들의 오프셋 추적을 위해 저장하는 카프카 내부 토픽 리플리케이션 팩터 지정
offset.storage.partitions=25 # 커넥터들의 오프셋 추적을 위해 저장하는 카프카 내부 토픽 파티션 수 지정
config.storage.topic=connect-configs # 커넥터들의 설정을 저장하는 카프카 내부 토픽명 지정
config.storage.replication.factor=3 # 커넥터들의 설정을 저장하는 카프카 내부 토픽 리플리케이션 팩터 지정
config.storage.partitions=1 # 커넥터들의 설정을 저장하는 카프카 내부 토픽 파티션 수 지정
status.storage.topic=connect-status # 커넥터들의 상태를 저장하는 카프카 내부 토픽명 지정
status.storage.replication.factor=3 # 커넥터들의 상태를 저장하는 카프카 내부 토픽 리플리케이션 팩터 지정
status.storage.partitions=5 # 커넥터들의 상태를 저장하는 카프카 내부 토픽 파티션 수 지정
[ec2-user@ip-172-31-4-136 ~]$ sudo systemctl start kafka-connect
[ec2-user@ip-172-31-4-136 ~]$ sudo systemctl status kafka-connect
● kafka-connect.service - kafka-connect
   Loaded: loaded (/etc/systemd/system/kafka-connect.service; disabled; vendor preset: disabled)
   Active: active (running) since 토 2023-09-09 10:05:15 KST; 8s ago
 Main PID: 32205 (java)
   CGroup: /system.slice/kafka-connect.service
           └─32205 java -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bi...

 9월 09 10:05:21 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.r...
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: Sep 09, 2023 10:05:22 AM org.glassfish.jersey.internal.Errors logErrors
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: [2023-09-09 10:05:22,193] INFO Started o.e.j.s.ServletContextHandler@329a1243{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:825)
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: [2023-09-09 10:05:22,193] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319)
 9월 09 10:05:22 ip-172-31-4-136.ap-northeast-2.compute.internal kafka-connect[32205]: [2023-09-09 10:05:22,193] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
Hint: Some lines were ellipsized, use -l to show in full.

커넥터 기반의 미러 메이커 2.0

참고 : 업스트림 카프카는 실시간 용도이고, 다운스트림 카프카는 배치를 위한 용도로 사용하는데, 업스트림 카프카에서 다운스트림 카프카로 리플리케이션 구성을 하여 데이터를 전송하기도 한다.

image
- 미러메이커 1.0에서는 미러링하는 대상의 토픽명이 소스 카프카와 타깃 카프카가 동일했다.
- 단방향의 미러링에서는 동일한 토픽명에서 비롯되는 문제가 없지만, 양방향 미러링의 경우 동일한 토픽명으로 인해 무한 루프 또는 순서가 뒤섞이는 경우 발생
- 미러 메이커 2.0은 위 그림처럼 에일리어스를 추가해 서로의 토픽명을 구분짓는다.
[ec2-user@ip-172-31-2-254 ~]$ cd kafka2
[ec2-user@ip-172-31-2-254 kafka2]$ cd chapter2/ansible_playbook
[ec2-user@ip-172-31-2-254 ansible_playbook]$ ansible-playbook -i hosts kafka2.yml