Open hubtwork opened 2 months ago
Producer
관련 옵션에 대해서는 주로 잘 신경을 안쓰고 보통 Consumer
측면에서 유량 제어를 위한 것에 대한 고려만 많이 했었다.
그런데, 좀 보다 보니 Producer 관련 Ack 설정이나, TIME 설정을 통해 카프카의 fallback / circuit 구성을 하면 좀 더 가용성을 높일 수 있을 것 같도라.... 이거 옵션 차주에 CHEATSHEET 만들어서 뿌릴 예정 ( SPRING DATA KAFKA )
애플리케이션이 카프카 메시지를 써야 하는 상황
사용 사례에서의 요구 조건
bootstrap.servers
host:port
목록key.serializer
org.apache.kafka.common.serialization.Serializer
인터페이스를 구현하는 클래스의 이름 지정ByteArraySerializer
, StringSerializer
, IntegerSerializer
, VoidSerializer (키값없이 밸류값만)
등 자주 사용되는 타입 포함value.serializer
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer<String,String> producer = new KafkaProducer<String, String>(kakaProps);
파이어 앤 포겟 (Fire and Forget)
동기적 전송 (Synchronous Send)
비동기적 전송 (Asynchronous Send)
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France"); // step 1
try {
producer.send(record); // step 2
} catch (Exception e){
e.printStackTrace(); // step 3
}
동기적 메시지 전송에서 주요한 균형점은 성능 (performance)이며
카프카 클러스터에 얼마나 작업이 몰리느냐에 따라 브로커는 쓰기 요청에 응답하기까지 최소 2ms에서 최대 몇 초까지 지연될 수 있음
코드 예제에서는 매우 흔한 반면 성능 문제로 실제 애플리케이션에서는 잘 사용되지 않음
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France");
try {
producer.send(record).get(); // step 1
} catch (Exception e){
e.printStackTrace(); step 2
}
메시지를 비동기적으로 전송하고도 여전히 에러를 처리하는 경우를 위해 콜백 지정
private class DemoProducerCallback implements Callback { // stpe 1
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace(); // step 2
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Biomedical Materials", "USA"); // step 3
producer.send(record, new DemoProducerCallback()); // step 4
org.apache.kafka.clients.producer.Callback
인터페이스 구현 클래스 필요 (onCompletion() 단 하나의 메서드)프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자로 임의의 문자열 사용 ex)
프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정
acks=0
acks=1
acks=all (3.0 버전부터 기본값)
종단 지연(end to end latency)의 경우 세 값 모두 동일
카프카 2.1에서 도입된 최신 버전의 구현을 기준으로 ProducerRecord를 보낼 때 걸리는 시간을 두 구간으로 나누어 따로 처리
프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록되며 설정된 값만큼 시간이 흐르면 예외 발생
레코드 전송 준비가 완료된 시점(send()가 문제없이 리턴되고 레코드가 배치에 저장된 시점)에서부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간
프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지 결정하는 값으로 각각의 쓰기 요청 후 전송을 포기하기까지 대기하는 시간 (재시도 시간이나 실제 전송 이전에 소요되는 시간 미포함)
retries 매개변수는 프로듀서가 메시지 전송을 포기하고 에러를 발생시킬 때까지 메시지를 재전송하는 횟수를 결정하는 값이며 retry.backoff.ms (기본값 100ms) 매개변수는 각각의 재시도 사이에 대기하는 시간을 결정
우리는 현재 버전의 카프카에서 이 값들을 조정하는 것을 권장하지 않는다. 대신, 크래시 난 브로커가 정상으로 돌아오기까지(즉, 모든 파티션에 대해 새 리더가 선출되는 데 얼마나 시간이 걸리는지)의 시간을 테스트한 뒤 delivery.timeout.ms 매개변수를 잡아 주는 것을 권장한다. 재전송을 시도하는 전체 시간이 카프카 클러스터가 크래시로부터 복구되기까지의 시간보다 더 길게 잡히도록 잡아 주는 것이다.
현재 배치를 전송하기 전까지 대기하는 시간을 결정하는 값
프로듀서가 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기(메모리의 양)를 결정하는 값 대부분의 프로듀서 예외와는 달리 이 타임아웃은 send() 메서드에서 발생하며 send() 메서드가 리턴하는 Future 객체에서 발생하지 않음
기본적으로 메시지는 압축되지 않은 상태로 전송되지만 압축이 필요한 경우 압축 알고리즘을 결정하는 값
배치에 사용될 메모리의 양을 결정하는 값 ('개수'가 아니라 '바이트' 단위)
프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수를 결정하는 값
실험에 따르면 단일 데이터센터에 카프카를 설정할 경우 이 값이 2일 때 처리량이 최대를 기록하지만, 기본값인 5를 사용할 때도 비슷한 성능을 보여준다는 점이 알려져있다.
메시지의 순서를 보장과 성능상의 고려 때문에 in-flight 요청이 최소 2 이상은 되어야 한다는 점 그리고 신뢰성을 보장하기 위해서 재시도 횟수 또한 높아야 한다는 점을 감안하면,
가장 합당한 선택은 enable.idempotence=true
로 설정하는 것
프로듀서가 전송하는 쓰기 요청의 크기를 결정하는 값으로 기본값은 1MB
데이터를 읽거나 쓸 때 소켓(socket)이 사용하는 TCP 송수신 버퍼의 크기를 결정하는 값으로 각각의 값이 -1일 경우에는 운영체제의 기본값 사용 프로듀서 서로 다른 데이터센터에 위치한 브로커와 통신할 경우 이 값을 올려잡아 주는 것을 권장
0.11부터 지원하기 시작한 '정확히 한 번' 의미구조(exactly once' semantics)
멱등성 프로듀서 기능
카프카로 전송해야 하는 객체가 단순한 문자열이나 정숫값이 아닐 경우의 두 가지 선택
Avro
, Thrift
, Protobuf
라이브러리 사용언어 중립(language-neutral)적인 데이터 직렬화 형식으로 더 범용적인 데이터 파일 공유 방식을 제공하는 것을 목표로 더그 커팅(Doug Cutting)에 의해 시작
데이터를 읽는 쪽 애플리케이션을 전부 변경하지 않고 스키마를 변경하더라도 어떠한 예외나 에러가 발생하지 않으며, 기존 데이터를 새 스키마에 맞춰 업데이트하는 작업 불필요
에이브로는 레코드를 읽을 때 스키마 전체를 필요로 하기 때문에 어딘가 저장된 스키마 필요
ProducerRecord 객체
카프카 메시지
키-밸류 순서쌍(key-value pair)
키의 역할
같은 키값을 가진 모든 메시지는 같은 파티션에 저장
키값이 null일 경우 레코드는 현재 사용 가능한 토픽의 파티션 중 하나에 랜덤하게 저장
파티션별로 저장되는 메시지 개수의 균형을 위해 라운드 로빈 알고리즘 사용
카프카 2.4 프로듀서부터 기본 파티셔너는 키값이 null인 경우 접착성(sticky) 처리를 하기 위해 라운드 로빈 알고리즘 사용
키값이 지정된 상황에서 기본 파티셔너를 사용할 경우 카프카는 키값을 해시(hash)한 결과를 기준으로 메시지를 저장할 파티션 특정 (파티셔너는 자체적인 해싱 알고리즘을 사용하기 때문에 자바 버전과 관계없이 변하지 않음)
카프카 클라이언트는 RoundRobinPartitioner
와 UniformStickyPartitioner
포함
특정한 키값에 대응되는 파티션은 파티션 수가 변하지 않는 한 변하지 않지만 토픽에 새로운 파티션을 추가하는 순간 유효하지 않음
레코드 헤더는 레코드의 키/밸류값을 건드리지 않고 추가 메타데이터를 심을 때 사용
ProducerInterceptor
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
void onAcknowledgement(RecordMetadata metadata, Exception exception)
클라이언트가 할당량을 다 채웠을 경우, 브로커는 클라이언트의 요청에 대한 스로틀링을 시작하여 할당량을 초과하지 않도록 함
스로틀 되는 와중에 클라이언트가 추가 요청을 쏟아낼 경우 브로커는 해당 클라이언트와의 커뮤니케이션 채널을 일시적으로 무시함으로써 정해진 할당량을 맞추고 브로커를 보호
JMX 메트릭을 통해 스로틀링 작동 여부 확인
produce-throttle-time-avg
produce-throttle-time-max
fetch-throttle-time-avg
fetch-throttle-time-max
죄송합니다... 1,2장 준비하다가 3장을 읽어야한다는 것을 까먹었습니다... 내일까지 읽고 작성하겠습니다.
Kafka 프로듀서에 대해 정리하면서, 이 도구가 얼마나 강력한지 다시 한번 느낄 수 있었다. 특히 메시지 배치 처리 기능을 통해 네트워크 사용 효율을 높이고, 다양한 설정 옵션을 통해 신뢰성 있는 데이터 전송을 보장할 수 있다는 점이 인상적이었다. Kafka를 사용해 대규모 데이터 처리 시스템을 구축할 때, 이러한 기능들을 잘 이해하고 활용하면 큰 도움이 될 것 같다.
이전 직장에서 Producer 처리에 대해서 옵션을 이것저것 찾아본 기억이 다시금 떠올랐다..
Consumer 처리가 더 중요하다고 생각하지만, Producer 처리에 대한것도 중요했던게 다시 생각났다.
AckMode 이 설정이 중요했는데 해당 부분에 대한 설정 정리는 아래에 적어본다.
하나더 주의 할점 Spring Data Kafka를 사용할텐데 해당 옵션에 대해 따라서 AckMode 설정이 필요하다.
Kafka 설정 중 enable.auto.commit 구성에 따라 오프셋을 자동으로 커밋할지 여부를 결정 enable.auto.commit 가 false 일때 AckMode 설정
예제를 통해 간단하게 Producer를 구현하고, Avro 이용 방법과 커스텀 파티셔너, 커스텀 인터셉터를 구현해볼 수 있어서 재밌었다. 그리고 나는 예제 먼저 작성해보고 3.4 프로듀서 설정하기를 읽었더니 어떤 지점에서 무슨 옵션들을 사용해볼 수 있는지 머리에 그려져서 더 좋았다.
작성한 예제: https://github.com/iamzin/Practice-Kafka/commit/18fb41295da4646b06130599b3183435a3d5ed8c
@Component
class CustomPartitioner : Partitioner {
companion object {
const val ANONYMOUS_PARTITION_LOCATION = -1
const val ANONYMOUS_NAME = "anonymous"
}
override fun configure(configs: MutableMap<String, *>?) {}
override fun close() {}
override fun partition(
topic: String?,
key: Any?,
keyBytes: ByteArray?,
value: Any?,
valueBytes: ByteArray?,
cluster: Cluster?,
): Int {
val partitions = cluster?.partitionsForTopic(topic)
val numPartitions = partitions?.size ?: 1
if (keyBytes == null || key !is String) {
throw InvalidRecordException("All messages must have a key")
}
return if (key == ANONYMOUS_NAME) {
numPartitions + ANONYMOUS_PARTITION_LOCATION
} else {
abs(key.hashCode()) % (numPartitions - 1)
}
}
}
Chapter Ownership @zinokim