daangn-daangn / daangn-server

🥕당근 서버 리포지토리🥕
4 stars 2 forks source link

Kafka Producer & Consumer 비즈니스 로직 외 설정 부분 #90

Closed cotchan closed 2 years ago

cotchan commented 2 years ago

작업

목차

  1. Producer 추가 설정
  2. Producer 예외 핸들링 적용
  3. Consumer 예외 핸들링 적용

Producer 추가 설정

retries

/**
 * Spring configuration holding the Kafka producer settings.
 *
 * <p>Only the property-map assembly is shown here; the remaining members
 * (including the {@code bootstrapServers} and {@code retries} values read
 * below) live in the elided part of the class.
 */
@Configuration
public class KafkaProducerConfig {
    //...

    /**
     * Builds the property map handed to the Kafka producer.
     *
     * @return a mutable map containing the bootstrap servers, the key and
     *         value serializer classes, and the retry count
     */
    private Map<String, Object> producerConfigs() {
        final Map<String, Object> producerProps = new HashMap<>();

        // Connection target.
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Keys are written as plain strings, values as JSON.
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Retry count for sends, taken from the retries value of this class.
        producerProps.put(ProducerConfig.RETRIES_CONFIG, retries);

        return producerProps;
    }
}
spring:
  kafka:
    producer:
      retries: 10

Producer 예외 핸들링 적용

@Slf4j
public class BuyerReviewCreatedEventListener implements AutoCloseable {

    // Name of the destination topic, injected from the
    // spring.kafka.topic.buyer-review-created property.
    @Value("${spring.kafka.topic.buyer-review-created}")
    private String buyerReviewCreatedTopic;

    // Event bus handle; its use is in code elided from this snippet —
    // presumably where this listener registers itself (TODO confirm).
    private final EventBus eventBus;

    // Template used by handleBuyerReviewCreatedEvent to publish
    // BuyerReviewCreatedMessage values under String keys.
    private final KafkaTemplate<String, BuyerReviewCreatedMessage> kafkaTemplate;

    //...

    /**
     * Publishes a {@link BuyerReviewCreatedMessage} built from the given event
     * to the buyer-review-created topic.
     *
     * <p>The send is asynchronous. A failed send is only logged at WARN level
     * together with the seller and reviewer ids; a successful send requires no
     * follow-up.
     *
     * @param event the buyer-review-created event delivered to this subscriber
     */
    @Subscribe
    public void handleBuyerReviewCreatedEvent(BuyerReviewCreatedEvent event) {
        // Captured up front so the failure callback can report them.
        final Long sellerId = event.getSellerId();
        final Long reviewerId = event.getReviewerId();

        kafkaTemplate
            .send(buyerReviewCreatedTopic, BuyerReviewCreatedMessage.from(event))
            .addCallback(
                sendResult -> {
                    // Nothing to do once the record has been sent.
                },
                failure -> log.warn("handleBuyerReviewCreatedEvent exception occurred with sellerId: {}, reviewerId: {}: {}",
                        sellerId, reviewerId, failure.getMessage(), failure));
    }

Consumer 예외 핸들링 적용

/**
 * Spring configuration for the Kafka listener container factories.
 *
 * <p>Each factory shown below is given the same {@link DefaultErrorHandler}
 * bean as its common error handler. Factory construction itself is elided
 * ({@code //...}) in this snippet.
 */
@Slf4j
@Configuration
public class KafkaListenerConfig {

    //...

    /** Container factory for {@code SoldOutMessage} listeners. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SoldOutMessage> kafkaListenerContainerSoldOutFactory() {
        //...
        factory.setCommonErrorHandler(defaultErrorHandler());
        return factory;
    }

    //...

    /** Container factory for {@code PriceDownMessage} listeners. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PriceDownMessage> kafkaListenerContainerPriceDownFactory() {
        //...
        factory.setCommonErrorHandler(defaultErrorHandler());
        return factory;
    }

    //...

    /** Container factory for {@code SoldOutToBuyerMessage} listeners. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SoldOutToBuyerMessage> kafkaListenerContainerSoldOutToBuyerFactory() {
        //...
        factory.setCommonErrorHandler(defaultErrorHandler());
        return factory;
    }

    //...

    /** Container factory for {@code BuyerReviewCreatedMessage} listeners. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BuyerReviewCreatedMessage> kafkaListenerContainerBuyerReviewCreatedFactory() {
        //...
        factory.setCommonErrorHandler(defaultErrorHandler());
        return factory;
    }

    //...

    /**
     * Error handler shared by the listener container factories above.
     *
     * <p>It is built from a recoverer callback that logs the consumer record
     * and the exception at WARN level, plus a {@link FixedBackOff} created
     * with the arguments {@code (1000L, 3L)} (interval in milliseconds and
     * maximum attempts, per Spring's {@code FixedBackOff} constructor).
     *
     * @return the error handler bean
     */
    @Bean
    public DefaultErrorHandler defaultErrorHandler() {
        final FixedBackOff retryPolicy = new FixedBackOff(1000L, 3L);

        return new DefaultErrorHandler(
            (failedRecord, cause) -> {
                log.warn("defaultErrorHandler invoked with consumerRecord: {}, message: {}",
                    failedRecord, cause.getMessage(), cause);
            },
            retryPolicy);
    }
}