Open skarltjr opened 2 years ago
카프카에서 메세지를 구분하는 단위
- 이 메세지가 어떤 메세지인지 구분
- ex) 주문용 토픽(주문용 이벤트) , 뉴스용 토픽~. 등
1. 프로듀서는 클러스터에게 이 메세지를 어떤 토픽에 저장해줘~ 요청
- 프로듀서는 클러스터에게 이 메세지(이벤트를) 주문 토픽!에 저장해줘~
2. 컨슈머는 어떤 토픽에서 메세지를 읽어올래
- 주문 토픽에서 메세지를 읽어올래
토픽을 기준으로 소통
한 개의 토픽은 한 개 이상의 파티션으로 구성
- 파티션은 메세지를 저장하는 물리적인 파일
- 파티션은 추가만 가능한(append-only) 파일
- 각 메세지 저장 위치를 오프셋 offset이라고한다
- 프로듀서가 넣은 메세지는 파티션의 맨 뒤에 추가⭐️
- 컨슈머는 오프셋 기준으로 메세지를 순서대로 읽는다
- 예를 들어 컨슈머가 3번 오프셋부터 메세지를 읽을게 =>. . . 3번 앞에는 읽지 못한다. & 3번 이후의 메세지를 순서대로 읽는다
- 메세지는 삭제되지 않음(설정에 따라 일정 시간이 지난 뒤 삭제)
- 파티션은 파일이기때문에 메세지를 컨슈머가 읽든 읽지않든 사라지지않는다.
마지막. 메세지는 삭제되지 않음 -> 이벤트 유실의 걱정을 줄일 수 있는 장점이라고 생각
프로듀서는 어떤 파티션에 메세지를 저장할까?
1. 라운드로빈으로 돌아가면서 여러 파티션에 저장하거나
2. 키를 통해 특정 파티션 선택하여 저장한다.
- 프로듀서가 메세지를 저장할 때 토픽의 이름뿐만아니라 키도 지정할 수 있는데
- 키의 해시값을 이용해 저장할 파티션을 선택
- 같은 키에 대해서는 메세지 순서가 유지
- 컨슈머는 컨슈머 그룹에 속한다
- 한 개 파티션은 컨슈머 그룹의 한 개 컨슈머만 연결 가능
- 즉 컨슈머그룹에 속한 컨슈머들은 한 파티션을 공유할 수 없다
- 이를 통해 메세지는 자기자신을 처리할 수 있는 컨슈머에게만 소비된다고 생각
- 한 컨슈머그룹 기준으로 파티션의 메세지는 순서대로 처리
- 하나의 컨슈머그룹 속 각 컨슈머는 각각의 파티션의 메세지를 혼자 처리하며 공유할 수 없기때문
위는 프로듀서를 생성하여 메세지를 send하는 코드다
properties를 활용하여 설정정보를 생성한다.
properties에 기반하여 kafka producer를 생성한다.
producer record = 전송할 메세지
produce record 생성 후 send
producer.close()
1. serializer를 통해 메세지를 바이트 배열로변환
partinioner를 통해 메세지가 어떤 토픽인지, 어떤 파티션에 위치할지 결정
메세지 모음
메세지를 브로커(카프카 클러스터에) 전달
참고로 sender는 별도의 스레드로 동작하는데 따라서 프로듀서의 흐름이 단일 스레드로 동작하지 않으므로 메세지를 producer에게 send하는 동작은 이 메세지를 producer가 브로커에게 전달하는 동작과 의존성이 낮다
### 처리량 관련 주요 속성
- <img width="1175" alt="스크린샷 2022-03-30 오전 12 07 34" src="https://user-images.githubusercontent.com/62214428/160643635-9c3bc8f2-f614-44cf-b291-02e1252d694e.png">
batch size 카프카는 메세지를 배치에 모아서 한 번에 전달
linger.ms : 전송 대기 시간
전송 결과 확인은 결국 블로킹으로 동작하여 그 동안 제어권이 없어 멈춰있어야하니 처리량 저하
그래서 처리량 낮아도 되는 경우에만 사용한다고한다.
callback 매서드를 전달
블로킹 방식이 아니라 처리량 저하 없음
Exception 객체를 전달받으면 이는 실패한것
이에따른 처리 필요
ack = 0
중요한 메세지는 사용하면 안된다.
ack = 1 파티션 리더에만 저장되면 응답받는다.
ack = all 가장 중요한 메세지는 이방법 사용
### 전송 실패 대응 1. 재시도
재시도
재시도 위치
프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 알아서 재전송 시도
send() 매서드에서 예외 발생시 예외타입에 따라 send() 재호출처리 가능
콜백 매서드에서 Exception객체를 전달받으면 타입에 따라 send()재호출
다만. 무한 재호출은 피해라
위 그림과 같은 경우를 생각해보자
프로듀서는 메세지를 전달
브로커에선 메세지를 잘 저장했지만 그 응답.ack의 전송이 늦어짐
프로듀서입장에선 ack이 일정시간안에 도착하지 않았으니 재전송.
결국 브로커는 이전 메세지를 잘 저장했는데 동일 메세지를 또 받음
참고 : enable.idempotance 설정 참고 - 중복전송 가능성을 줄일 수 있다고한다.
컨슈머 - 특정 토픽의 파티션에서 메세지를 조회하자.
properties로 설정을한다
컨슈머 그룹 단위로 파티션 할당
현재 위 그림에서 볼 수 있듯이 topic1 - p0 & topic1 - p1 즉 파티션이 p0,p1 두 개
그런데 마지막은 이 파티션보다 컨슈머가 더 많은데 이 경우(파티션수 < 컨슈머수)에는 컨슈머가 놀게된다- 주의하자
### 커밋과 오프셋
- <img width="884" alt="스크린샷 2022-03-30 오전 1 13 41" src="https://user-images.githubusercontent.com/62214428/160657588-50bd30cf-30b6-45ce-9d67-251872d29cfa.png">
컨슈머가 데이터를 가져오면 마지막으로 가져온 오프셋을 커밋!! 한다. 이 후 이 오프셋을 기억하고 다음 Poll 매서드를 실행하면 앞서 커밋한 오프셋 이후부터 읽어온다. 다시 마지막 읽은 오프셋을 커밋. 반복
### 커밋된 오프셋이 없는 경우
처음 접근하는 경우는 당연히 커밋된 오프셋이없는데
auto.offset.reset 설정 사용
조회에 영향을 주는 주요 설정
fetch.min.bytes : 조회시 브로커가 전송할 최소 데이터크기
fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간
max.partition.fetch.bytes : 파티션 당 서버(브로커)가 리턴할 수 있는 최대 크기
컨슈머가 데이터를 읽고 마지막. 오프셋을 커밋한다고했다.
enable.auto.commit설정
- true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋
- false : 수동으로 커밋실행
auto.commit.interval.ms : 자동 커밋 주기 / 바로 위에서 자동으로 오프셋 커밋설정을했을때
- 몇 초마다 자동 커밋이 실행되게할것이냐
- 기본값 : 5초 / 5000ms
poll(), close() 매서드가 호출될 때! 자동 커밋이 실행된다.
⭐️카프카를 사용할 때 주의해야할점 중 하나는 컨슈머는 동일한 메세지를 읽어올수도 있다는것
동일 메세지 조회 가능성
- 일시적 커밋 실패 -> 이 경우 이전에 읽은곳이 커밋되지않아 오프셋 변경이 일어나지 않았기때문에
컨슈머는 멱등성을 고려해야한다.
- 즉 몇번이든 동일 요청은 동일 결과를 배출한다.
- 예를들어 조회(=조회수 1증가) - 좋아요클릭 - 조회가 수행되었는데
- 동일 메세지를 조회하는 문제가 발생할 경우 조회수가 2가아닌 4가되는 문제가 발생할 수 있다
이를 막기 위해 타임스탬프, 일련 번호등을 데이터에 사용하여 중복 처리를 해결해나가야한다
무한 루프를 돌면서 계속 데이터를 poll해야하는 경우가있을 수 있다.
이 때 종료처리를 어떻게하냐
위 wakeUp()매서드를 제외한 모든 카프카 매서드는 스레드에 안전하지 않다
따라서 wakeUp매서드를 제외한 다른 모든 매서드는 여러 스레드에서 동시에 호출하지 않도록한다.
⭐️ 메세지 브로커는 이벤트 브로커 역할을 할 수 없지만
⭐️ 이벤트 브로커는 메세지 브로커 역할을 할 수 있다.
메세지 브로커는 여러 서비스를 연결하는 미들웨어로 중간다리 역할을한다.
메세지 큐에 메세지를 보내고 이를 받아 서로간 통신을한다.
⭐️ 가장 큰 특징은 메세지 브로커는 메세지를 받아 처리하면 해당 메세지는 삭제된다.
⭐️ 반면 이벤트 브로커는 업무상 필요한 시간동안 데이터를 보관할 수 있다.
`이벤트` - 서비스에서 나오는 이벤트를 마치 하나의 정보로 다루고 이를 데이터베이스에 저장하듯이 `저장`의 개념으로 보자
-> 이 점에서 다른 일반적인 메세지큐와 카프카의 차이점을 느낄 수 있었다.
-> 왜 사용할까? 만약 해당 이벤트를 전달받은 서비스가 장애가 났을 때 이것이 삭제된다면 말그대로 정보의 유실이 일어날것
-> 이를 극복할 수 있는것이 이벤트브로커이자 카프카
사용해보기 - https://github.com/skarltjr/msa-practice
프로젝트에 적용하기전 우선 개념을 알고 사용해보고자한다.
여기서는 단순하게 진행해보고자 특정 유저가 쓴 글이 잘 생성되면 article 서비스에서 user서비스로 이벤트를 전달하고자한다.
kafka는 도커를 활용
클러스터 관리를 위한 주키퍼가 항상 필요하기때문에 compose가 편하다고한다.
참고로 evn에 바로 topic을 추가할까하다 연습삼아 새로 추가하기로했다.
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
고민이 생긴 부분은 의존성 추가 부분이다.
처음에 단순하게 생각했을 땐 어차피 프로젝트에서 order,product,payment 모두 kafka를 사용할테니 전역적으로 의존성을 추가할까
생각했지만 더 큰 프로젝트에선 당연히 필요한곳에서만 사용하는것이 맞다고 판단해서 필요한곳에 의존성을 추가하기로했다
여기서는 이벤트를 발행할 article, 이벤트를 수신할 user에 의존성을 추가한다
위 개념설명에선 serializer를 stringSerializer를 사용했는데 나는 객체로 전달하고싶었다.
그래서 위 블로그를 참고하여 진행했다.
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
article , user에서 사용할것이기때문에 두 곳에서 정의
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ArticleCreateMessage {
private Long articleId;
private Long writerId;
public ArticleCreateMessage(Long articleId, Long writerId) {
this.articleId = articleId;
this.writerId = writerId;
}
}
### 5. producer / consumer config
- producer config ( article aggregate )
@Configuration public class ArticleCreateProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapServers;
public ProducerFactory<String, ArticleCreateMessage> producerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory(configs);
}
public KafkaTemplate<String, ArticleCreateMessage> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- consumer config ( user aggregate )
@Configuration public class ArticleCreateConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapServers;
public ConsumerFactory<String, ArticleCreateMessage> articleCreateMessageConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user");
return new DefaultKafkaConsumerFactory<>(
configs,
new StringDeserializer(),
new JsonDeserializer<>(ArticleCreateMessage.class));
}
public ConcurrentKafkaListenerContainerFactory<String, ArticleCreateMessage> articleCreateMessageListener() {
ConcurrentKafkaListenerContainerFactory<String, ArticleCreateMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(articleCreateMessageConsumer());
return factory;
}
}
### 6. producer / consumer 생성
- producer
@Component @RequiredArgsConstructor public class ArticleCreateProducer { private static final String TOPIC = "ArticleCreate"; private final KafkaTemplate<String, ArticleCreateMessage> kafkaTemplate;
public void sendMessage(Long articleId, Long writerId) {
ArticleCreateMessage message = new ArticleCreateMessage(articleId, writerId);
kafkaTemplate.send(TOPIC, message);
}
}
- consumer
@Component @RequiredArgsConstructor public class ArticleCreateConsumer { @KafkaListener(topics = "ArticleCreate", groupId = "user", containerFactory = "articleCreateMessageListener") public void consume(ArticleCreateMessage message) { System.out.println(message.getArticleId() + message.getWriterId() + " 메세지가 잘 도착했네요"); } }
### 7. 활용
@Component @RequiredArgsConstructor public class ArticleCommandHandler { private final ArticleRepository articleRepository; private final ArticleCreateProducer producer; public Article createArticle(CreateArticleCommand command) { Article article = Article.builder() .writerId(command.getWriterId()) .content(command.getContent()) .title(command.getTitle()) .build(); Article savedArticle = articleRepository.save(article); // 저장 후 producer.sendMessage(savedArticle.getId(),savedArticle.getWriterId()); // 이벤트 발행 return savedArticle; } }
이제 유저가 게시글을 생성하면 article 서비스에서는 유저를 저장 후 user 서비스에게 저장했다는것을 알리기위해 레코드를 발행한다 그리고 user의 컨슈머는 이를 꺼내서 출력해준다 결과를 확인해보자
### 8. 결과 확인
바로 위에서 정상적으로 생성했으면 aritlceId & userId가 잘 생성되었다는 메세지를 user서비스에서 받아낼 수 있다
- <img width="182" alt="스크린샷 2022-04-01 오전 1 40 29" src="https://user-images.githubusercontent.com/62214428/161106799-56082dc6-85e4-43b8-969c-987ea39dbd65.png">
고민거리
주문 -> 이벤트 발행 -> 재고 수정의 과정있을 때 분명히 재고 수정 실패가 발생하는 경우가 생길것 그럼 어떻게할까? 지금 생각해본건 실패 이벤트를 주문 서비스쪽으로 발행하여 해당 주문 id를 넘겨주고 이 id의 주문을 취소
그런데 이러면 유저입장에선 주문이 완료되었다고한걸 취소?? -> 고려해봐야한다.
kafka
참고 :
기본 구조
카프카 클러스터
주키퍼
프로듀서
컨슈머
즉, 카프카 클러스터. 카프카는 데이터 이동에 필요한 핵심역할을 수행한다.