컨슈머 : 프로듀서가 메시지를 생산해서 카프카의 토픽으로 메시지를 보내면 그 토픽의 메시지를 가져와서 소비(Consume)하는 역할을 하는 애플리케이션, 서버 등을 지칭
특정 파티션을 관리하고 있는 파티션 리더에게 메시지 가져오기 요청
메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다.
5.1 컨슈머의 주요 옵션
컨슈머의 종류엔 '올드컨슈머'와 '뉴 컨슈머'가 있다. 차이는 주키퍼의 사용 유무. 구버전의 카프카는 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식을 이용, 카프카 0.9 버전 이후 오프셋 저장을 주키퍼가 아닌 토픽에 저장하는 방식으로 변경.
현재는 둘다 지원하지만, 향후 주키퍼의 지노드에 저장하는 방식은 사라질 예정
bootstrap.servers
카프카 클러스터에 처음 연결을 위한 호스트와 포트 정보로 구성된 리스트 정보.
포멧은 호스트명:포트, 호스트명:포트, 호스트명:포트 리스트가 아닌 하나의 호스트만 입력해서 사용도 가능하지만, 해당 호스트가 장애가 났을시 접속이 불가하기 때문에 리스트 전체를 입력하는 방식을 권장
fetch.min.bytes
한번에 가져올 수 잇는 최소 데이터 사이즈(해당 사이즈보다 작으면 응답하지 않고 데이터가 누적될때까지 기다림)
grop.id
컨슈머가 속한 그룹을 식별하는 식별자 (중요)
enable.auto.commit
백그라운드로 주기적으로 오프셋을 커밋
auto.offset.reset
초기 오프셋이 없거나 현재 오프셋이 더이상 존재하지 않는경우 설정한 옵셧으로 리셋됨
earliest: 가장 초기의 오프셋
latest:가장 마지막의 오프셋
none:이전 오프셋값을 찾지 못하면 에러
fetch.max.bytes
한번에 가져올수 있는 최대 사이즈
request.timeout.ms
요청에 대해 응답을 기다리는 최대 사이즈
session.timeout.ms
컨슈머와 브로커사이의 세션 타임 아웃 시간
이 시간내에 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 지나면 해당 컨슈머는 종료되거나 장애가 발생한것으로 판단하고 컨슈머 그룹은 리밸런스를 시도.(리밸런싱이란 컨슈머 A가 담당하던 파티션 작업을 컨슈머B에게 이관)
얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며, hertbeat.interval.ms와 연관이 있다.(일반적으로 같이 수정)
기본값 보다 낮으면 실패를 빨리감지하지만, GC나 poll 루프를 완료하는 시간이 길어지게되며, 원하지 않는 리밸런스가 일어난다.
높으면 원하지 않는 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는데 시간이 오래걸림
hertbeat.interval.ms
그룹 코디네이터에게 얼마나 자주 poll()메소드로 하트비트를 보낼것인지 조정
session.timeout.ms보다 낮아야 하며, 일반적으로 1/3 정도로 설정함(기본값은 3초)
max.poll.records
poll()에 대한 최대 레코드 수
max.poll.interval.ms
컨슈머가 하트비트만 보내고, 실제로 메세지를 가져가지 않는 경우도 존재.
이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록, 주기적으로 poll 하지 않으면 장애라고 판단하고, 컨슈머 그룹에서 제외
auto.commit.interval.ms
주기적으로 오프셋을 커밋하는 시간
fetch.max.wait.ms
fetch.min.bytes 에의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간
5.2 콘솔 컨슈머로 메시지 가져오기
kafka-console-consumer.sh 를 통해 카프카 토픽의 메시지를 가져올 수 있음
- @KafkaListener 어노테이션을 이용하였으며, [해당링크](https://victorydntmd.tistory.com/348)를 참조하였습니다.
책에서는 `KafkaConsumer`와 `ConsumerRecords` 를 이용하였는데, 이 방식이 더 깔끔하다고 생각하였습니다.
`partition`과 `offset` 등은 매개변수로 넣어주면 값을 얻을 수 있습니다.
- `topics`에 배열로 넣으면 여러개의 토픽을 처리할 수 있음
![스크린샷 2023-05-06 오후 6 38 28](https://user-images.githubusercontent.com/122066644/236622922-f68e2955-be23-4b7a-b7e0-06b3da702022.png)
## 5.4 파티션과 메세지 순서
- 기존에 사용하던 토픽과 다른 추가의 토픽을 하나 더 생성(파티션 수는 반드시 동일한 파티션 생성, 저는 Kafka UI 로 파티션 3개짜리 토픽 1개 생성)
``` sh
/usr/local/kafka/bin/kafka-topics.sh --zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka --topic peter-01 --partitions 3 --replication-factor 1 --create
파티션 1개일 경우 오프셋의 순서가 절대 역전되지 않기 때문에 컨슈머가 메세지를 가져갈때 프로듀서에서 생성한 메세지 순서와 동일
만일 메세지의 순서를 보장해야하는 경우 파티션의 숫자를 1로 설정. 단 분산처리를 할 수 없고, 하나의 컨슈머에서만 처리해야해서 처리량이 높지 않다.
5.5 컨슈머 그룹
컨슈머 그룹은 카프카 토픽에서 메세지를 읽어오는 역할
하나의 토픽에 여러 컨슈머 그룹이 동시에 접속 가능.
(기존의 다른 메시징큐 솔루션에서 컨슈머가 메세지를 가져가면 큐에서 삭제되어 다른 컨슈머가 가져갈 수 없었는데, 하나의 데이터에 다양한 용도로 사용하는 요구가 많아졌기 때문에 이 방법을 사용)
만약 프로듀서의 메세지 전송 속도가 컨슈머의 처리속도보다 빨라지게 된다면?
ex) 프로듀싱 속도 : 1 , 컨슈밍 속도 : 3일 경우 -> 컨슈머를 확장해야함
단순하게 컨슈머만 확장하면 기존 컨슈머의 오프셋 정보와 새로 추가된 컨슈머의 오프셋 정보가 섞여서 메세지들이 뒤죽박죽.
따라서 동일한 토픽에 대해 여러 컨슈머가 메세지를 가져갈 수 있도록 컨슈머 그룹이라는 기능을 제공.
(컨슈머 확장에 용이, 빠른 장애 대처 가능)
컨슈머 그룹 안에서 컨슈머들은 토픽의 파티션에 대한 소유권을 공유함.
5.5.1 컨슈머 그룹, 리밸런스
이 상황에서 컨슈머를 추가해야할 때, 그룹아이디를 컨슈머 그롭 01 로 설정하면
이런 설계가 된다. 기존 컨슈머01의 소유권이었던 파티션 1과 파티션 2가 각각 컨슈머02와컨슈머03 로 이동
이렇게 소유권이 이동한것을 리밸런스 라고 한다. 이걸통해 그룹 내에서 컨슈머를 쉽고 안전하게 추가, 제거할 수 있어 높은 가용성과 확장성을 확보한다.
리밸런스를 하는동안 컨슈머 그룹 전체가 메세지를 가져올수 없는 단점이 존재.
리밸런싱 동안토픽의 각 파티션마다 하나의 컨슈머가 연결되고, 리밸런스가 끝나면 컨슈머들은 각자 담당하는 파티션으로 부터 메세지를 얻음
5.5.2 컨슈머뿐만 아닌 파티션도 같이 확장
위 그림같은 경우 컨슈머의 숫자를 늘려봤자 컨슈머의 처리 속도가 늘어나지 않음
하나의 파티션은 하나의 컨슈머만 연결하기때문에 컨슈머 04는 노는 상황이 발생
이 이유는 하나의 파티션 내에서는 오프셋으로 순서가 보장되는데, 하나의 파티션에 두개의 컨슈머가 연결되면 오프셋의 순서가 보장이 안되기 때문(?)
이걸 해결하기 위해 컨슈머가 아닌 파티션의 숫자도 같이 늘려주어야 한다.
5.5.3 컨슈머 다운
컨슈머 그룹 내에서 컨슈머 하나가 다운되는 경우엔?
컨슈머가 그룹 안에서 맴버로 유지하고, 파티션을 소유하기 위해선 하트비트를 보내야함
-> 컨슈머가 일정한 주기로 하트비트를 보낸다는건 파티션의 메세지를 잘 처리하고 있다는것.
-> 하트비트는 컨슈머가 poll할때와 오프셋을 커밋할때 보냄
컨슈머가 하트비트를 보내지 않으면 세션은 타임아웃이 되고 해당 컨슈머가 다운되었다고 판단한뒤 리밸런스를 시작
5.5.4 하나의 토픽 여러개의 컨슈머 그룹
카프카와 다른 메세지 큐 솔루션의 차이점중 하나는 하나의 토픽에 여러 컨슈머 그룹이 붙을 수 있음
이렇게 가져갈 수 있는 이유는 컨슈머 그룹마다 각자의 오프셋을 별도로 관리하기때문
여러개의 컨슈머 그룹이 동시에 하나의 토픽의 메세지를 이용하는 경우 컨슈머 그룹 아이디는 서로 중복되지 않아야 함.
5.6 커밋과 오프셋
컨슈머가 poll()을 호출할때 마다 컨슈머 그룹은 카프카에 저장되어있는 아직 안읽은 메세지를 가져옴
컨슈머 그룹의 컨슈머들이 각각의 파티션에 대한 오프셋을 기록하고 있음.
각 파티션에 대한 현재 위치를 업데이트 하는 행위를 커밋이라고 함.
카프카는 파티션별로 오프셋 정보를 저장하기 위한 별도의 공간이 필요.
올드버전은 주키퍼에 저장했으며, 뉴 버전에는 카프카 내에 별도로 토픽을 만들고 그 토픽에 오프셋 정보들을 저장하고 있음
__consumer_offsets라고 부름
컨슈머 그룹에 리밸런스가 일어났을때 각각의 컨슈머는 이전 파티션이 아닌 새로운 파티션을 할당받는다.
이때 컨슈먼느 가장 최근에 커밋된 오프셋을 읽고, 메시지를 가져오기 시작함.
실제 처리한 마지막 오프셋 > 커밋되어서 기록되어있는 마지막 오프셋 : 메시지 중복
실제 처리한 마지막 오프셋 < 커밋되어서 기록되어있는 마지막 오프셋 : 메시지 누락
5.6.1 자동 커밋
오프셋을 직접 관리할수 있지만 자동 커밋 방식이 가장 쉽고, 많이 사용함.
enable.auto.commit=true로 설정하면 5초마다 컨슈머는 `poll()을 호출할때 가장 마지막 오프셋을 커밋함.
카프카 컨슈머
메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다.
5.1 컨슈머의 주요 옵션
컨슈머의 종류엔 '올드컨슈머'와 '뉴 컨슈머'가 있다. 차이는 주키퍼의 사용 유무. 구버전의 카프카는 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식을 이용, 카프카 0.9 버전 이후 오프셋 저장을 주키퍼가 아닌
토픽
에 저장하는 방식으로 변경. 현재는 둘다 지원하지만, 향후 주키퍼의 지노드에 저장하는 방식은 사라질 예정bootstrap.servers
카프카 클러스터에 처음 연결을 위한 호스트와 포트 정보로 구성된 리스트 정보.
포멧은
호스트명:포트, 호스트명:포트, 호스트명:포트
리스트가 아닌 하나의 호스트만 입력해서 사용도 가능하지만, 해당 호스트가 장애가 났을시 접속이 불가하기 때문에 리스트 전체를 입력하는 방식을 권장fetch.min.bytes
한번에 가져올 수 잇는 최소 데이터 사이즈(해당 사이즈보다 작으면 응답하지 않고 데이터가 누적될때까지 기다림)
grop.id
컨슈머가 속한 그룹을 식별하는 식별자 (중요)
enable.auto.commit
백그라운드로 주기적으로 오프셋을 커밋
auto.offset.reset
초기 오프셋이 없거나 현재 오프셋이 더이상 존재하지 않는경우 설정한 옵셧으로 리셋됨
earliest: 가장 초기의 오프셋
latest:가장 마지막의 오프셋
none:이전 오프셋값을 찾지 못하면 에러
fetch.max.bytes
한번에 가져올수 있는 최대 사이즈
request.timeout.ms
요청에 대해 응답을 기다리는 최대 사이즈
session.timeout.ms
컨슈머와 브로커사이의 세션 타임 아웃 시간
이 시간내에 컨슈머가 그룹 코디네이터에게
하트비트
를 보내지 않고 지나면 해당 컨슈머는 종료되거나 장애가 발생한것으로 판단하고 컨슈머 그룹은리밸런스
를 시도.(리밸런싱이란 컨슈머 A가 담당하던 파티션 작업을 컨슈머B에게 이관)얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며,
hertbeat.interval.ms
와 연관이 있다.(일반적으로 같이 수정)기본값 보다 낮으면 실패를 빨리감지하지만, GC나 poll 루프를 완료하는 시간이 길어지게되며, 원하지 않는 리밸런스가 일어난다.
높으면 원하지 않는 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는데 시간이 오래걸림
hertbeat.interval.ms
그룹 코디네이터에게 얼마나 자주
poll()
메소드로 하트비트를 보낼것인지 조정session.timeout.ms
보다 낮아야 하며, 일반적으로 1/3 정도로 설정함(기본값은 3초)max.poll.records
poll()
에 대한 최대 레코드 수max.poll.interval.ms
컨슈머가 하트비트만 보내고, 실제로 메세지를 가져가지 않는 경우도 존재.
이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록, 주기적으로 poll 하지 않으면 장애라고 판단하고, 컨슈머 그룹에서 제외
auto.commit.interval.ms
주기적으로 오프셋을 커밋하는 시간
fetch.max.wait.ms
fetch.min.bytes
에의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간5.2 콘솔 컨슈머로 메시지 가져오기
kafka-console-consumer.sh
를 통해 카프카 토픽의 메시지를 가져올 수 있음컨슈머 그룹이 필요한데,
kafka-console-consumer.sh
실행하면서 그룹을 지정해 주어야 함.만약 지정을 안해주면
console-consumer-xxxxx(숫자형태)
로 컨슈머 그룹이 생성됨컨슈머 그룹을 볼 수있는 명령어 역시
/usr/local/kafka/bin/kafka-consumer-groups.sh
에서 제공됨컨슈머 그룹 이름 지정하는 명령어
결과는 똑같고, 그룹 리스트를 다시 보면 새로 생긴걸 볼 수 있다.
5.3 자바를 이용한 컨슈머
import java.io.IOException;
import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header;
@Configuration public class KafkaConsumer {
}
5.4.1 파티션 3개로 구성한 peter-01토픽과 메세지 순서
다시 확인 오히려 알파벳보다 먼저 나오는 경우도 있는데, 토픽은 파티션이 3개, 각 파티션별로 메세지가 어떻게 저장되어있는지 확인하면
이건 메세지들이 하나의 파티션에만 순서대로 저장되는 것이 아니라, 각각의 파티션에 메세지가 저장되어서 나타나는 현상.
컨슈머는 `peter-01토픽에서 메세지를 가져올때 프로듀서가 어떤 순서로 메세지를 보냈는지 알 수 없음. (오직 파티션의 오프셋 기준으로만)
책 기준으로 가져온 순서를 보면 a,d,b,e,c 순서로 가져왔음.
출력 내용은 파티션의 오프셋 순서대로 메세지를 가져왔음. 오프셋은 절대 역전되지 않았음
카프카에서 토픽의 파티션이 여러개인 경우 메세지의 순서를 보장할 수 없다.
파티션 내에서의 메세지 순서는 프로듀서가 생성한 수서와 동일하지만, 파티션과 파티션 사이의 순서는 보장 X
5.4.2 파티션 1개로 구성된 peter-02토픽과 메세지 순서
파티션1개짜리 토픽 생성
a~e 까지 메세지 전송
메세지 정상 순서 확인
만일 메세지의 순서를 보장해야하는 경우 파티션의 숫자를 1로 설정. 단 분산처리를 할 수 없고, 하나의 컨슈머에서만 처리해야해서 처리량이 높지 않다.
5.5 컨슈머 그룹
5.5.1 컨슈머 그룹, 리밸런스
이 상황에서 컨슈머를 추가해야할 때, 그룹아이디를
컨슈머 그롭 01
로 설정하면이런 설계가 된다. 기존
컨슈머01
의 소유권이었던파티션 1
과파티션 2
가 각각컨슈머02
와컨슈머03
로 이동이렇게 소유권이 이동한것을
리밸런스
라고 한다. 이걸통해 그룹 내에서 컨슈머를 쉽고 안전하게 추가, 제거할 수 있어 높은 가용성과 확장성을 확보한다.리밸런스
를 하는동안 컨슈머 그룹 전체가 메세지를 가져올수 없는 단점이 존재.리밸런싱 동안토픽의 각 파티션마다 하나의 컨슈머가 연결되고, 리밸런스가 끝나면 컨슈머들은 각자 담당하는 파티션으로 부터 메세지를 얻음
5.5.2 컨슈머뿐만 아닌 파티션도 같이 확장
위 그림같은 경우 컨슈머의 숫자를 늘려봤자 컨슈머의 처리 속도가 늘어나지 않음
하나의 파티션은 하나의 컨슈머만 연결하기때문에
컨슈머 04
는 노는 상황이 발생이 이유는 하나의 파티션 내에서는 오프셋으로 순서가 보장되는데, 하나의 파티션에 두개의 컨슈머가 연결되면 오프셋의 순서가 보장이 안되기 때문(?)
이걸 해결하기 위해 컨슈머가 아닌 파티션의 숫자도 같이 늘려주어야 한다.
5.5.3 컨슈머 다운
5.5.4 하나의 토픽 여러개의 컨슈머 그룹
오프셋
을 별도로 관리하기때문컨슈머 그룹 아이디
는 서로 중복되지 않아야 함.5.6 커밋과 오프셋
poll()
을 호출할때 마다 컨슈머 그룹은 카프카에 저장되어있는 아직 안읽은 메세지를 가져옴컨슈머 그룹
의 컨슈머들이 각각의 파티션에 대한오프셋
을 기록하고 있음.커밋
이라고 함.올드버전
은주키퍼
에 저장했으며,뉴 버전
에는 카프카 내에 별도로 토픽을 만들고 그 토픽에 오프셋 정보들을 저장하고 있음__consumer_offsets
라고 부름실제 처리한 마지막 오프셋
>커밋되어서 기록되어있는 마지막 오프셋
: 메시지 중복실제 처리한 마지막 오프셋
<커밋되어서 기록되어있는 마지막 오프셋
: 메시지 누락5.6.1 자동 커밋
enable.auto.commit=true
로 설정하면 5초마다 컨슈머는 `poll()을 호출할때 가장 마지막 오프셋을 커밋함.auto.commit.interval.ms
옵션을 통해 조정이 가능하고, 컨슈머는 poll 할때마다 커밋할 시간인지 아닌지 체크위와같이 5초에 한번씩 커밋하고, 한번
poll()
할때마다 2개의 메시지를 가져옴.그런데 위 그림처럼 커밋은 4번까지 했고,
컨슈머 01
이 5,6번 메세지를 가져왔는데 3초가 지난뒤 리밸런스가 일어남파티션 0
을 새로 할당받은컨슈머 02
는컨슈머 01
이 가져갔던 5,6번 메세지를 다시 가져가게됨. 이걸 메세지 중복이라고 말함중복을 줄이기 위해서는 자동 커밋의 시간을 더 줄이면 되지만 완벽하게 제거하는것은 불가능
5.6.2 수동 커밋
메세지 처리가 완료되기 전에 메시지를 가져간것은 메세지를 가져간게 아니다.
라고 판단해야할때 사용함자동커밋
으로poll()
하면서 마지막 값의 오프셋으로 자동 커밋이 되고, 일부 메세지들은데이터베이스에 저장하지 못한 상태로 컨슈머 장애가 발생하면 메세지들은 데이터베이스에 저장하지 못하고 손실되어버림.5.6.3 특정 파티션 할당
컨슈머
는토픽
을 구독하고,카프카
가컨슈머 그룹
에게파티션
을 분배했지만 특정 파티션을 제어할 수 있음키-값
형태의 파티션에 저장되어있어야 하고, 특정 파티션에 대한 메세지들만 가져와야함5.6.4 특정 오프셋부터 메세지 가져오기
seek()
을 이용하면 됨