hlab-books / kafka-the-definitive-guide

Kafka: The Definitive Guide ( 2nd Edition )
0 stars 3 forks source link

Chapter 4. 카프카 컨슈머: 카프카에서 데이터 읽기 #6

Open hubtwork opened 3 weeks ago

hubtwork commented 3 weeks ago

Chapter Ownership : @KangHun-Lee

KangHun-Lee commented 2 weeks ago

컨슈머 그룹과 파티션 리밸런스

컨슈머 그룹의 개념

파티션 할당 방식

컨슈머가 파티션의 갯수보다 많을 경우 유휴 컨슈머가 생기므로 주의 필요

파티션 리밸런스

리밸런스 개념

조급한 리밸런스

협력적 리밸런스


정적 그룹 멤버십

정적 그룹 멤버십의 필요성

리밸런스 이후의 처리


KafkaConsumer 생성 및 토픽 구독

KafkaConsumer 인스턴스 생성

예시 코드

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

토픽 구독

예시 코드

consumer.subscribe(Collections.singletonList("customerCountries"));

폴링 루프

폴링 루프 소개

데이터 처리 과정

예시 코드

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitAsync();
}

스레드 안정성


컨슈머 설정 옵션

중요 설정 옵션

1. auto.offset.reset

2. enable.auto.commit

3. fetch.min.bytesfetch.max.wait.ms

4. max.poll.records

5. session.timeout.msheartbeat.interval.ms


fetch.min.bytes

fetch.max.wait.ms

fetch.max.bytes

max.poll.records

max.partition.fetch.bytes

session.timeout.ms 그리고 heartbeat.interval.ms

max.poll.interval.ms

default.api.timeout.ms

request.timeout.ms

auto.offset.reset

enable.auto.commit

partition.assignment.strategy

client.id

client.rack

group.instance.id

receive.buffer.bytes, send.buffer.bytes

offsets.retention.minutes


오프셋과 커밋

자동 커밋

현재 오프셋 커밋하기

비동기적 커밋

동기적 커밋과 비동기적 커밋을 함께 사용하기

특정 오프셋 커밋하기


리밸런스 리스너


특정 오프셋의 레코드 읽어오기


폴링 루프를 벗어나는 방법


디시리얼라이저

커스텀 디시리얼라이저

Avro 디시리얼라이저 사용하기


독립 실행 컨슈머

CHOICORE commented 2 weeks ago

토픽으로부터 데이터를 읽어 오는 작업을 확장 할 수 있어야한다.

리밸런스 (또밸런싱)

하나의 스레드당 하나의 컨슈머

소감

고가용성을 위해 컨슈머 그룹, 파티셔닝, 리밸런싱, 오프셋 커밋 등 다양한 요소들에 대해서 알게되서 좋았다. 뭐 카프카의 생태계를 알게 되는 점은 흥미롭고 재밌다. 그러나, 결국 설정 싸움인건가? 매직넘버를 내가 어떻게 찾냐... 라는 생각이 스쳐 지나갔다. 뭐 카프카 구축은 인프라 형님들께 맡기고, 스프링 카프카는 그냥 쓰면 되겠지? (스프링 카프카도 설정 싸움이 있을텐데) 그래서 스프링 카프카를 사용할 때 주의해야할 설정들이 궁금하다.

zinokim commented 2 weeks ago

소감

p.92 카프카 클러스터에 파티션이 매우 많다면(3만 개 이상이라든지) ... 생략

3만 개 이상의 파티션이 필요한 경우는 어떤 경우인지 전혀 짐작이 가지 않는다.

현재 회사에서는 JsonDeserializer를 사용하고 있고 특별히 SerializerDeserializer에 대해서 고민하지 않았던 것 같다. StringDeserializer, AvroDeserializer, JsonDeserializer 무엇을 쓰든 잘 알고 써야겠다.

오프셋 커밋이 상당히 중요한 개념이라고 읽혔고, 그만큼 전략적으로 신경을 많이 쓴 것 같았다. 오프셋 커밋 전략을 보통의 회사에서는 어떤 식으로 가져가는지 궁금하다. 지금 회사에서 사용하고 있는 카프카 설정이 많지 않지만 어떤 설정인지 잘 모르고 있었는데 다시 한 번 살펴볼 계획이다.

프로듀서든 컨슈머든 개념 용어에 대한 내용들은 이해하겠는데 구체적인 설정값들과 상황에 대해서 이해가 바로바로 되지는 않는다. (카프카 경험이 없어서인가 이해력이 부족한 것인가...)

위에 언급한 3만 개 이상의 파티션도 그렇고 카프카가 많은 설정을 통해 사용할 수 있도록 했는데, 저 많은 설정들을 직접 구성하고 사용하는 경우는 어떤 경우가 있을지 궁금하다.

hubtwork commented 2 weeks ago

소감

컨슈머의 리밸런싱 및 관련한 설정은 실제로 카프카를 어떻게 활용하고자 하는 것에 엄청난 영향을 미친다. 예를 들어, 단순히 EDA 를 위해 카프카를 활용하는 것은 어렵지 않을 수 있다. 특정 토픽에 대해서 메세지 발행량과 토픽의 파티션 별 lag 에 대해서 고려하고, 이를 잘 소비할 수 있도록 파티션 개수 및 리밸런싱이 덜 일어나기 위한 작업 속도 등을 고려해 적절한 설정값만 정해주면 된다. ( 이건 스프링 쓰고 있으면 spring-data-kafka 가 아주 손쉽게 Configuration 으로 지정할 수 있도록 제공하기 때문에 너무나도 쉽다. ) 그런데 이런 상황이 생길 수 있다. 예를 들어, 동일한 데이터에 대한 이벤트의 후속처리가 순차적으로 이루어져야하기 때문에 특정 데이터를 메세지의 키로 지정해 각 파티션 내에서 순차보장을 하는 Skill 을 우리가 사용하게 되는데, 여기서 파티션 갯수나 메세지 발행량 등에 의해 컨슈머 성능이 일부 파티션에 대해서 떨어져 엄청난 lag 이 발생하게 되는 현상을 볼 수도 있다.

e.g. 아래는 일부 파티션의 경우 lag 이 없으나 특정 파티션에서 메세지가 burst 되어 파티션 lag 이 기하급수적으로 쌓인 케이스. 실제로 현재 파티션별 offset Gap 이 적으면 3만개~ 많으면 6만개 정도 차이난다.

Screenshot 2024-09-01 at 7 35 53 PM

이런 경우엔, 파티션의 개수를 늘리거나 처리량을 늘리는 등 다양한 고민을 하게 되는데 이 경우 파티션의 개수를 늘리면 줄일 수 없다는 카프카의 절대법칙 때문에 함부로 늘릴수도 없어서 실력이 여기서 드러난다. 파티션을 늘린다고 해서 Lag 이 줄어들까? ( 파티션 hash 가 늘어나 전체적인 lag 은 줄어들 수 있지만 특정 lag 을 유발하는 주범이 또 같은 파티션에 몰려 결국 다시 lag 이 생길 수도 있다. )

실제로 10년차 넘는 개발자 분이 "그냥 파티션 늘리면 안되요?" 하는 걸 이전 직장에서 들었는데 얼처구니가..

또한 유량제어를 위해 카프카를 중도에 두어 단계적으로 발행을 압축하는 방식을 활용한다던지, 하기 위해 아래 설정 값들에 대해 이해하고 적절하게 조절하는 것이 애플리케이션 개발자의 실력을 드러내게 된다.

e.g. 아래는 특정 시스템적 설계를 해소하기 위해 제어한 Consumer Property

Screenshot 2024-09-01 at 7 43 48 PM

이에 이런 부분들을 고려해서 잘 공부하고 활용하였으면 좋겠다.

iamzin commented 2 weeks ago

AWS 만큼 설정의 숙련도가 필요해 보이지만, 서버를 EC2로 수동 배포 해보듯이 컨슈머 그룹 없이 단순하게도 구현해볼 수 있을 것 같다. SQS로 구성한 로직이 있다면 가장 단순한 형태의 프로듀서와 컨슈머를 구성해보고 싶다.

또한 결국 사용하게 될 Spring Kafka에 대해서도 접목해서 이해해보는 연습을 해야겠다.


컨슈머 그룹의 기능이 필요한게 아니며 리밸런싱이 필요하지 않다면 굳이 컨슈머 그룹을 사용하지 않아도 된다.

책에서 언급한 해당 경우는 아래와 같다.

또한 컨슈머 그룹이 없어도 필요한 경우 오프셋을 커밋할 수 있다. 이 경우 컨슈머가 어떤 파티션을 읽어야 하는지 명확히 알 경우 subscribe 할 필요 없이 파티션을 스스로 할당 받을 수 있다.

(아직 예제로 코드 구현은 진행 중이라, 완성되면 추가 첨부해보겠습니당)

kimsunhak commented 2 weeks ago

소감

설정에 대한 내용들이 조금 인상 깊었다. 과거 실무에서 offset을 읽을때 earliest, latest 두 옵션을 사용했다가, 수동 커밋으로 작업했었다.

옵션 설정에서 아래에 해당하는 설정은 좀 더 중요하게 잡았으면 좋겠다고 생각하고, fetch.max.bytes = 한번에 가져 올 수 있는 최소 크기 fetch.max.wait.ms = 설정 값보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간 max.partition.fetch.bytes = 파티션당 가져올 수 잇는 최대 크기 max.poll.records poll = 최대 Record 수

Consumer는 얼마 만큼의 Record의 양을 가져와야 하는지와 만일 Consumer가 한번에 읽어들이는 양이 적다면 Fetch(Messages Read)가 반복적으로 일어나게 되면서 성능 저하가 발생하는 이슈가 있으니 다들 이 부분 참고하시길..

KangHun-Lee commented 2 weeks ago

카프카 프로듀서 성능 테스트

./kafka-producer-perf-test.sh --topic 토픽명 --throughput -1 --num-records 1000000 --record-size 600 --producer-props bootstrap.servers=첫번째:9092,2번째:9092,3번째:9092

image