kmg28801 / kafka-study

1 stars 0 forks source link

[실전 카프카 개발부터 운영까지] 5장. 프로듀서의 내부 동작 원리와 구현 #15

Open kmg28801 opened 1 year ago

youngpark17 commented 1 year ago

5장 프로듀서의 내부 동작 원리와 구현

파티셔너

파티셔닝 전략

  1. 라운드 로빈 전략

    • 메시지의 키는 필수가 아니므로 키값이 없다면 라운드 로빈전략을 통해 파티셔닝 됨.
    • 배치 단위로 전송되는 전략을 이용할때, 라운드 로빈을 이용할 경우 매우 비효율적이 됨.
    • ex) 파티션이 100개이고, 배치 전송을 위한 파티션별 최소 레코드 수가 3이라면 배치 전송을 위해 프로듀서의 버퍼메모리에서 300개의 메시지가 쌓여야 첫번째 파티션부터 메시지가 전송됨.
    • 프로듀서의 옵션을 조정해서 특정 시간을 초과하면 카프카로 레코드 전송하도록 가능(delivery.timeout.ms) default 값은 2분(12,000ms), 하지만 이렇게 처리 할 경우 배치와 압축의 효과를 얻지 못하므로 매우 비효율적이 됨.
  2. 스티키 파티셔닝 전략

    • 카프카 2.4버전부터 이용, 하나의 파티션에 레코드 수를 먼저 채움
    • 키가 null이라면 파티션별 최소 전송 레코드 수가 될때까지 파티션을 채움.
    • ex) 파티션이 100개이고, 배치 전송을 위한 파티션별 최소 레코드 수가 3이라면 라운드 로빈과 다르게 3개의 메시지만 쌓여도 첫번째 파티션으로 메시지 전송함.
    • 라운드로빈에 비해, 지연시간이 30프로 이상이 감소, 메시지의 키가 없고 순서가 중요하지 않다면 스티키 파티셔닝 전략 이용.

프로듀서의 배치


중복 없는 전송

기본적으로 retries 값은 0으로 설정되어 있으며, 이 경우 프로듀서는 한 번의 전송 시도를 하고 ACK를 기다립니다. 만약 ACK를 받지 못하면 프로듀서는 해당 메시지를 실패로 처리하고 에러를 반환합니다.

만약 retries 값을 1 이상의 값으로 설정한 경우, 프로듀서는 초기 전송 실패 후 최대 retries 번의 재시도를 시도합니다. 재시도는 일정한 지연(delay) 간격으로 수행됩니다. 그러나 재시도를 통해도 여전히 ACK를 받지 못한다면 해당 메시지는 재시도 횟수를 초과하게 되며, 이 경우 메시지가 유실될 수 있습니다.

메시지의 유실을 방지하려면 다음과 같은 점을 고려해야 합니다:

retries 값을 적절히 설정해야 합니다. 메시지 전송을 안정적으로 보장하기 위해 재시도 횟수를 충분히 설정하고, 네트워크 지연 등을 고려하여 적절한 재시도 간격을 설정해야 합니다.

acks 옵션을 적절히 설정해야 합니다. acks 옵션은 프로듀서가 ACK를 기다리는 방식을 결정합니다. 모든 복제본에 ACK를 받아야하는 경우, 또는 최소한 하나의 복제본에 ACK를 받으면 충분한 경우 등을 설정할 수 있습니다.

메시지의 중요성에 따라 추가적인 보장을 위해 리더 파티션에 메시지를 전송하는 설정인 min.insync.replicas 등을 고려할 수 있습니다.

따라서 메시지의 안전한 전달을 위해서는 retries 값 뿐만 아니라 acks 옵션 및 클러스터 구성과 운영 정책을 종합적으로 고려하여 적절한 설정을 수행해야 합니다.


---

#### 정확히 한 번 전송
- 중복없는 전송은 정확히 한 번 전송의 일부분
- 디자인
    - 트랜잭션 코디네이터: 프로듀서에 의해 전송된 메시지를 관리, 커밋 또는 중단 등을 표시
    - 카프카의 내부 토픽인 __transaction_state에 저장, 카프카의 내부 토픽이지만 파티션 수와 리플리케이션 팩터 수가 존재, 브로커 설정을 통해 관리 
    - ```transaction.state.log.num.partitions=50```, ```transaction.state.log.replication.factor=3``` 트랜잭션 코디네이터가 해당 토픽에 기록
    - 컨트롤 메시지라는 별도의 메시지를 통해 애플리케이션에 정상적으로 커밋된 것인지 또는 실패한 것인지 식별
- 단계별 동작
  - 1. 프로듀서는 브로커에서 FindCoordinator Request를 통해 트랜잭션 코디네이터의 위치를 얻어옴. 트랜잭션 코디네이터는 producer ID와 transactional.id를 매핑하고 해당 트랜잭션 전체를 관리하는 것, 트랜잭션 코디네이터가 없다면 새로 생성됨.
  - 2. ```__transaction_state``` 토픽의 파티션번호는 transactional.id(프로듀서에서 세팅됨)를 기반으로 해시하여 결정. 해당 파티션의 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정(정확히 하나의 코디네이터만 가지고있게 됨)
  - 3. ```initTransactions()``` 메소드를 이용해 InitPidRequest를 트랜잭션 코디네이터로 보냄. 이때 TID가 설정된 경우에는 TID를 함께 보내고, 트랜잭션 코디네이터는 TID, PID를 매핑하고 해당 정보를 트랜잭션 로그에 기록.
  - 4. 이후 PID 에포크를 한 단계 올리고, 이를 통해 동일 PID와 이전 에포크에 대한 쓰기 요청은 무시됨
  - 5. 프로듀서는 ```beginTransaction()``` 메소드를 통해 새로운 트랜잭션의 시작을 알리게 됨. 첫번째 레코드가 전송될때까지 트랜잭션이 시작된 것은 아님.
  - 6. 프로듀서는 토픽 파티션 정보를 코디네이터에게 전달하고, 코디네이터는 해당 정보를 트랜잭션 로그에 기록함. TID와 P0(만약 파티션0라면)의 정보가 트랜잭션 로그에 기록되며 트랜잭션의 상태는 ```Ongoing```으로 변하게 됨.
  - 7. 프로듀서가 메시지를 토픽의 파티션으로 전송하면, PID에포크와 시퀀스 번호가 함께 포함되어 전송됨.(**트랜잭션 코디네이가 존재하지않는** 브로커에 메시지 전송)
  - 8. 메시지 전송 후 ```Transaction()```, ```abortTransaction()```을 통해 트랜잭션 코디네이터에게 성공, 실패를 알리고 트랜잭션 코디네이터는 트랜잭션로그에 prepareCommit, PrepareAbort를 기록.
  - 9. 이후 사용자 토픽에 컨트롤 메시지를 통해 커밋 표시를 기록한다.
  - 10. 트랜잭션 코디네이터는 commited라고 로그에 기록하고, 프로듀서에게 해당 트랜잭션이 완료됨을 알림

- gpt 대답.

내부적으로 트랜잭션 코디네이터(Transaction Coordinator)와 함께 카프카 프로듀서의 트랜잭션 동작을 설명하겠습니다:

트랜잭션 시작(Transaction Begin): 프로듀서는 트랜잭션을 시작하려면 beginTransaction() 메서드를 호출합니다. 이때, 프로듀서는 트랜잭션 ID를 생성하고 트랜잭션 코디네이터에게 해당 트랜잭션의 시작을 알립니다.

메시지 전송(Send Messages): 프로듀서는 트랜잭션 내에서 보내고자 하는 메시지를 일반적인 방법으로 전송합니다. 이때, 트랜잭션 ID를 메시지에 포함하여 트랜잭션 코디네이터가 해당 메시지를 트랜잭션에 속한 것으로 인식할 수 있도록 합니다.

ACK 확인 및 트랜잭션 코디네이터와 통신: 프로듀서는 메시지의 전송 결과인 ACK(응답 확인)를 받게 되면, 해당 ACK 정보를 트랜잭션 코디네이터에게 보고합니다. 이를 통해 트랜잭션 코디네이터는 프로듀서의 트랜잭션 진행 상태를 추적할 수 있습니다.

커밋 또는 롤백(Commit or Rollback): 트랜잭션의 모든 메시지 전송이 완료되면, 프로듀서는 commitTransaction() 메서드를 호출하여 트랜잭션을 커밋하거나, abortTransaction() 메서드를 호출하여 트랜잭션을 롤백합니다. 이때, 트랜잭션 코디네이터에게 해당 트랜잭션의 커밋 또는 롤백을 알리게 됩니다.

youngpark17 commented 1 year ago

실습 코드

vi /home/ec2-user/producer.config
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=5
akcs=all
/user/local/kafka/bin/kafka-console-producer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic peter-test04 --producer.config /home/ec2-user/producer.config
cd /data/kafka-logs/peter-test04-0
ls

예제

/user/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --topic peter-test05 --partitions 1 --replication-factor 3
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

cd kafka2/chapter5/
java -jar ExactlyOnceProducer.jar

/user/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --list

vi /home/ec2-user/consumer.config
exclude.internal.topics=false

/user/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic __transaction_state --consumer.config /home/ec2-user/consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --from-beginning

/user/local/kafka/bin/kafka-dump-log.sh --print-data-log--files /data/kafka-logs/peter-test05-0/00000000000000000000.log