ReactiveKafkaProducerTemplate 를 사용하여 메세지를 송신하고 수동으로 커밋 처리를 하려면 추가 설정이 필요하다.
ReactiveKafkaProducerTemplate 객체를 Bean 으로 등록할때 ProducerConfig.TRANSACTIONAL_ID_CONFIG 값을 설정해야 한다.
이후 reactiveKafkaProducerTemplate.transactionManager() 호출하여 트랜잭션을 수동으로 처리한다.
위와 같이 설정하고 메세지 송신을 해보면 kafka 서버에서 오류가 발생한다.
위 오류가 발생하는 이유는 트랜잭션 상태를 추적하는 토픽(__transaction_state) 이 있는데 이 토픽 복제 팩터는 기본값이 3으로 설정되어 있다. 현재 테스트 환경은 브로커 서버가 1대 이기 때문에 복제 팩터값을 변경하거나 브로커 서버를 3대로 늘려야 한다.
- 그리고 수동으로 커밋 하는 경우 kafka 컨슈머 쪽에도 추가 설정이 필요하다.
- 아래 `--isolation-level read_committed` 옵션을 설정해야 수동으로 커밋이 완료된 경우에만 메세지를 수신한다.
- 해당 옵션을 지정하지 않을 경우 기본값은 `read_uncommitted` 이다.
- 설정이 끝났고 메세지 송신 후 수동으로 commit, abort 테스트를 하면 정상적으로 동작하는 것을 확인 할 수 있다.
- 소스 코드 변경점 [참고](https://github.com/kwonslog/how-to-use-kafka/commit/1f9cb484e147b372e69bc8448eace80eb72f9b88)
### 메세지 수신
- 메세지를 수신하는 것과 커밋 하는 것을 분리해서 생각해야 한다.
- 메세지를 수신하게 되면 해당 토픽의 오프셋 값이 증가한다.
- 이때 커밋을 완료하게 되면 `__consumer_offsets` 토픽에 해당 오프셋 값이 저장된다. 그래서 consumer 가 재실행 되었을 경우에 해당 값을 기준으로 이후의 값들을 재요청 하여 처리한다.
- 예를 들어 메세지를 수신하고 나서 커밋을 하지 않으면 `__consumer_offsets` 토픽 정보에 변경이 발생하지 않는다.
- 다시 커밋을 하도록 하면 변경이 발생하는 것을 확인 할 수 있다.
- 아래 이미지는 `__consumer_offsets` 토픽의 값을 확인한 것이다.
<img width="851" alt="image" src="https://github.com/kwonslog/how-to-use-kafka/assets/65941166/17d489d8-e113-4a81-b3c4-b8fcb611be2c">
- 도커 기준. 아래 명령을 실행하고 수동으로 커밋을 완료하거나 하지 않았을때를 비교해 보면 명확히 알 수 있다.
메세지 송신
test-kafka1: image: confluentinc/cp-kafka container_name: test-kf1 ports:
ID 값은 숫자만 가능
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_MIN_INSYNC_REPLICAS=1
depends_on:
test-kafka-net
test-kafka2: image: confluentinc/cp-kafka container_name: test-kf2 ports:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_MIN_INSYNC_REPLICAS=1
depends_on:
test-kafka-net
test-kafka3: image: confluentinc/cp-kafka container_name: test-kf3 ports:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_MIN_INSYNC_REPLICAS=1
depends_on:
networks: test-kafka-net: external: true
docker exec -it test-kf1 kafka-console-consumer --topic test-topic --isolation-level read_committed --bootstrap-server localhost:9092,localhost:9093,localhost:9094
docker exec -it test-kf1 kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager