kmg28801 / kafka-study

1 stars 0 forks source link

[실전 카프카 개발부터 운영까지] 3장. 카프카 기본 개념과 구조 #13

Open kmg28801 opened 1 year ago

kmg28801 commented 1 year ago

카프카를 구성하는 주요 요소

리플리케이션

파티션

세그먼트

카프카 핵심 개념

분산 시스템

페이지 캐시

배치 전송 처리

압축 전송

토픽, 파티션, 오프셋

고가용성 보장

주키퍼의 의존성

프로듀서 기본 동작과 예제 맛보가ㅣ

프로듀서 디자인

image

프로듀서의 주요 옵션

옵션 설명
bootstrap.server 클라이언트가 카프카 클러스터에 처음 연결하기 위한 호스트&포트 정보
client.dns.lookup 하나의 호스트에 여러 IP를 매핑해 사용하는 일부 환경에서 클라이언트가 하나의 IP와 연결하지 못할 경우에 다른 IP로 시도하는 설정
acks 프로듀서가 카프카 토픽의 리더 측에 메시지를 전송한 후 요청을 완료하기를 결정하는 옵션.0,1,all(-1)로 표현하며, 0은 리더 측에서 메시지를 받았는지 확인하지 않는다. 따라서 전송이 빠르지만, 일부 메시지가 유실될 가능성이 있다. 1은 리더가 메시지를 받았는지 확인하지만, 모든 팔로워 전부를 확인하진 않는다. 대부분 기본값으로 1을 사용한다. all은 팔로워가 메시지를 받았는지 여부를 확인한다.
buffer.memory 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등)할 수 있는 전체 메모리 바이트
compression.type 프로듀서가 메시지 전송 시 선택할 수 있는 압축 타입. (none, gzip, snappy, lz4, zstd)
enable.idempotence 설정을 true로 하면 중복 없는 전송이 가능함(중복 없는 전송은 max.flight.requests.per.connection은 5이하, retries는 0이상, acks는 all로 설정해야함)
max.in.flight.request.per.connection 하나의 커넥션에서 프로듀서가 최대한 ACK 없이 전송할 수 있는 요청 수. 메시지 순서가 중요하다면 1로 설정할 것을 권장하지만, 성능은 다소 떨어진다.
retries 전송에 실패한 데이터를 다시 보내는 횟수
batch.size 배치 전송할 배치 크기
linger.ms 배치 크기에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을 때 메시지를 전송한다
transactional.id '정확히 한번 전송'을 위해 사용하는 옵션이며, 동일한 TransactionalId에 한해 정확히 한 번을 보장한다. 옵션을 사용하기 전 enable.idempotence를 true로 설정해야 함
public class ProducerSync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작합니다.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의합니다.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); //메시지 키와 벨류에 문자열을 지정하므로 내장된 StringSerializer를 지정합니다.
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props); //Properties 오브젝트를 전달해 새 프로듀서를 생성합니다.

        try {
            for (int i = 0; i < 3; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distributed streaming platform - " + i); //ProducerRecord 오브젝트를 생성합니다.
                RecordMetadata metadata = producer.send(record).get(); //get() 메소드를 이용해 카프카의 응답을 기다립니다. 메시지가 성공적으로 전송되지 않으면 예외가 발생하고, 에러가 없다면 RecordMetadata를 얻게 됩니다.
                System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Received Message: %s\n", metadata.topic(), metadata.partition()
                        , metadata.offset(), record.key(), record.value());
            }
        } catch (Exception e){
            e.printStackTrace(); //카프카로 메시지를 보내기 전과 보내는 동안 에러가 발생하면 예외가 발생합니다.
        } finally {
            producer.close(); // 프로듀서 종료
        }
    }
}
public class ProducerAsync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작합니다.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의합니다.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); //메시지 키와 벨류에 문자열을 지정하므로 내장된 StringSerializer를 지정합니다.
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props); //Properties 오브젝트를 전달해 새 프로듀서를 생성합니다.

        try {
            for (int i = 0; i < 3; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("peter-basic01", "Apache Kafka is a distributed streaming platform - " + i); //ProducerRecord 오브젝트를 생성합니다.
                producer.send(record, new PeterProducerCallback(record)); //프로듀서에서 레코드를 보낼 때 콜백 오브젝트를 같이 보냅니다.
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            producer.close(); // 프로듀서 종료
        }
    }
}

컨슈머의 기본 동작과 예제 맛보기

컨슈머의 주요 옵션

컨슈머 옵션 설명
bootstrap.servers 브로커의 정보
fetch.min.bytes 한 번에 가져올 수 있는 최소 데이터 크기. 만약 지정한 크기보다 작은 경우, 요청에 응답하지 않고 데이터가 누적될 때까지 기다린다.
group.id 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자. 동일한 그룹 내의 컨슈머 정보는 모두 공유된다.
heartbeat.interval.ms 하트비트가 있다는 것은 컨슈머의 상태가 active임을 의미. 일반적으로 session.timeout.ms의 1/3로 설정한다.
max.partition.fetch.bytes 파티션당 가져올 수 있는 최대 크기
session.timeout.ms 이 시간을 이용해, 컨슈머가 종료된 것인지를 파악한다. 컨슈머는 주기적으로 하트비트를 보내야 하고, 만약 이 시간 전까지 하트비트를 보내지 않았다면 해당 컨슈머는 종료된 것으로 간주하고 컨슈머 그룹에서 제외하고 리밸런싱을 시작한다.
enable.auto.commit 백그라운드로 주기적으로 오프셋을 커밋한다.
auto.offset.reset 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우에 다음 옵션으로 reset한다.- earliest: 가장 초기의 오프셋값으로 설정- latest: 가장 마지막의 오프셋값으로 설정.- none: 이전 오프셋값을 찾지 못하면 에러 발생
fetch.max.bytes 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기
group.instance.id 컨슈머의 고유한 식별자. 만약 설정한다면 static 멤버로 간주되어, 불필요한 리밸런싱을 하지 않는다
isolation.level 트랜잭션 컨슈머에서 사용되는 옵션으로, read_uncommitted는 기본값으로 모든 메시지를 읽고, read_committed는 트랜잭션이 완료된 메시지만 읽는다.
max.poll.records 한 번의 poll() 요청으로 가져오는 최대 메시지의 수
partition.assignment.strategy 파티션 할당 전략이며, 기본값은 range
fetch.max.wait.ms fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간


컨슈머 예제

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerAuto {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작합니다.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의합니다.
        props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의합니다.
        props.put("enable.auto.commit", "true"); //자동 커밋을 사용합니다.
        props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화 합니다. 가장 최근부터 메시지를 가져오게 됩니다.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정합니다.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성합니다.
        consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정합니다.

        try {
            while (true) { //무한 루프 시작입니다. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 하게 됩니다.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정합니다.해당 시간만큼 블럭합니다.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리합니다.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료합니다.
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작합니다.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의합니다.
        props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의합니다.
        props.put("enable.auto.commit", "false"); //자동 커밋을 사용하지 않습니다.
        props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화 합니다. 가장 최근부터 메시지를 가져오게 됩니다.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정합니다.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성합니다.
        consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정합니다.

        try {
            while (true) { //무한 루프 시작입니다. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 하게 됩니다.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정합니다.해당 시간만큼 블럭합니다.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리합니다.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); //현재 배치를 통해 읽은 모든 메시지들을 처리한 후, 추가 메시지를 폴링하기 전 현재의 오프셋을 커밋합니다.
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료합니다.
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerAsync {
    public static void main(String[] args) {
        Properties props = new Properties(); //Properties 오브젝트를 시작합니다.
        props.put("bootstrap.servers", "peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092"); //브로커 리스트를 정의합니다.
        props.put("group.id", "peter-consumer01"); //컨슈머 그룹 아이디 정의합니다.
        props.put("enable.auto.commit", "false"); //자동 커밋을 사용하지 않습니다.
        props.put("auto.offset.reset", "latest"); //컨슈머 오프셋을 찾지 못하는 경우 latest로 초기화 합니다. 가장 최근부터 메시지를 가져오게 됩니다.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //문자열을 사용했으므로 StringDeserializer 지정합니다.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //Properties 오브젝트를 전달하여 새 컨슈머를 생성합니다.
        consumer.subscribe(Arrays.asList("peter-basic01")); //구독할 토픽을 지정합니다.

        try {
            while (true) { //무한 루프 시작입니다. 메시지를 가져오기 위해 카프카에 지속적으로 poll()을 하게 됩니다.
                ConsumerRecords<String, String> records = consumer.poll(1000); //컨슈머는 폴링하는 것을 계속 유지하며, 타임 아웃 주기를 설정합니다.해당 시간만큼 블럭합니다.
                for (ConsumerRecord<String, String> record : records) { //poll()은 레코드 전체를 리턴하고, 하나의 메시지만 가져오는 것이 아니므로, 반복문 처리합니다.
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync(); //현재 배치를 통해 읽은 모든 메시지들을 처리한 후, 추가 메시지를 폴링하기 전 현재의 오프셋을 비동기 커밋합니다.
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close(); //컨슈머를 종료합니다.
        }
    }
}

컨슈머 그룹의 이해

kmg28801 commented 1 year ago

비동기 커밋 vs 동기 커밋