kmg28801 / kafka-study

1 stars 0 forks source link

Chapter 4. 카프카 프로듀서 #4

Closed kmg28801 closed 1 year ago

Genie-young commented 1 year ago

chapter 4. 카프카 프로듀서

4.1 콘솔 프로듀서로 메시지를 보내는 방법

#존재하지 않는 토픽으로 메시지 전송 시 자동 토픽 생성
auto.create.topics.enable = true

#kafka-topics.sh
##토픽 생성하기
# /usr/local/kafka/bin/kafka-topic.sh 토픽 관련 기능이 들어있는 쉘파일
# --zookeeper: 주키퍼 리스트, --topic: 토픽명, --replication-factor: 복제 갯수
#zookeeper 리스트를 server.properties와 동일하게 맞춰주지 않으면 토픽 생성 에러남 *참고1
/usr/local/kafka/bin/kafka-topics.sh \
--zookeeper peter-zk001:2181,peter-zk002:2181,peter-zk003:2181/peter-kafka \
--topic peter-topic --partitions 1 --replication-factor 3 --create

image

##토픽 정보 확인하기
/usr/local/kafka/bin/kafka-topics.sh \
--zookeeper peter-zk001:2181/peter-kafka --topic peter-topic --describe

image

PartitionCount(파티션 개수) : 1개, ReplicationFactor(복제수) : 3개, Topic(토픽명) : peter-topic , Partition(파티션):0번 파티션, Leader : 3번, Replica(복제 중인 broker) : 3,1,2, isr(ISR 위치) : 3,1,2

#kafka-console-producer.sh
##카프카 토픽으로 테스트 메시지 전송하기 
## broker 
# /usr/local/kafka/bin/kafka-console-producer.sh 프로튜서 관련 기능이 들어있는 쉘파일
# --broker-list: 메세지를 받을 브로커 목록, --topic:메세지를 받을 토픽, --compression-codec : 압축 포맷, request-required-acks:회신 수, 뒷장에 더 설명있음. 
/usr/local/kafka/bin/kafka-console-producer.sh \
--broker-list peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic
#명령 실행 후 입력창이 나오면 카프카에 보낼 메세지 작성하고 엔터, 중단하고 싶으면 Ctrl+C

/usr/local/kafka/bin/kafka-console-producer.sh \
--broker-list peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic --compression-codec 'gzip' --request-required-acks 1

image

#kafka-console-consumer.sh
##카프카로 들어온 메세지 확인하기
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092 \
--topic peter-topic --from-beginning

image

4.2 자바와 파이썬을 이용한 프로듀서

카프카는 스칼라 기반, 메인 클라이언트 라이버러리는 자바로 만들어진 어플리케이션. 가장 많은 기능을 제공하는 자바 및 파이썬으로 실습 진행. send()로 보내는 방식은 3가지(메세지를 보내고 미확인, 동기 전송, 비동기 전송)

jdk 설치 및 kafka-clients-2.5.0.jar, slf4j-api-2.0.7.jar 만 로드하여 cmd로 자바 코드 실행함.

/*파일명 : KafkaBookProducer1.java */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaBookProducer1 {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    /* send() 메소드로 ProducerRecord를 보냄. 
     메시지는 버퍼에 저장되고 별도의 스레드를 통해 브로커로 전송함. */
    producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka is a distributed streaming platform"));
    producer.close();
  }
}
javac -cp "외부 라이브러리 경로" KafkaBookProducer1.java
java  -cp "외부 라이브러리 경로"; KafkaBookProducer1 

image image

./*프로듀서에 추가 옵선 적용*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaBookProducer1 {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    props.put("acks", "1");
    props.put("compression.type", "gzip");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka is a distributed streaming platform"));
    producer.close();
  }
}

4.2.1 메시지를 보내고 확인하지 않기

카프카가 살아있는 상태에서 프로듀서에서 에러가 났다면 재전송하기 때문에 대부분 성공적으로 전송되지만 일부 메시지는 손실될 수 있다. 메시지 손실 각능성이 있기 때문에 일반적인 서비스 환경에서는 사용하지 않는다.

/*메시지를 보내고 확인하지 않기 */
/*KafkaBookProducer1.java 코드와 비교해 프로듀서에서 에러가 나는 경우의 예외 처리를 추가함.
 해당 예제 코드에서는 에러가 날 경우 재전송에 대한 코드없이 단순 에러 출력하고 종료함.
 재전송하려면 추가 코드 작성이 필요.*/
Producer<String, String> producer = new KafkaProducer<>(props);
try{
    /* 결과객체를 받아오지 않아 성공적으로 전송했는 지 여부는 알 수 없음. */
    producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka is a distributed streaming platform"));
}catch(Exception){
    /*프로듀서에서 보내는 도중 에러가 날 경우의 에러 출력*/
    exception.printStackTrace();
}finally{
    /* 연결 종료 */
    producer.close();
}

4.2.2 동기 전송

/*동기전송 */
/*프로듀서는 메시지를 send()메소드는 Future 객체를 반환하고, Future 객체의 get()메소드를 통해 RecordMetadata의 정보를 가져와서 성공 여부 확인*/
Producer<String, String> producer = new KafkaProducer<>(props);
try{
    /*결과값을 받아 객체에 저장, get으로 객체를 가지고 올때까지 기다림. */
    RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka is a distributed streaming platform")).get();
    /*메시지가 저장된 파티션 정보 및 위치를 받아서 출력.*/
    System.out.printf("Partition:%d, Offset:%d", metadata.partition(), metadata.offset())
}catch(Exception){
    /*예외 처리 시 재시도 가능한 예외와 재시도 불가능 예외가 발생할 수 있음. 
      재시도 가능한 예외 : 다시 전송하여 해결 ex) 커넥션 에러
      재시도 불가능한 예외 ex) 메시지 최대 크기 초과*/
    exception.printStackTrace();
}finally{
    producer.close();
}

4.2.3 비동기 전송

/* 비동기전송, 파일명 : PeterCallback.java */
/*org.apache.kafka.clients.producer.Callback 구현 클래스 필요
  메세지 보낼 때 콜백을 호출하고, 카프카 브로커에서 응답을 받으면 콜백함.
  응답을 기다리지 않기 때문에 빠른 전송이 가능함. 
  메시지 미전송 시 예외 처리로 에러 기록을 하여 향후 처리 가능함.*/
import org.apache.kafka.clients.producer.Callback

class PeterCallback implements Callback{
    /*오류 리턴 시, onCompletion 예외를 갖게 됨. */
    public void onConmpletion(RecorMetadat metadata, Exception exception){
        if(metadata !=null ){
            System.out.println("Partition : " + metadata.partition() + ", Offset:" +  metadata.offset() + "");
        }else{
            exception.printStackTrace();
        }
    }
}
/*비동기전송, 파일명 : KafkaBookProducer3.java*/

Producer<String, String> producer = new KafkaProducer<>(props);
try{
    /* 프로듀서에 레코드 전송 시, 콜백 오브젝트를 같이 보냄. */
    RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("peter-topic", "Apache Kafka is a distributed streaming platform"),new PeterCallback());
}catch(Exception){
    exception.printStackTrace();
}finally{
    producer.close();
}

파이썬

kafka-python 라이브러리가 이용자 수가 많고, confluent-kafka-python이 성능이 좋음. confluent-kafka-python 사용을 위해서는 librdkafka(아파티 카프카 프로토콜의 C 라이브러리) 설치 필요 책은 kafka-python 사용

from kafka import KafkaProducer

#메세지 전송을 위해 broker list 지정
#broker list를 다 지정하지 않은 경우, 지정한 호스트가 다운되면 오류 발생함.
#다 지정하면 한 개의 호스트가 다운되도 다른 호스트에 메시지 전달.
producer = KafkaProducer(bootstrap_servers='peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092')

#메시지 내용 적고 전송 
producer.send('peter-topic', 'Apache Kafka is a distributed streaming platform')
#추가 옵션 지정
from kafka import KafkaProducer

producer = KafkaProducer(acks=1, compression_type='gzip', bootstrap_servers='peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092')

producer.send('peter-topic', 'Apache Kafka is a distributed streaming platform')

4.3 프로듀서 활용 예제

#파티션이 2개인 토픽 생성하기(peter-topic2)
C:\Kafka\kafka001\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181,localhost:2182,localhost:2183/peter-kafka --replication-factor 1 --partitions 2 --topic peter-topic2
/* loop 사용 + key 지정하기 */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaBookProducerKey {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "peter-kafka001:9092,peter-kafka002:9092,peter-kafka003:9092");
    props.put("acks", "1");
    props.put("compression.type", "gzip");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    String testTopic = "peter-topic2";
    String oddKey = "1"; 
    String evenKey = "2";

    for (int i = 1; i < 11; i++) {
      if (i % 2 == 1) {
        /*키 지정 후 메시지 전송하기 홀수일 때는 1번 파티션, 짝수는 2번 파티션으로 전송*/
        producer.send(new ProducerRecord<String, String>(testTopic, oddKey, String.format("%d - Apache Kafka is a distributed streaming platform - key=" + oddKey, i)));
      } else {
        producer.send(new ProducerRecord<String, String>(testTopic, evenKey, String.format("%d - Apache Kafka is a distributed streaming platform - key=" + evenKey, i)));
      }
    }
    producer.close();
  }
}

image image

0번 파티션만 가져오기

topic을 생성하지 않고 메시지 전송 시, 카프카 옵션에 따라 토픽이 자동 생성될 수 있다. 이럴 경우 파티션은 1인데, 토픽 삭제 후 재생성하거나, alter 명령어를 이용해 수정한다. *참고 2

C:\Kafka\kafka001\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic peter-topic2 --from-beginning --partition 0
C:\Kafka\kafka001\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic peter-topic2 --from-beginning --partition 1

image

4.4 프로듀서의 주요 옵션

4.5 다양한 메시지 전송 방법

4.5.1 메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우

4.5.2 메시지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우

4.5.3 전송 속도는 느리지만 메시지 손실이 없어야 하는 경우

참고

  1. 카프카 토픽 생성 에러(ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0)

    해결 : zookeeper 리스트를 server.properties와 동일하게 맞춰줌, 지노드명까지

    출처 : https://twofootdog.tistory.com/94

seojeonghyeon commented 1 year ago

https://github.com/seojeonghyeon/bankservice-userservice (com.zayden.bankserviceuserservice.kafka 내 producer) https://docs.spring.io/spring-kafka/reference/html/#kafka-template (Spring Boot 2.9 -> 3.0 주요 변경사항 및 사용방법)

kmg28801 commented 1 year ago

Spring 기반 애플리케이션에서 도커 host 모르는 경우 해결방법