song960530 / foryou-family

0 stars 0 forks source link

Spring Scheduler와 Queue로 구현한 매칭 서비스 #116

Closed song960530 closed 2 years ago

song960530 commented 2 years ago

Spring Scheduler와 Queue로 구현한 매칭 서비스

아래와 같은 순서로 진행됩니다.

  1. 매칭 서비스 배경
  2. 3단계 분리된 Queue의 역할
  3. Spring Scheduler Config
  4. 단계별 구현클래스
  5. 재기동 시 미처리건 대응

ps. 매칭 서비스에서 파티 매칭이 이뤄지는 과정 먼저 읽어주시면 이해하시는데 도움이 됩니다.



매칭 서비스 배경



3단계 분리된 Queue의 역할



Spring Scheduler Config

Spring Scheduler Config는 아래와 같이 설정하였습니다.



단계별 구현클래스

3단계로 구성되어 요청을 처리하는 Queue는 아래와 같이 구현하였습니다.

첫번째로 FirstQueue 인터페이스의 다섯 구현클래스 중 하나인 Netflix이며, Netflix외 4개의 구현클래스 모두 동일한 방식으로 구현하였습니다.

@Qualifier("Netflix") // 동일 구현클래스가 5개 이므로 Bean Name 지정
@Component // 전역적으로 사용하기 위하여 컴포넌트화
public class Netflix implements FirstQueue {
    private final Queue<Long> memberQueue = new LinkedList<>();
    private final Queue<Long> ownerQueue = new LinkedList<>();

    // 파티원 Queue offer 메서드
    @Override
    public void offerMember(Long no) {
        memberQueue.offer(no);
    }

    // 파티장 Queue offer 메서드
    @Override
    public void offerOwner(Long no) {
        ownerQueue.offer(no);
    }

    @Override
    public int memberQueueSize() {
        return memberQueue.size();
    }

    @Override
    public int ownerQueueSize() {
        return ownerQueue.size();
    }

    /**
     * 파티원, 파티장 Queue를 pool하는 메서드
     * 다수의 Thread가 Critical Section에 접근할 때 동시성 이슈가 발생할 수 있어
     * synchronized block을 사용하여 동기화 처리
     */
    @Override
    public Optional<Response> pollQueues() {
        synchronized (this) {
            if (!ownerQueue.isEmpty() && !memberQueue.isEmpty())
                return Optional.of(Response.builder()
                        .ownerPk(ownerQueue.poll())
                        .memberPk(memberQueue.poll())
                        .build()
                );
            return Optional.empty();
        }
    }
}


두번째로 SecondQueue 인터페이스의 구현 클래스입니다. 파티원, 파티장 한 쌍으로 처리하기 때문에 1개의 구현클래스만 존재합니다.

@Component
public class MatchQueue implements SecondQueue {

    // 파티원 PK, 파티원 PK 한 쌍으로 저장
    private final Queue<Response> matchedQueue = new LinkedList<>();

    // 한 쌍의 PK값 Queue offer 메서드
    @Override
    public void offerMatched(Response matched) {
        matchedQueue.offer(matched);
    }

    /**
     * 첫번쨰 FirstQueue 구현클래스와 동일한 방식
     * 다수의 Thread가 Critical Section에 접근할 때 동시성 이슈가 발생할 수 있어
     * synchronized block을 사용하여 동기화 처리
     */
    @Override
    public Optional<Response> pollQueue() {
        synchronized (this) {
            if (!matchedQueue.isEmpty())
                return Optional.of(matchedQueue.poll());
            return Optional.empty();
        }
    }
}


마지막 세번째로 ThirdQueue 인터페이스의 구현 클래스입니다. SecondQueue와 동일하여 주석은 생략하겠습니다.

@Component
public class CompleteQueue implements ThirdQueue {
    private final Queue<Response> completedQueue = new LinkedList<>();

    @Override
    public void offerCompleted(Response complete) {
        completedQueue.offer(complete);
    }

    @Override
    public Optional<Response> pollQueue() {
        synchronized (this) {
            if (!completedQueue.isEmpty())
                return Optional.of(completedQueue.poll());
            return Optional.empty();
        }
    }
}



재기동 시 미처리건 대응

서비스 재기동 시 미처리건을 고려하여 아래와 같이 구현하였습니다

    ...

    /**
     * 서비스 재기동 시 미처리건 Queue에 저장
     */
    @PostConstruct
    private void init() {
        // 1단계 미처리건 Queue에 저장
        Arrays.stream(OttType.values()).forEach(ott -> {
            uploadWaitUnprocessData(ott, PartyRole.OWNER);
            uploadWaitUnprocessData(ott, PartyRole.MEMBER);
        });
        uploadStartUnprocessData(); // 2단계 미처리건 Queue에 저장
        uploadCompleteUnprocessData(); // 3단계 미처리건 Queue에 저장
    }

    ...