yeoseon / tip-archive

트러블 슈팅 및 팁을 모아두는 레포 (Today I Learned)
29 stars 2 forks source link

[Java] Java Concurrency Evolution (동시성 처리의 여러가지 방식) #267

Closed yeoseon closed 3 years ago

yeoseon commented 3 years ago

동시성 처리에 대해 궁금했었는데, 마침 좋은 아티클이 올라왔다.

http://homoefficio.github.io/2020/12/11/Java-Concurrency-Evolution/

Java Concurrency Evolution (동시성 처리의 여러가지 방식)

자바 스레드

태스트

task는 동시에 호출되어 실행된다.

반면에 모든 요청이 동시에 실행되는 이상적인 Full Concurrency 방식에서는 Max(서비스A 처리 시간, 서비스B 처리 시간) + 저장 시간 만큼, 그러니까 1,300ms가 필요하다.

동시성 미사용

네이티브 멀티 스레딩

멀티 스레딩을 위한 난관

public void shouldExecuteIterationsConcurrently() throws InterruptedException {

    List<Thread> threads = new ArrayList<>();
    for (int user = 1; user <= USERS; user++) {
        Thread thread = new Thread(new UserFlow(user));
        thread.start();
        threads.add(thread);
    }
    // 종료 조건 - 가장 효율적인 방법은 아니지만 의도대로 동작한다.
    for (Thread thread : threads) {
        thread.join();
    }
}

static class UserFlow implements Runnable {

    private final int user;
    private final List<String> serviceResult = new ArrayList<>();

    UserFlow(int user) {
        this.user = user;
    }

    @SneakyThrows
    @Override
    public void run() {
        Thread threadA = new Thread(new Service(this, "A", SERVICE_A_LATENCY, user));
        Thread threadB = new Thread(new Service(this, "B", SERVICE_B_LATENCY, user));
        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();

        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= PERSISTENCE_FORK_FACTOR; i++) {
            Thread thread = new Thread(new Persistence(i, serviceResult.get(0), serviceResult.get(1)));
            thread.start();
            threads.add(thread);
        }

        // 종료 조건 - 가장 효율적인 방법은 아니지만 의도대로 동작한다.
        for (Thread thread : threads) {
            thread.join();
        }
    }

    public synchronized void addToResult(String result) {
        serviceResult.add(result);
    }
}
// Service와 Persistence 구현 코드는 생략

Java 5에 도입된 ExecutorService

public void shouldExecuteIterationsConcurrently() throws InterruptedException {

    for (int user = 1; user <= USERS; user++) {
        executor.execute(new UserFlow(user));
    }

    // 종료 조건
    latch.await();
    executor.shutdown();
    executor.awaitTermination(60, TimeUnit.SECONDS);
}

static class UserFlow implements Runnable {

    private final int user;

    UserFlow(int user) {
        this.user = user;
    }

    @SneakyThrows
    @Override
    public void run() {
        Future<String> serviceA = executor.submit(new Service("A", SERVICE_A_LATENCY, user));
        Future<String> serviceB = executor.submit(new Service("B", SERVICE_B_LATENCY, user));

        for (int i = 1; i <= PERSISTENCE_FORK_FACTOR; i++) {
            executor.execute(new Persistence(i, serviceA.get(), serviceB.get()));
        }

        latch.countDown();
    }
}
// Service와 Persistence 구현 코드는 생략

Fork/Join 프레임 워크

public static class UserFlowRecursiveAction extends RecursiveAction {

private final List<Integer> workload;

public UserFlowRecursiveAction(List<Integer> workload) {
    this.workload = workload;
}

@Override
protected void compute() {

    if (workload.size() > 1) {
        commonPool.submit(new UserFlowRecursiveAction(workload.subList(1, workload.size())));
    }

    int user = workload.get(0);

    ForkJoinTask<String> taskA = commonPool.submit(() -> service("A", SERVICE_A_LATENCY, user));
    ForkJoinTask<String> taskB = commonPool.submit(() -> service("B", SERVICE_B_LATENCY, user));

    IntStream.rangeClosed(1, PERSISTENCE_FORK_FACTOR)
            .forEach(i -> commonPool.submit(() -> persistence(i, taskA.join(), taskB.join())));
}

}



## CompletableFuture  

* Java 8에서 도입되어 Fork/Join 프레임워크를 기반으로 만들어져 제공됨 
* 더 개선된 함수형 프로그래밍 스타일 도입
* 로직을 조립하고 결과를 모아서 처리하고 비동기 연산과정을 실행하고 에러를 처리할 수 있는 50여개의 메소드 추가

## Reactive  

* 비동기 데이터 스트림을 처리하고 에러 처리와 배압을 확고하게 지원하는 리액티브 프로그래밍만을 다룬다.  
* 데이터를 발생시키는 Observable, 데이터를 소비하는 Observer, 스레드를 관리하는 Scheduler의 삼위 일체  
* 리액티브 방식을 도입하면 프로그램 흐름 전부가 리액티브 방식으로 같이 바뀌어야 한다는 점에서 전염성이 강하다. 일부에 블로킹 코드가 남아 있으면 리액티브의 장점은 전혀 발휘되지 못한다.  
* 리액티브 구현체도 여러가지가 있다. 처음에는 RxJava가 있었지만 최근에는 스프링의 Reactor가 대세다. 액터 모델을 구현하는 Akka 프레임워크는 RxJava나 Reactor보다 더 급진적인 리액티브 프로그래밍을 적용하고 있다.  

## Project Loom  
* 정식 출시 되지는 않음
* 가상 스레드의 부활  
* JVM 스레드 : OS 스레드 = 1:1 드잇ㄱ이 더이상 성립되지 않음 
* 기존의 JVM 스레드에 비해 훨씬 가볍고 저렴하다.  
* 메타데이터, 스택메모리, 컨택스트 스위치 시간이 네이티브 OS 스레드의 수분의 일밖에 되지 않는다. 
* 아직 지원도구는 충분하지 않다. 
* JVM 차원에서의 개선이기 때문에, 많은 레거시들이 별다른 수정 없이도 성능 개선 효과를 그대로 누릴 수 있다.