Closed yeollow closed 3 years ago
KafkaTemplate에 대한 설정관련 내용을 Spring Kafka Docs를 참조하여 아래와 같이 구성
@Bean
public ProducerFactory<Long,CCTV> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Long, CCTV> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
이후 KafkaItemWriter
에서 KafkaItemWriterBuilder
를 통해 kafkaTemplate()
관련 내용을 작성 시 매개변수로 위의 kafkaTemplate()
을 전달하니 아래와 같은 Exception이 발생.
org.springframework.beans.factory.BeanCurrentlyInCreationException
org.springframework.beans.factory.UnsatisfiedDependencyException
BeanCreationException
으로 Spring Bean생성 중 특정 예외가 발생하여 특정 field에 의존 주입이 되어있지 않아 Bean생성이 되지 못하는 경우 던져지는 예외.kafkaTemplate에 대한 properties설정을 application-local.yml에 지정하여 Spring Boot가 application을 구동할 때 자동으로 로딩되게 설정함.
kafka가 이미 local에 다운로드 되어있으며 kafka가 다운로드 되어있는 경로에서 아래 과정을 진행한다고 가정
bin/zookeeper-server-start.sh config/zookeeper.properties
를 통해 zookeeper를 먼저 구동bin/kafka-server-start.sh config/server.properties
를 통해 kafka server 구동
advertised.listeners=PLAINTTEXT://localhost:9092
, zookeeper.connect=localhost:2181
확인.
advertised.listeners
는 producer/consumer를 위해 노출할 주소를 의미함../gradlew :batch:bootRun -Dspring.profiles.active=local
를 통해 spring batch job 실행
bin/kafka-console-consumer.sh --topic CCTV --from-beginning --bootstrap-server localhost:9092
명령어를 통해 broker의 전달 내용을 consumer로 읽어 확인.이외 kafka 명령어
bin/kafka-topics.sh --zookeeper localhost:2181 --list
: topic list 확인bin/kafka-topics.sh --zookeeper localhost:2181 --topic topicName --delete
: topicName이름의 topic을 삭제
delete.topic.enable=true
지정 필요bin/kafka-topics.sh --zookeeper localhost:2181 --topic topicName --describe
: topicName이름의 topic 정보 확인\
partition()
과 replicas()
를 모두 1로 지정했음.bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topicName --partitions 3
와 같이 topicName이름의 topic의 partition을 3개 등으로 바꿀 수 있음. 하지만, Spring Boot에서 Topic생성 시 partition()
갯수를 변경하면 되는 부분.상기 명령어 등은 운영 환경에 따라 달라질 수 있음.
Spring Batch Job
자세한 Spring Batch내용은 Spring Batch Docs
Spring Batch metadata Table은 Meta-Data Schema Appendix 참조
Job이나 Step은 각각 JobBuilderFactory, StepBuilderFactory를 생성자 DI를 하여 build하며 Factory Pattern은 Factory Pattern이란?을 참조하자
Kafka
kafka producer로 kafka broker의 topic에 데이터를 전송하기 위해서는 몇가지 설정이 필요하다. 자세한 내용은 Spring Kafka Docs 참조
KafkaAdmin
에 대한 Bean을 configuration하면 자동으로 broker에 지정한 Topic을 생성해준다.Topicbuilder
로 topic의name()
,partitions()
,replicas()
등을 builder pattern을 통해 build하여NewTopic
Bean을 생성한다.KafkaAdmin.NewTopics
를 정의하여 여러개의NewTopic
을 한번에 정의할 수 있도록 지원한다.bootstrap-servers 및 application-{phase}.yml, Topic의 replicas 및 partitions는 개발 환경에 따라 다름