farmeter / random

0 stars 0 forks source link

MSA > 스프링 클라우드 스트림, 이벤트 기반 아키텍처 #14

Open farmeter opened 4 years ago

farmeter commented 4 years ago

8장 목표 : 비동기 메시지를 사용하는 Micro Service를 구현해보자

Event Driven Arch, Message Driven Arch

캐시를 이용한 비동기 예제

데이터베이스 엑세스 비용을 줄이고, 응답 시간을 향상시켜보도록 레디스를 써보자.

동기식 요청

전통적인 동기식 요청-응답 프로그래밍

스크린샷 2019-11-27 오후 10 22 56
  1. A서비스는 B서비스의 데이터를 조회할 수 있는지 레디스 클러스터에 확인
  2. 레디스에 없다면, REST 기반의 엔드포인트를 사용해 B서비스를 호출
  3. 받은 데이터를 레디스에 저장
  4. 데이터 무효시 B서비스-> A서비스의 캐시삭제 엔드포인트를 호출하거나, 레디스로 바로 캐시evict

문제점

=> 전형적인 거미줄 모양의 의존성 패턴 = 주요 장애지점

메시징을 사용한 변화 전달 방식

4가지 장점이 생긴다.

  1. 느슨한 결합

    • 서로 두 서비스가 알지 못하므로 결합되지 않는다
    • 서비스가 소유한 데이터를 직접 관리하는 엔드포인트만 노출함으로써 의존성을 최소화 한다
    • 서로 분산되어 상호작용 하며 서로가 관리하는 데이터에만 관심을 가진다
  2. 내구성

    • B서비스가 다운되어도 큐가 존재하므로 메시지 전달을 보장한다
    • 큐에 저장되고 다운된 서비스가 재개 할때까지 유지된다
    • 큐 + 캐시가 조합되면 지난 데이터라도 보존되어 유용할 수 있다
  3. 확장성

    • 큐에 저장되므로 메세지 발신자가 소비자의 응답을 기다릴 필요가 없다.
    • 소비자가 메세지를 빠르게 처리하지 못하면, 소비자를 수평확장하여 큐를 처리할 수 있다.
    • 스케일 업 대신 스케일 아웃을 통해 제약을 최소화 할수 있다.
  4. 유연성

    • 발신자는 누가 메세지를 소비할지 모른다. 새로운 소비자, 새로운 기능을 쉽게 추가할 수 있다.

메세지 아키텍처의 단점

  1. Message handling semantics 메시지 처리의 의미론

    • 메세지를 발행하고 소비하는 방법을 많이 고민해야 한다.
    • App이 메시지의 소비 순서를 기반으로 어떻게 동작할지, 순서대로 처리되지 않으면 어떻게 될지 고민해야 한다.
    • 메시지를 순서대로 처리해야하는 요구사항이 있다면, 모든 메세지를 독립적으로 소비하는 방식과 다르게 처리하고 설정하도록 구성해야 한다.
    • 메시지가 실패하면 재시도 할 것인가? 아니면 실패하게 놔둘 것인가? 특정 메세지 중 하나만 실패하면 나머지는 어떻게 처리할 것인가?
  2. Message visibility 메세지 가시성

    • 메세지 사용은 얼마든지 비동기+동기식으로 합쳐질 수 있다.
    • 메세지의 수신/처리 시 호출하고 경유하는 트랜잭션을 추적하기 위해서는 별도의 상관관계 ID correlation ID 를 사용해야한다.
  3. Message choreography 메시지 코레오그래피

    • 코드가 단순한 요청-응답의 선형적인 처리방식이 아니다. 비즈니스 로직을 추론하기 어렵다.
    • 여러 트랜잭션의 순서가 바뀌고 다른 시점에 실행될 수 있다.

스프링 클라우드 스트림

image

스프링 클라우드 스트림 아키텍처

4가지 컴포넌트

  1. 소스 Source
    • 발행될 메세지를 표현하는 POJO. Json 직렬화되어 메시지를 채널로 발행
  2. 채널 Channel
    • 큐를 추상화 한 것. 생산자/소비자가 발행/소비할 메시지를 보관
  3. 바인더 Binder
    • 메시지 플랫폼과 통신. 특정 API.
  4. 싱크 Sink
    • 메세지를 받아 처리. 채널을 수신대기 하다 메세지를 받아 POJO로 역직렬화.
스크린샷 2019-12-01 오후 7 39 36

생산자/소비자 코드를 작성해보자

생산자

라이브러리를 추가하자

dependencies {    
    compile 'org.springframework.cloud:spring-cloud-stream'
    compile 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}

App이 스프링 클라우드 스트림의 메시지 브로커와 바인딩하도록 지정해주자.

...
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

//스프링 클라우드 스트림에 애플리케이션 메시지 브로커로 바인딩 하라고 알린다.
//Source 클래스에 정의된 채널들을 이용해 메시지 브로커와 통신하게 된다.
@EnableBinding(Source.class) 
public class Application {
...

브로커에 메시지를 발행해보자

@Component
public class SimpleSourceBean {
    private Source source;

    //서비스가 사용할 소스 인터페이스 구현을 주입한다.
    //특정 메시지 토픽에 대한 모든 통신은 스프링 클라우드 스트림 구조로 발생한다.
    @Autowired 
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrgChange(String action, String orgId){
        //POJO 메세지를 발행한다.
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                orgId,
                UserContext.getCorrelationId());

        //Source클래스에서 정의된 채널에서 send() 메서드를 사용해 메세지를 발행한다.
        source.output().send(MessageBuilder.withPayload(change).build());
        /**
         Source 인터페이스는 스프링 클라우드에서 정의한 인터페이스로 output()이라는 단일 메서드를 노출한다.
         output 메서드는 MessageChannel 클래스 타입을 반환한다.

         send() 메서드는 스프링 MessageBuilder라는 스프링 헬퍼 클래스를 사용해 변환된 Message를 매개변수로 받는다.
        */          

    }
}

스프링 클라우드 스트림의 Source와 카프카 브로커/토픽을 매핑해보자 application.yml

...
    stream: 
      bindings: //srteam.binding은 스클스의 메시지 브로커에 발행하려는 구성의 시작점
        output: //output은 채널 이름이며, Source.output()채널에 매핑
            destination:  orgChangeTopic    //메시지 큐(or 토픽) 이름
            content-type: application/json  //송수신할 메시지 타입 정보(or xml, avro)
      kafka:    //메세지 버스로 카프카를 사용할 것이라고 스프링에 전달
        binder: 
          zkNodes: localhost //카프카와 주키퍼의 네트워크 위치를 전달
          brokers: localhost

실제로 어디서 메시지가 발행되는지 살펴보자

@Service
public class OrganizationService {
    @Autowired
    private OrganizationRepository orgRepository;

    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization getOrg(String organizationId) {
        return orgRepository.findById(organizationId).get();
    }

    public void saveOrg(Organization org){
        org.setId( UUID.randomUUID().toString());

        orgRepository.save(org);
        simpleSourceBean.publishOrgChange("SAVE", org.getId());
        //생성, 삭제 등 모든 메서드가 publishOrgChange를 사용한다.
    }
    ...

생산자를 정리해보면

데이터가 변경되면 OrganizationService.saveOrg()

  1. 소스(SimpleSourceBean) 에서 메시지가 발행됨
  2. 채널은 카프카 토픽에 매핑되는 스프링 클라우드 스트림의 채널 이름이다.
  3. 바인더는 카프카 서버에 바인딩 되는 클래스와 구성을 말한다. 이후 카프카 토픽으로 전달되어 처리 진행

image

소비자

import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(Sink.class) //메시지를 수신가능하도록 Sink 인터페이스에 정의된 채널을 사용
public class Application {

...
    @StreamListener(Sink.INPUT) //메시지가 수신될 때마다 실행 된다.
    public void loggerSink(OrganizationChangeModel orgChange) {
        //매개변수인 OrganizationChangeModel로 자동 역직렬화 된다.
        logger.debug("Received an event for organization id {}", orgChange.getOrganizationId());
    }

application.yml

    stream:
      bindings:
        input: // input채널을 orgChangeTopic 큐에 매핑한다.
          destination: orgChangeTopic
          content-type: application/json
          group: licensingGroup //소비자 그룹을 정의. 여러 인스턴스가 있을 때 소비자 그룹을 식별해서 하나만 소비하도록 보장(카프카 컨슈머 그룹)
      kafka:
        binder:
          zkNodes: localhost
          brokers: localhost

ncp-ganesha의 이벤트 사용은?

import org.springframework.kafka.annotation.EnableKafka;

...
@EnableKafka
public class KafkaConfig {
...
}

Service -> kafkaEventProducer.send

// 원상품 싱크 이벤트 생성.
kafkaEventProducer.send(NcpProdEventType.PRODUCT_SYNC, ProductBaseMsg.builder()
    .productIds(Lists.newArrayList(productId))
    .build());

KafkaEventProducer

...
public class KafkaEventProducer {
...
    @Autowired
    private KafkaTemplate<Long, Object> kafkaTemplate;

    /**
     * 이벤트에 의한 발행.
     *
     * @param event
     * @param <T>
     */
    public <T> void send(Event<T> event) {
        doSend(event.eventType(), event.key(), event.value(), event.timestamp());
    }
...
    /**
     * {@link Transactional} 메소드 내부에서 호출한 경우 정상적으로 Commit 된 이후에 발행한다.
     *
     * @param event
     * @param <T>
     */
    public <T> void sendAfterCommit(Event<T> event) {
        doSendAfterCommit(event);
    }
...

    /**
     * 실제 발송 처리.
     *
     * @param eventType
     * @param key
     * @param value
     * @param <T>
     */
    private <T> void doSend(EventType eventType, Long key, T value) {
        doSend(eventType, key, value, null);
    }

    private <T> void doSend(EventType eventType, Long key, T value, Long timestamp) {
...
        ProducerRecord<Long, Object> record = new ProducerRecord<>(
            eventType.eventName(),
            null,
            timestamp,
            key,
            value
        );

        // async 발행.
        kafkaTemplate.send(record);
    }
...
        // @KafkaListener는 일반적인 경우 사용을 권장하지않는다.
        // 정말 사용하고 싶을때는 재처리 정도의 작업일 때 사용하자.

요약

메시지/이벤트 기반 아키텍처

스프링 클라우드 스트림

farmeter commented 4 years ago

기본적인 MSA에 대한 고민 => 도메인 분리, 경계를 어떻게 나눌 것인가. => 예외 처리를 어떻게 할 것인가(롤백 할 것인가. 별도 배치로 처리할 것인가) => 메세지 유실, 순서가 변경되면 어떻게 할 것인가. => Lock은 어떻게 처리할 것인가