DevSprout / Practice-Kafka-Dev-To-Production

✉️ 실전 카프카 개발부터 운영까지 스터디
3 stars 0 forks source link

Chapter 05. 프로듀서의 내부 동작 원리와 구현 #5

Open MinJunKweon opened 2 years ago

minkukjo commented 2 years ago

정리

파티셔너

프로듀서의 배치

중복 없는 전송

at-least-once

most-once

exactly-once

카프카의 트랜잭션

MinJunKweon commented 2 years ago

파티셔너(Partitioner)

파티션의 개수가 바뀌면 같은 키로 저장되는 파티션도 바뀐다!

라운드 로빈(Round-Robin) 전략

메시지가 파티셔너를 거쳐 버퍼 메모리에 라운드 로빈으로 분배된다

스티키 파티셔닝(Sticky Partitioning) 전략

하나의 파티션에 최소 레코드 수가 채워질 때까지 전송한다

프로듀서 배치(Batch)

옵션명 설명 기본값
buffer.memory 카프카로 전송하기 위해 담아두는 버퍼 메모리
모든 파티션의 레코드의 크기를 다 더한 값을 계산함

※ 서버로 전달될 수 있는 것보다 빠르게 보낸다면, 프로듀서는 max.block.ms초(기본값 1분)까지 기다렸다가 가용 공간이 없으면 TimeoutException을 발생시킨다
32MB
batch.size 배치 전송을 위한 메시지들을 묶는 단위를 설정하는 배치 크기
각 파티션의 배치 전송용 버퍼 메모리만 계산함
이 값만큼 메시지가 모이면 데이터를 한번에 전송함
16KB
linger.ms 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간
버퍼에 데이터가 많이 쌓이지 않았더라도 이 값만큼 시간이 지나면 무조건 브로커로 데이터를 전송함
0
(배치전송 사용X)
compression.type 프로듀서에 의해 생성되는 모든 데이터에 대한 압축 종류
gzip, snappy, lz4, zstd를 쓸 수 있다
none

배치처리 장점

배치처리 단점

중복 없는 전송

적어도 한 번 전송(at-least-once)

최대 한 번 전송(at-most-once)

중복 없는 전송

중복없는 전송과 관련된 프로듀서 옵션

옵션명 설명 기본값
enable.idempotence 프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션
true로 설정되어야 아래 설정 값들을 사용할 수 있다.
true
max.in.flight.requests.per.connection ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수 5
acks all, -1 : ISR 전체에 대해 ack를 대기하는 옵션
0 : 메시지 전송후 ACK를 대기하지 않는 옵션
1 : 리더 브로커에만 ACK를 기다리는 옵션
all
retries ACK를 받지 못한 경우 재시도하는 횟수 0

정확히 한 번 전송(exactly-once)

정확히 한 번 전송 디자인

peter-transaction-01::TransactionMetadata(transationalId=peter-transaction-01, producerId=48000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1604742193026)
peter-transaction-01::TransactionMetadata(transationalId=peter-transaction-01, producerId=48000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=-1, txnLastUpdateTimestamp=1604742193106)
peter-transaction-01::TransactionMetadata(transationalId=peter-transaction-01, producerId=48000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1604742193106, txnLastUpdateTimestamp=1604742193195)
peter-transaction-01::TransactionMetadata(transationalId=peter-transaction-01, producerId=48000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1604742193106, txnLastUpdateTimestamp=1604742193204)

정확히 한 번 전송을 위한 필수 설정 옵션

옵션명 설명 기본값
enable.idempotence 프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션
true로 설정되어야 아래 설정 값들을 사용할 수 있다.
true
max.in.flight.requests.per.connection ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수 5
acks all, -1 : ISR 전체에 대해 ack를 대기하는 옵션
0 : 메시지 전송후 ACK를 대기하지 않는 옵션
1 : 리더 브로커에만 ACK를 기다리는 옵션
all
retries ACK를 받지 못한 경우 재시도하는 횟수 0
transactional.id 프로듀서 프로세스마다 설정하는 고유한 아이디
트랜잭션을 구분하기 위한 프로듀서 아이디
(ex. peter-transaction-01)
null

정확히 한 번 전송의 단계별 동작

  1. 프로듀서 —> 브로커 FindCoordinatorRequest 전송하여 트랜잭션 코디네이터의 위치를 찾음
    • 만약 존재하지 않는다면 신규 트랜잭션 코디네이터가 생성됨
    • 트랜잭션 코디네이터가 PID(Producer ID)와 TID(transactional.id)를 매핑하고 트랜잭션 전체를 관리함
    • __trasnaction_state 토픽의 파티션 번호는 TID를 기반으로 해시하여 결정됨
    • 이 파티션 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정됨
  2. initTransactions() 메소드가 호출될 때, InitPidRequest를 트랜잭션 코디네이터로 보냄. 이 때, TID도 함께 전송함
    • 트랜잭션 코디네이터는 TID와 PID를 매핑하고 __transaction_state 트랜잭션 로그에 기록
    • PID epoch를 하나 올리고 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청을 무시함
    • 에포크를 쓰는 이유는 신뢰성 있는 메시지 전송을 위함 (오래된 메시지를 무시하기 위함)
  3. beginTransaction() 메소드를 사용해 새로운 트랜잭션의 시작을 알림
    • 트랜잭션 코디네이터 관점에서는 트랜잭션 시작 후 첫 번째 레코드 전송이 될 때 트랜잭션이 시작된 것으로 간주
    • 프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에 전달함
    • 트랜잭션 코디네이터는 각 트랜잭션 상태의 내용을 기록함 (현재상태 : Ongoing으로 표시)
    • 기본값으로 1분 동안 트랜잭션 상태에 대한 업데이트가 없다면 실패로 간주함
  4. 프로듀서가 브로커로 메시지 전송을 함
    • 메시지 전송 대상 브로커는 트랜잭션 코디네이터와 다른 브로커일 수 있음
    • 메시지에는 PID, 에포크, 시퀀스 번호가 함께 포함되어 전송됨
  5. 트랜잭션 종료를 위해 commitTransaction() 혹은 abortTransaction() 중 반드시 1개를 호출해야함
    • 트랜잭션 코디네이터에 트랜잭션 종료를 알게되고 해당 트랜잭션에 PrepareCommit 혹은 PrepareAbort를 기록함
  6. 트랜잭션 코디네이터는 PrepareCommit 이나 PrepareAbort에 대해서 파티션에 트랜잭션을 커밋이 되었음을 알림 (컨트롤 메시지)
    • 애플리케이션은 이 메시지를 표시하지 않지만 실제로 메시지는 저장되기 때문에 오프셋이 증가함
    • 컨슈머는 이 메시지를 통해 메시지가 제대로 전송되었는지를 판단할 수 있음
    • 트랜잭션 커밋이 끝나지 않은 메시지는 컨슈머에게 반환하지 않음
    • 오프셋 순서 보장을 위해 트랜잭션 성공 또는 실패를 나타내는 LSO(Last Stable Offset)라는 오프셋을 사용함
  7. 트랜잭션이 완료되면 트랜잭션 코디네이터는 완료됨(Committed)라고 트랜잭션 로그에 기록함
    • 트랜잭션이 완료되면 프로듀서에 완료메시지를 전달하고 모든 처리가 마무리됨
    • 트랜잭션을 사용하는 read_committed 설정을 쓰는 컨슈머들은 커밋된 메시지들만 읽을 수 있음
LOG-INFO commented 2 years ago

정리는 다른 분들이 잘 해주셔서 저는 추가로 학습한 내용만 적겠습니다~~

궁금한 점

기타

MinJunKweon commented 2 years ago

https://kafka.apache.org/documentation/#producerconfigs_enable.idempotence

@LOG-INFO 책이 잘못됐어용 버전업 되면서 3.2.0 버전부터 바뀌었답니당

참고 : https://kafka.apache.org/documentation/#upgrade_320_notable

LOG-INFO commented 2 years ago

@MinJunKweon 저도 다시 찾아보니까 3.0.0부터 적용을 하려 했는데 버그가 있어서 사용자가 설정값에 명시해줘야 했고, 버그 패치로 3.0.1, 3.1.1, 3.2.x 부터 기본 적용 되었다고 하네용

책은 2.x 버전 기준이라 false라고 한 듯..!

MinJunKweon commented 2 years ago

스터디에 나왔던 내용 정리

Q. __transaction_state 토픽에 트랜잭션 로그 상태가 1분간 업데이트 안되면 실패로 간주한다고 하는데, 메시지 프로듀싱 될 때마다 타이머가 리셋되는지? Q. 트랜잭션 abort 되었을 때, 토픽에 프로듀싱 되었던 메시지들은 삭제되는가? Q. 압축 전송이 효율이 좋다면서 왜 default로 사용하지 않을까? A. 스터디에서 나온 이유 3가지로 정리

  1. 사용자가 알아야할 것을 줄이고자 단순함을 위해 none을 하는 것
  2. 메시지 종류에 따라 적용하면 좋은 압축 알고리즘이 다르기 때문에 기본값으로 사용할 알고리즘 선택이 어려움
  3. 압축 및 압축 해제에 대한 오버헤드가 기본적으로 존재하기 때문에 상황에 따라 적용할 수 있도록 기본값을 none으로 설정