Open hubtwork opened 1 month ago
Keyword
Producer ID
+ Sequence Number
-> 고유한 식별자Producer restart
멱등적 프로듀서
기능이 활성화 되어 있는 경우, 프로듀서 초기화 과정에서 브로커로부터 Producer ID
를 생성트랜잭션
기능이 비활성화 되어 있는 경우, 매번 새로운 Producer ID
가 생성멱등적 프로듀서
기능이 활성화 되어 있어도 트랜잭션
기능이 비활성화 되어 있으면 매번 새로운 고유 식별자를 가지기 때문에 중복 메시지를 인식하지 못함Broker failure
멱등적 프로듀서
는 프로듀서의 내부 로직으로 인한 재시도가 발생하는 경우의 중복만 방지producer.send()
를 두 번 호출해도 멱등적 프로듀서
가 개입하지 않아 중복 메시지 발생enable.idempotence=true
설정acks=all
되어 있는 경우에는 퍼포먼스에 차이가 없음Reprocessing caused by application crashes
Reprocessing caused by zombie applications
consuming
, processing
, producing
이 원자적으로 이루어짐을 의미
atomic multipartition writes
을 제공: 결과와 오프셋 커밋 둘 다 파티션에 메시지를 쓰는 것으로, 결과는 output topic에 쓰여지고 offset은 _consumer_offsets
topic에 쓰여짐atomic multipartition writes
을 쓰기 위해 트랜잭션 프로듀서를 사용
transactional.id
로 구성되며 initTransactions()
로 초기화된 Kafka 프로듀서transacional.id
를 통해 초기화 간 동일 Producer를 식별initTransaction()
이 호출될 때 transactional.id
와 연결된 epoch 번호를 증가transactional.id
는 동일한데 epoch가 더 낮은 Producer의 전송, 커밋, 요청을 중단시키는 경우 FencedProducer
에러를 발생read_committed
모드에서는 메시지가 순서대로 처리되도록 보장하기 위함Kafka 트랜잭션이 정확히 한 번 처리 보장을 제공하지 않는 시나리오: Side effects while stream processing
Reading from a Kafka topic and writing to a database
Reading data from a database, writing to Kafka, and from there writing to another database
Copying data from one Kafka cluster to another
Publish/subscribe pattern
read_committed
모드를 사용하면 미커밋된 트랜잭션은 Consumer에게 보이지 않지만, 완전한 처리 보장을 하지는 못함가장 일반적인 방법은 Kafka Streams에서 exactly-once guarantees
를 활성화 하는 것
processing.guarantee
설정을 exactly_once
또는 exactly_once_beta
로 지정하여 사용
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); 1
producer = new KafkaProducer<>(producerProps);
Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 2 consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); 3
consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions(); 4
consumer.subscribe(Collections.singleton(inputTopic)); 5
while (true) { try { ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); if (records.count() > 0) { producer.beginTransaction(); 6 for (ConsumerRecord<Integer, String> record : records) { ProducerRecord<Integer, String> customizedRecord = transform(record); 7 producer.send(customizedRecord); } Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets(); producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); 8 producer.commitTransaction(); 9 } } catch (ProducerFencedException|InvalidProducerEpochException e) { 10 throw new KafkaException(String.format( "The transactional.id %s is used by another process", transactionalId)); } catch (KafkaException e) { producer.abortTransaction(); 11 resetToLastCommittedPositions(consumer); }}
### 8.2.6 Transactional IDs and Fencing
- `Transactional ID`를 적절하게 설정하는 것은 좀비 프로듀서를 차단하고 정확히 한 번 처리 보장을 유지하는 데 중요함
- `Transactional ID`는 애플리케이션 인스턴스 간에 중복되지 않고, 재시작 시에도 일관되게 유지되어야 함
- Kafka 2.5에서는 KIP-447이 도입되어 Consumer Group 메타데이터를 사용해 좀비를 차단하는 방법이 추가
- 트랜잭션에 Consumer Group 정보를 포함시켜, 새로 생성된 Consumer가 이전 세대의 좀비 Producer를 구분하고 차단할 수 있도록 함
- 동일한 `Transactional ID`를 반복적으로 사용해야 하는 낭비를 줄이고, 좀비 Producer 문제를 더 효과적으로 해결
### 8.2.7 How Transactions Work
- `commit marker` 메시지를 사용해 여러 파티션에 걸친 트랜잭션을 커밋하거나 중단:
1. 트랜잭션이 진행 중임을 기록하고, 포함된 파티션을 로그에 남김
2. 커밋 또는 중단 의도를 로그에 기록
3. 모든 파티션에 트랜잭션 마커를 기록
4. 트랜잭션 완료를 로그에 기록
- 내부 토픽인 `__transaction_state`를 사용
- 트랜잭션 동작 단계:
1. i`nitTransaction()`
- Producer는 트랜잭션을 시작하기 전에 `initTransaction()`을 호출해 트랜잭션 코디네이터에 등록
- 기존 `Transactional ID`의 경우 epoch를 증가시켜 좀비 Producer를 차단
2. `beginTransaction()`
- 트랜잭션이 시작되었다는 신호를 Producer에게 전송
- Producer가 새로운 파티션에 데이터를 보낼 때, 해당 파티션이 트랜잭션에 포함되었다는 정보를 브로커에 전달하고 이는 트랜잭션 로그에 기록
3. `sendOffsetsToTransaction()`
- Producer는 처리한 레코드의 오프셋을 트랜잭션 코디네이터에 보내고, 이는 Consumer Group ID와 함께 커밋
4. `commitTransaction()` 또는 `abortTransaction()`
- 트랜잭션 커밋 또는 중단 요청을 트랜잭션 코디네이터에 보내고, 코디네이터는 이를 트랜잭션 로그에 기록한 후 관련 파티션에 commit marker를 작성
트랜잭션이 시간 내에 커밋되지 않으면, 코디네이터가 자동으로 트랜잭션을 중단
---
소감: 이해하기는 길고 복잡했으나 사용은 쉬워보인다. 다양한 적용 예시를 찾아보면 어떤 상황에서 이 설정과 개념을 인지하고 사용해야 하는지 더 잘 와 닿을 것 같다.
정확히 한 번, 이 챕터의 제목을 보고 생각해봤다. 시스템에서 정확하다는 말을 쓸 수 있을까? “정확하다”는 말을 쓴다는 것은 상대적이다. 시스템의 타임스탬프, 하드웨어 오류, 메모리 오염, 네트워크 문제 등 많은 요소들이 정확성을 저해하기 떄문이다. 결국 시스템의 설계 안에는 이러한 가능성을 최소화하고 변인통제 가능한 수준으로 끌어올려 신뢰성을 확보하는 것이라고 생각이 들었다. 카프카도 본인 진영의 애플리케이션의 신뢰를 주기 위한 방법들을 제안하고 있다. 씹떡 다됐다. 만류귀종이 떠오른다.
"정확히 한번"의 처리를 보장하기 위한 기술을 이야기했지만 그냥 발행 구독 패턴에서 정확히 한번에 대한 처리는 멱등과 원자를 함께 챙길 수 있는 Transaction Outbox Pattern를 접목시켜도 되지 않나? 역시 모르니까 용감하다. ㅋㅋ 파이프라인이 다소 복잡하고 특수한 경우도 분명 존재하겠지만 아직 카프카를 실무에서 다뤄보지 못한 나로썬, 그냥 그런갑다로 밖에 생각이 안든다. 요원하다.
결국 중복 메시지 발행에 대한 문제를 해결하는 건데 카프카의 신뢰성을 알기 위해선 카프카를 더 잘 알아야하는데, 음 쉽지 않다. 카프카도 JPA와 비슷하지 않을까? 멱등한 처리를 카프카에게 온전히 위임하지 않고 "멱등성 보장" 에 대한 자료를 찾고 애플리케이션에 녹여보면 어떨까?
'최소 한 번'을 보장하여 데이터 유실을 방지하고 신뢰성을 가져갔다면, '정확히 한 번'에서는 더 정교하게 이벤트를 보장하기 위한 노력이 엿보였다. 멱등적 프로듀서와 트랜잭션을 통해 많은 상황에서 신뢰를 보장 하더라도 한계가 있음을 잊지 않아야겠다. 이번 챕터는 이전 챕터들에 비해 쉽게 읽히긴 했지만 그렇다고 이해가 쉽지는 않았다.
당장 회사에서는 트래픽이 얼마 되지 않아 의미가 없을 수 있겠지만, '최소 한 번'과 '정확히 한 번'을 보장한다면 아주 조금이라도 네트워크 I/O 비용을 절감할 수 있지 않을까 생각해봤다.
ps. 유니크 식별자, 멱등, 트랜잭션, 범위 등... 익숙한 용어에 쉽게 읽혔는데, 머리로는 이해가 어려웠다. 😢
Kafka 의 transactional producer
기능을 Spring kafka 에서 아래와 같이 설정하면, Spring Transaction 을 통해 발행한 메세지에 마킹하는 기능을 적용할 수 있다.
@Bean
fun producerFactory(): DefaultKafkaProducerFactory<String, String> {
return DefaultKafkaProducerFactory<>(producerConfigs())
.apply {
setTransactionIdPrefix("tx-")
}
}
호출 시에 이벤트를 발행하는 것은 똑같으나 Spring Transaction
이 만약 실패하였다면, abort
정보를 메세지에 마킹하여 트랜잭션이 실패했다는 것을 마킹하는 방식이다. ( 성공할 경우, commit
을 마킹한다 ) 이에 책에서 설명한 것처럼 컨슈머가 isolation.level = read_committed
로 추가 설정되어 있다면, 커밋 처리된 메세지만 읽도록 하여 원자성을 보장하는 방식이다.
근데, 나라면 이미 "메세지" 가 발행된 시점부터 망했다고 본다. 왜? 읽는 쪽에서 알맞게 구현하지 않으면 안되는 구조 자체가 잘못된 개발 방식이라고 생각한다. 읽는 쪽에서는, 당연히 발행된 메세지에 대해서는 "처리해야 할 데이터" 라고 간주하는 것이 올바른 방향이라고 생각하기 때문.
그렇기 때문에 올바른 접근 방식은 @TransactionalEventListener(phase = AFTER_COMMIT)
과 같이 발행자 측에서 트랜잭션의 원자성을 보장하는 것이다. 그런데 이런 경우에서 또한 문제가 발생할 수 있는데 메세지가 중복으로 발행되거나 발행되지 않았을 경우이다. 이를 위해 우리는 프로듀서 측면에서는 TransactionalOutboxPattern
과 같은 방식을 적용해 대응해볼 수 있고 , 컨슈머 측면에서는 내가 해당 메세지를 "특정 시간 내에 처리한 이력이 있는 경우" (messageId
를 고유값으로 간주함) 재처리하지 않도록 컨슈머를 구현하는 방식을 활용해볼 수 있다. [ 이 때, processed
테이블과 같이 메세지 핸들링 이력을 관리하거나 redis TTL
을 이용해 메세지 ID 를 기록하는 방식을 활용해 볼 수 있다. ]
디비지움(Debezium) 은 이런 측면에서, 아웃박스 패턴을 통해 릴레이 서비스가 테이블 업데이트 내역을 카프카에 메세지로 쓰는 방식을 활용한다.
사실 카프카 스트림즈를 나는 현재 사용하고 있지 않고, Application 에서 직접적으로 이를 다뤄볼 일이 많이 있으려나 라는 생각이 있어 exactly_once
설정에 관한 글은 그렇구나 하고 읽었는데, 결국 이 또한 앞서 말한, 혹은 책에서 말한 여러가지 이유 때문에 멱등성 보장은 뚫릴 수 있는 방패라고 생각된다.
2.5 버전부터 보강된
producer fencing
기능을 통해 좀비 프로듀서 문제를 해결하는 것 자체에는 공감한다.
또한 가벼운 작업으로 취급되어야 할 이벤트 발행 이 2PC 기반의 Tx 동기화를 위해 프로듀서 작업에 부하를 만드는 것 자체도 매우 큰 낭비라고 생각된다.
애플리케이션 개발자가 멱등성을 중요시하는 작업을 구현해야 할 때에는, 카프카의 멱등성 보존 기능이 아닌 멱등하게 동작하는 로직 구현에 집중해야한다고 생각한다. 결국 중요한 것은 Producing Logic / Consuming Logic 을 개발자가 얼마나 잘 구성하느냐를 통해 멱등성을 어떻게 달성할 것이냐인 것 같다.
카프카가 얼마나 정교하고 안정적인 시스템인지 다시 한번 깨달았다. 프로듀서부터 브로커, 컨슈머까지 모든 데이터 흐름이 트랜잭션 단위로 처리된다는 점이 인상적이었다. 그런데 메시지를 소비하는 쪽에서 제대로 된 처리를 안하면..... 흠..... 카프카는 양날의 검 같다...
Chapter Ownership : @iamzin