kwonslog / how-to-use-kafka

0 stars 0 forks source link

spring webflux 기반 메세지 송신 테스트 #3

Open kwonslog opened 1 year ago

kwonslog commented 1 year ago

spring webflux 프로젝트 생성

kafka 사용을 위한 라이브러리 설정

import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;

@Component @RequiredArgsConstructor public class KafkaMessageProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

@Value("${kafka.topic}") private String topic;

public void sendMessage(String message) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
future.addCallback(
  new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
      // 메시지 전송 성공 처리
      System.out.println("Message sent successfully: " + result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
      // 메시지 전송 실패 처리
      System.err.println("Failed to send message: " + ex.getMessage());
    }
  }
);

} }


- 그리고 송신에 필요한 kafka 설정을 추가한다.
```properties
//application.properties
# Kafka 서버 주소
spring.kafka.bootstrap-servers=localhost:9092

# Kafka 토픽 이름
kafka.topic=test-topic

# Kafka Producer 설정
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono;

@RestController @RequiredArgsConstructor public class SendMessageController {

private final KafkaMessageProducer kafkaMessageProducer;

@GetMapping(path = "/sendMessage") public Mono sendMessage() { kafkaMessageProducer.sendMessage("case1 sendMessage Test!");

return Mono.just("send message ok");

} }

- 이렇게 메세지 송신을 위한 코드작성은 완료하였다. 
- 실제 메세지를 받는 kafka 서버를 실행하고, 메세지 수신을 확인해보자. (#2 참고)

docker-compose up

docker exec -it test-kf kafka-console-consumer --topic test-topic --bootstrap-server localhost:9092

- 이제 브라우저에서 http://localhost:8080/sendMessage 호출하여 메세지가 수신 되는지 확인하면 된다.
<img width="850" alt="image" src="https://github.com/kwonslog/how-to-use-kafka/assets/65941166/0721700f-8f0b-41b1-a5ae-4fc5b36f9cc2">

#### 트러블슈팅
- 아래와 같은 오류 메세지가 발생 할 경우 hosts 파일을 수정하여 해결 할 수 있다. 
<img width="641" alt="image" src="https://github.com/kwonslog/how-to-use-kafka/assets/65941166/374612f3-f185-4010-a837-77c654c50040">

- hosts 파일을 열어서 아래 내용을 추가하고 서버를 재가동 후 다시 시도하면 정상적으로 처리 된다.

//C:\Windows\System32\drivers\etc\hosts 127.0.0.1 test-kafka

#### Reactive Kafka 라이브러리를 사용하여 메세지 송신 구현
- 아래 클래스를 추가한다.
```java
package com.example.testmessagesendrecv.case1;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.SenderOptions;

@Configuration
public class ReactiveKafkaConfiguration {

  @Bean
  public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
  }
}
package com.example.testmessagesendrecv.case1;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaMessageReactiveProducer {

  /*
   * Reactive Kafka 라이브러리를 사용하여 메세지 송신 코드를 작성.
   */
  private final ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate;

  @Value("${kafka.topic}")
  private String topic;

  public void sendMessage(String message) {
    reactiveKafkaProducerTemplate
      .send(topic, message)
      .doOnSuccess(sendResult -> {
        // 메시지 송신 성공 처리
        System.out.println("Message sent successfully: " + sendResult.recordMetadata().offset());
      })
      .doOnError(error -> {
        // 메시지 송신 실패 처리
        System.err.println("Failed to send message: " + error.getMessage());
      })
      .then()
      .subscribe();
  }
}
kwonslog commented 1 year ago

KafkaTemplate 과 KafkaProducer