manjeong-dev / kafka

0 stars 0 forks source link

카프카 프로듀서 애플리케이션 개발 #5

Open manjeong-dev opened 2 years ago

manjeong-dev commented 2 years ago

아파치 카프카의 프로듀서는 자바를 공식 언어로 제공

image

producerRecord 에는 offset은 없다!

토픽과 메시지값만 있어도 record는 보낼 수 있다 record를 send()로 보낼 수 있게 됨

partitioner

accumulator

sender

UniformStickyPartitioner

프로듀서 주요 옵션(필수 옵션) - default 값이 없음

프로듀서 주요 옵션(필수 옵션)

ISR(In-Sync-Replicas)

프로듀서에서 오프셋이 생성되는게 아니라, 리더파티션에 저장될 때 오프셋이 지정됨 (ProducerRecord) .flush() : accumulator에 잇는 애들 브로커로 전송

커스텀 파티셔너를 가지는 프로듀서

producer.close() 를 통해

manjeong-dev commented 2 years ago

ISR은 replication factor=1 일 때도 존재 프로듀서에서 보낸 데이터 전송 속도가 너무 늦으면 accumulator도 확인해봐서 배치가 오래 걸리지 않는지 확인

manjeong-dev commented 2 years ago

Replication Factor가 2이고, min.insync.replicas를 2로 설정하게 되었을때, 브로커 노드 한대만 다운되더라도 프로듀서가 메시지를 보낼 수 없게 됩니다. 그래서 저는 RF를 2로 운영 할 경우 min.insync.replicas를 2로 설정하기 보다는 1로 설정하는 것이 최적의 값이라고 생각합니다. 에러 발생

[2017-01-03 19:54:44,004] ERROR [Replica Manager on Broker 4]: Error processing append operation on partition peter-topic-rf2-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [peter-topic-rf2,0] is [1], below required minimum [2]

acks=all, Replication Factor=3과 min.insync.replicas=2

https://www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-producer-acks/

manjeong-dev commented 2 years ago

bootstrap.servers=broker1:9092,broker2:9092,broker3:9092로 설정되어 있다면 클라이언트는 broker1:9092 부터 메타데이터를 요청하고 해당 요청이 실패하면 broker2:9092로 요청합니다.

manjeong-dev commented 2 years ago

Q1. 다음중 프로듀스 옵션 설명으로 올바르지 않은 것은? boorstrap.server: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트:포트를 1개 이상 작성한다. key.serializer: 레코드의 메세지 키를 직렬화하는 클래스를 지정한다. value.serializer: 레코드의 메세지 값을 직렬화하는 클래스를 지정한다. asks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용하는 옵션. 기본 값은 0 enable.idempotence: 멱등성 프로듀서로 동작할지 여부를 설정한다. 기본값은 false => 4번 기본 값:1 Q2. producer에서 record의 offset이 지정된다 (O/X) => X Q3. ISR을 위한 설정 옵션으로 acks와 min.insync.replicas는 topic단위 옵션이다. (O/X) => O Q4. 메시지 키를 설정하여 데이터를 전송 하던 중, 파티션의 수를 증가시켰다. 이때 동일한 메시지 키는 항상 동일한 파티션에 저장된다. (O/X) => X Q5. min.insync.replicas는 팔로워 파티션의 동기화 개수를 뜻한다 (O/X) => X (리더 + 팔로워 파티션) Q6. acks=0 설정 후 producer.send(record).get() 실행 시 응답에 관하여 옳은 것을 모두 고르시오. 토픽을 알 수 있다. 파티션을 알 수 있다. 오프셋을 알 수 있다. 어떤 정보도 알 수 없다. => 3은 -1로 와서 알 수 없음, 답 : 1,2 Q7. producer.send()를 하는 경우 Sender를 거쳐서 Accumulator에서 레코드를 모아서 발송한다. (O/X) => X (sender -> partitioner -> accumulator) Q8. 프로듀서에서 linger.ms 옵션을 0 이상으로 하는 경우 언제나 이 시간만큼 대기했다가 발송한다. (O/X) => X (accumulator에 모인 배치 byte 크기보다 크면 바로 send) (배치 size 한계치 or linger.ms 중 하나만 만족하면 send) Q9. acks = -1(all), min.insync.replicas=1의 동작은 acks=0과 동일하다 (O/X) => X (acks=1과 동일) Q10. 메시지 키가 없는 경우 RoundRobinPartitioner 는 UniformStickyPartitioner 에 비해 어큐뮤레이터에서 레코드들이 배치로 묶일 때까지 기다렸다가 전송하기 때분에 전송 성능이 더 높아진다. ( O / X ) => X (UniformStickyPartitioner 얘기) Q11. 다음 중 올바르지 않은것은? 데이터 처리량이 많을때 max.in.flight.requests.per.connection 값을 늘린다. (producer 설정) tranactional.id 값을 설정하면 enable.idempotence 은 자동으로 true로 바뀐다. replication-factor = 3 일때 acks = -1 이더라도 min.insync.replicas = 2 이면 리더 파티션과 팔로워 파티션 1개까지만 확인한다. acks = 0 으로 설정한 후 producing RecordMetadata 를 조회했을때 알 수 있는 값은 전송된 토픽, 파티션값 이고, offset은 알 수 없다. (offset = -1로 오기 때문) key, value의 데이터 타입을 어떤것으로 사용하느냐는 디스크 사용량, 네트워크 사용량에 영향을 미치지 않는다. => 5 (ex. String serialize 가 int serialize보다 많이 씀) Q12. 다음 중 옵션과 설정위치가 잘못 된 것은? replication-factor = 토픽 옵션 min.insync.replicas = 토픽 옵션 enable.idempotence = 토픽 옵션 acks = 프로듀서 옵션 max.in.flight.requests.per.connection = 프로듀서 옵션 => 3 (프로듀서 옵션) Q13. 메시지 전송 방법 중 전송 속도는 느리지만 메시지 손실이 없어야 하는 경우 acks=all, 브로커의 min.insync.replicas 옵션을 2 이상으로 설정한다. (O/X) => X (replication-factor=3 일 경우 O)

manjeong-dev commented 2 years ago

linger.ms || batch.size 조건으로 send 하는 부분코드 https://github.com/apache/kafka/blob/2b2039f0ba88e210dd09031291c050cfcda7ce4a/clie[…]/apache/kafka/clients/producer/internals/RecordAccumulator.java v2.5