Open hojun-lee opened 3 years ago
Reactive Stream API의 구성요소는 아래와 같다.
Publisher는 무한한 데이터를 제공한다. 제공된 data는 Subscriber가 구독하는 형식으로 처리된다.
Publisher.subscribe(Subscriber)
의 형식으로 data 제공자와 구독자가 연결을 맺게 된다.onSubscribe -> onNext -> (onError | onComplete)?
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
onNext, onError, onComplete
가 있다.public interface Subscription {
public void request(long n);
public void cancel();
}
리액터는 JVM 위에서 동작하는, 완전한 논블로킹 리액티브 프로그래밍을 위한 기반 라이브러리로, backpressure를 관리한다.
리액터는 자바 8의 함수형 API ( CompletableFuuture
, Stream
, Duration
)을 직접 통합한다.
Flux
, Mono
)를 구현한다.리액터를 사용하면 reactor-netty
프로젝트 프로세스와 논블로킹 방식으로 통신할 수 있다.
Reactor 3는 BOM 모델을 지원한다.
1 .마이크로소프트가 닷넷(.NET) 생태계에 만든 Reactive Extension(Rx) 라이브러리가 반응형 프로그래밍의 출발점이었다. 이후 RxJava는 JVM 위에서 실행하는 리액티브 프로그래밍을 구현했다. 시간이 지남에 따라, 리액티브 스트림 표준화의 일환으로 JVM 위에서 동작하는 리액티브 라이브러리의 인터페이스 셋과 상호작용 규칙을 정의한 자바 표준이 등장했다. 이 인터페이스들은 자바 9의 Flow 클래스
로 통합됐다.
리액티브 스트림을 구현한 모든 라이브러리는 Iterable-Iterator
쌍과 성격이 유사하다. 주요 차이점 중 하나는 이터레이터는 pull
기반, 리액티브 스트림은 push
기반이라는 것이다.
Iterable
책임이다. (명령형 프로그래밍 패턴). 데이터 시퀀스에서 next()
아이템에 접근하는 것은 개발자에 달려 있다.Publisher-Subscriber
쌍이 이를 대신한다. 단, 새로운 데이터가 있음을 Publisher가 Subscriber에게 통지하며, 이런 push
방식이 리액티브의 핵심이다. 또한, push 받은 데이터에 적용할 연산은 명령형이 아닌 선언형으로 표현한다: 프로그래머는 정확한 제어 흐름을 작성하는 대신 계산 논리를 표현한다.리액티브 스트림은 데이터를 push하는 것 외에 에러 처리와 완료 처리도 잘 정의하고 있다. Publisher는 Subscriber에 새 값을 푸쉬할 수 있을 뿐 아니라(onNext
호출함으로써), 에러(onError
호출)나 완료(onComplete
호출) 신호를 보낼 수도 있다. 에러, 완료 신호 모두 시퀀스를 종료한다. 이는 다음과 같이 요약된다:
onNext x 0..N [onError | onComplete]
- 이 접근법은 굉장히 유연하다. 값이 없거나, 하나거나, n개일 때를(연속적인 시간 값 같은 무한 시퀀스도 포함) 모두 커버한다.
프로그램의 성능을 끌어 올리는 방법은 크게 두 가지가 있다.
자바 개발자는 보통 블로킹 코드로 프로그램을 작성한다. 성능에 병목이 생기지만 않는다면 이 방법도 괜찮다. 이때까진 유사한 블로킹 코드를 실행할 스레드를 늘리면 된다. 하지만 이 방식은 리소스를 더 사용하는 쪽으로 확장하기 때문에 경합이나 동시성 이슈
가 발생하기 쉽다.
callback
파라미터를 (람다나 익명 클래스) 추가로 받는 비동기 메소드Future<T>
를 반환하는 비동기 메소드, 비동기 프로세스는 T
값을 계산하고, 이를 래핑한 Future
객체로 접근한다. 이 값은 즉시 사용할 수는 없고, 사용이 가능해질 때까지 객체를 폴링할 수 있다. 예를 들어, ExecutorService
는 Callable<T>
태스크를 실행할 때 Future
객체를 사용한다.하지만 위의 두 가지 모두 다 제약이 있다.
Callback은 조합하기가 까다롭고, 콜백 지옥이 만들어질 수 있다.
userService.getFavorites(userId, new Callback<List<String>>() { // (1)
public void onSuccess(List<String> list) { // (2)
if (list.isEmpty()) { // (3)
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) { // (4)
UiUtils.submitOnUiThread(() -> { // (5)
list.stream()
.limit(5)
.forEach(uiList::show); // (6)
});
}
public void onError(Throwable error) { // (7)
UiUtils.errorPopup(error);
}
});
} else {
list.stream() // (8)
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId, // (9)
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
이 코드는 양이 많아 따라가기가 어렵고, 반복되는 부분도 많다. 이 코드를 리액터로 작성하면 아래와 같다.
userService.getFavorites(userId) // (1)
.flatMap(favoriteService::getDetails) // (2)
.switchIfEmpty(suggestionService.getSuggestions()) // (3)
.take(5) // (4)
.publishOn(UiUtils.uiThreadScheduler()) // (5)
.subscribe(uiList::show, UiUtils::errorPopup); // (6)
만약 즐겨 찾기 ID를 조회하는 시간을 800ms 미만으로 제한하고, 그 이상은 캐시에서 조회하려면 어떻게 해야 할까? 콜백 기반 코드에서는 꽤 복잡한 작업이지만, 리액터라면 문제가 쉬워진다.
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800)) // (1)
.onErrorResume(cacheService.cachedFavoritesFor(userId)) // (2)
.flatMap(favoriteService::getDetails) // (3)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
Future
객체는 콜백보다 여러 면으로 낫고 자바 8 CompletableFuture
로 좀 더 개선되기도 했지만, 조합해서 쓰긴 여전히 어렵다. Future
는 다른 문제도 있다.
Future
객체는 get()
메소드를 호출하면 결국 블로킹된다.예제를 살펴본다. 이름과 통계정보를 쌍으로 조회하고 싶은 데이터의 ID 리스트를 가져오고, 모든 동작을 비동기로 처리한다고 생각해보자.
Example of CompletableFuture
combination
CompletableFuture<List<String>> ids = ifhIds(); // (1)
CompletableFuture<List
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); // (6)
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); // (7)
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); // (8)
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join) // (9)
.collect(Collectors.toList()));
});
List
* 나의 Example
```java
CompletableFuture<List<String>> ids =
CompletableFuture.supplyAsync(() -> {
ArrayList<String> fruits = new ArrayList<String>();
fruits.add("apple");
fruits.add("banana");
fruits.add("grape");
fruits.add("lemon");
fruits.add("watermelon");
return fruits;
});
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(element -> {
CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> element + " 의 가격은");
CompletableFuture<Integer> c2 = CompletableFuture.supplyAsync(() -> new Integer((int) (Math.random() * 1000 + 1)));
return c1.thenCombineAsync(c2, (s1, s2) -> s1 + " " + s2);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> result2 = result.join();
for(int i = 0; i < result2.size(); i++) {
log(result2.get(i));
}
23:03:23.576885 (main) apple 의 가격은 97
23:03:23.623924 (main) banana 의 가격은 333
23:03:23.624096 (main) grape 의 가격은 287
23:03:23.624216 (main) lemon 의 가격은 71
23:03:23.624339 (main) watermelon 의 가격은 6
궁금증 : join()
과 get()
의 차이는?
Example of Reactor code
Flux<String> ids = ifhrIds(); // (1)
Flux
return nameTask.zipWith(statTask, // (5)
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List
List
### 알아두어야 할 Java 8 함수
1. **CompletableFuture** ?
* **supplyAsync(), runAsync()**
* 이 두 개의 메서드를 제공하여 직접 쓰레드를 생성하지 않고 작업을 async 하도록 처리할 수 있다.
* supplyAsync() 는 ``Suppiler``, runAsync()는 ``Runnable`` 을 넘길 수 있다.
```java
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
try {
log("언제?");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("리턴 끝났니?");
return "Hi";
});
Thread.sleep(5000);
log(completableFuture.get());
22:28:32.128191 (ForkJoinPool.commonPool-worker-3) 언제?
22:28:33.181045 (ForkJoinPool.commonPool-worker-3) 리턴 끝났니?
22:28:37.114568 (main) Hi
runAsync()
도 사용방법은 동일한데, supplyAsync()
와는 다르게 리턴 값이 없다.
CompletableFuture<Void>
로 선언해야 한다.get()
은 blocking 되지만 null을 리턴한다.CompletableFuture<Void> future
= CompletableFuture.runAsync(() -> log("future example"));
log("get(): " + future.get());
supplyAsync()
으로 어떤 작업이 처리되면, 그 결과를 가지고 다른 작업도 수행하도록 구현할 수 있다.thenApply()
메서드는 인자와 리턴 값이 있는 Lambda를 수행한다. 여기서 인자는 supplyAsync()
에서 리턴되는 값이다.CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
try {
log("DEBUG01");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("DEBUG02");
return "Hi";
}).thenApply(e -> {
try {
log("DEBUG03");
Thread.sleep(1000);
} catch(Exception e2) {}
log("DEBUG04");
return "너는 무엇을 리턴할래? " + e;
});
Thread.sleep(5000);
log(completableFuture.get());
22:30:38.878453 (ForkJoinPool.commonPool-worker-3) DEBUG01
22:30:39.931372 (ForkJoinPool.commonPool-worker-3) DEBUG02
22:30:39.931924 (ForkJoinPool.commonPool-worker-3) DEBUG03
22:30:40.933847 (ForkJoinPool.commonPool-worker-3) DEBUG04
22:30:43.847623 (main) 너는 무엇을 리턴할래? Hi
thenApply()
또한 리턴 값이 있기 때문에, 연달아 thenApply()
를 적용할 수 있다.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(s -> s + " Future");
log("future.get(): " + future.get());
```console
203 (main) future2.get(): Future1 + Future2
thenAccept() : 리턴 값이 없는 작업의 수행
thenAccept()
도 thenApply()
와 비슷하다. 하지만, 인자는 있지만 리턴값이 없는 Lambda를 처리할 수 있다.리턴 값이 없기 때문에 thenAccept()
는 CompletableFuture<Void>
를 리턴하게 된다.
CompletableFuture<Void> completableFuture
= CompletableFuture.supplyAsync(() -> {
try {
log("DEBUG01");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("DEBUG02");
return "Hi";
}).thenApply(e -> {
try {
log("DEBUG03");
Thread.sleep(1000);
} catch(Exception e2) {}
log("DEBUG04");
return "너는 무엇을 리턴할래? " + e;
}).thenAccept(e -> {
log("끝!");
});
Thread.sleep(5000);
log("result : " + completableFuture.get());
22:32:32.210233 (ForkJoinPool.commonPool-worker-3) DEBUG01
22:32:33.245620 (ForkJoinPool.commonPool-worker-3) DEBUG02
22:32:33.245980 (ForkJoinPool.commonPool-worker-3) DEBUG03
22:32:34.246671 (ForkJoinPool.commonPool-worker-3) DEBUG04
22:32:34.256238 (ForkJoinPool.commonPool-worker-3) 끝!
22:32:37.199061 (main) result : null
thenCompose() : 여러 작업을 순차적으로 진행
thenCompose()
는 chain처럼 두 개의 CompletableFuture를 하나의 CompletableFuture로 만들어주는 역할을 한다.
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
try {
log("DEBUG01");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("DEBUG02");
return "Hi";
}).thenCompose(s -> {
return CompletableFuture.supplyAsync(() -> s + " thenCompose?");
});
Thread.sleep(5000);
log("result : " + completableFuture.get());
22:35:28.799721 (ForkJoinPool.commonPool-worker-3) DEBUG01
22:35:29.836719 (ForkJoinPool.commonPool-worker-3) DEBUG02
22:35:33.793638 (main) result : Hi thenCompose?
thenCombine() : 여러 작업을 동시에 수행
thenComose()
가 여러 개의 CompletableFuture를 순차적으로 처리되도록 만들었다면, thenCombine()
은 여러 CompletableFuture를 병렬로 처리되도록 만든다. 모든 처리가 완료되고 그 결과를 하나로 합칠 수 있다.
CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "Hello1")
.thenApply(s -> {
log("DEBUG01");
try {
Thread.sleep(3000);
} catch (Exception e) {
}
return "haha " + s;
});
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> "Hello2")
.thenApply(s -> {
log("DEBUG02");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return "haha " + s;
});
c1.thenCombine(c2, (s1, s2) -> s1 + ", " + s2)
.thenAccept((s) -> log(s));
22:40:25.844595 (main) DEBUG01
22:40:28.915416 (main) DEBUG02
22:40:29.933201 (main) haha Hello1, haha Hello2
결과를 보면 DEBUG01이 나온 후 3초 뒤에 DEBUG02로 나오는 것으로 순차적으로 나온다. 이는 동일한 쓰레드를 사용하기 때문에 대기하는 시간이 있었다.
thenApply() vs thenApplyAsync()
thenApply()
대신 thenApplyAsync()
를 사용하면 다른 쓰레드에서 동작하도록 만들 수 있다.
CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "Hello1")
.thenApplyAsync(s -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
}
log("DEBUG01");
return "haha " + s;
});
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> "Hello2")
.thenApplyAsync(s -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
log("DEBUG02");
return "haha " + s;
});
c1.thenCombine(c2, (s1, s2) -> s1 + ", " + s2)
.thenAccept((s) -> log(s));
Thread.sleep(5000);
22:43:35.671769 (ForkJoinPool.commonPool-worker-5) DEBUG02
22:43:37.658641 (ForkJoinPool.commonPool-worker-3) DEBUG01
22:43:37.673139 (ForkJoinPool.commonPool-worker-3) haha Hello1, haha Hello2
allOf() (살짝 이해 안됨)
allOf()
는 모든 future의 결과를 받아서 처리할 수 있다.anyOf()
와 다르게 Stream api를 사용하여 결과를 처리할 수 있다. get()
은 null을 리턴한다.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "future1");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "future2");
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "future3");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" + "));
log("Combined: " + combined);
22:47:30.790517 (main) future1.isDone() : true
22:47:30.843937 (main) future2.isDone() : true
22:47:30.844302 (main) future3.isDone() : true
22:47:30.852526 (main) Combined: future1 + future2 + future3
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<U> Optional<U> map(Function<? super T, ? extends U> mapper)
<U> Optional<U> flatMap(Function<? super T, Optional<U>> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
String[][] data = new String[][]{ {"1", "2"}, {"3", "4"} };
// Arrays.stream(data) 가 Stream<Stream<String>> 형태 일 것임
Stream<Stream<String>> map = Arrays.stream(data).map(x -> Arrays.stream(x));
map.forEach(s -> s.forEach(System.out::println));
Stream<String> flatmap = Arrays.stream(data).flatMap(x -> Arrays.stream(x));
flatmap.forEach(System.out::println);
Function의 두 번째 파라미터가 Optional이나 Stream으로 map 과 다름을 알 수 있다.
.flatMap()을 활용한 2차원 배열의 원소 중 a를 찾는 코드
String[][] sample = new String[][]{
{"a", "b"}, {"c", "d"}, {"e", "a"}, {"a", "h"}, {"i", "j"}
};
//without .flatMap()
Stream
```console
//output
{a, b}
{e, a}
{a, h}
String[][] sample = new String[][]{
{"a", "b"}, {"c", "d"}, {"e", "a"}, {"a", "h"}, {"i", "j"}
};
//without .flatMap()
Stream<String> stream = sample.stream()
.flatMap(array -> Arrays.stream(array))
.filter(x-> "a".equals(x));
stream.forEach(System.out::println);
Publisher
체인을 작성한다고 해서 데이터를 바로 공급하진 않는다. 구독
을 해야 Publisher
와 Subscriber
가 연결되고, 전체 체인에 데이터 흐름이 트리거 된다.Flux.range(1, Integer.MAX_VALUE)
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
.blockLast();
onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
request(1)
onNext(1)
request(1)
onNext(2)
request(1)
onNext(3)
request(1)
onNext(4)
구독 시점
부터 데이터를 새로 생성하는 Cold sequence
와 구독하는 customer와 상관 없이 데이터를 생성하는 hot sequence
가 존재한다.Flux.just()
로 생성한 시퀀스가 콜드 시퀀스이다.
Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(v -> System.out.println(“첫번 째 요청: " + v)); // 구독
seq.subscribe(v -> System.out.println("두번 째 요청: " + v)); // 구독
첫번째 요청 : 1
첫번째 요청 : 2
첫번째 요청 : 3
두번째 요청 : 1
두번째 요청 : 2
두번째 요청 : 3
Hot Sequence
는 구독여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다.
Publisher
를 구현하고 있는 리액티브 타입을 (Flux
, Mono
) 다양하게 구성할 수 있으며 풍부한 연산자를 함께 제공한다.Flux<T>
는 0개부터 N개까지의 아이템을 생산하는 비동기 시퀀스를 나타내는 표준 Publsiher<T>
로, 완료나 에러 신호로 종료된다.onNext
, onComplete
, onError
메서드 호출로 이어진다.
Flux.just(1, 2, 3, 4)
.log()
.subscribe();
| onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
| request(unbounded)
| onNext(1)
| onNext(2)
| onNext(3)
| onNext(4)
| onComplete()
Mono<T>
는 최대 1개 아이템 생산에 특화된 Publisher로 onComplete
혹은 onError
신홀 종료된다.Flux
와 Mono
를 생성하는 가장 쉬운 방법은 팩토리 메소드 중 하나로 각 클래스를 생성하는 것이다.String
의 시퀀스는 아래처럼 단순히 나열하여 생성 가능
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List
Mono
* 팩토레 메소드는 값이 없어도 제네릭 타입이 필요하다는 점에 주의
3. Flux와 Mono를 구독할 때는 자바 8 람다를 사용하고, ``.subscribe()`` 메소드는 여러 가지 콜백을 조합할 수 있다.
```java
subscribe(); // (1)
subscribe(Consumer<? super T> consumer); // (2)
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer); // (3)
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer); // (4)
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer); // (5)
Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe();
Flux<Integer> ints2 = Flux.range(1, 3);
ints2.subscribe(System.out::println);
Flux<Integer> ints3 = Flux.range(1, 4)
.map(i -> {
if(i <= 3) return i;
throw new RuntimeException("Go to 4");
});
ints3.subscribe(i -> System.out.println(i), // (5)
error -> System.err.println("Error: " + error));
1
2
3
1
2
3
Error: java.lang.RuntimeException: Go to 4
Flux<Integer> ints = Flux.range(1, 4); // (1)
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done")); // (2)
Flux.range(1, 6)
.log()
.map(i -> {
log("map: " + i + " + 10");
try {
Thread.sleep(5000);
}catch(Exception e) {}
return i + 10;
})
.subscribe(e -> log("e"));
log("hi");
14:30:33.448990 (main) map: 1 + 10
14:30:38.455154 (main) e
2021-03-10 14:30:38.455 INFO 20134 --- [ main] reactor.Flux.Range.1 : | onNext(2)
14:30:38.456016 (main) map: 2 + 10
14:30:43.460976 (main) e
2021-03-10 14:30:43.461 INFO 20134 --- [ main] reactor.Flux.Range.1 : | onNext(3)
14:30:43.461581 (main) map: 3 + 10
14:30:48.464945 (main) e
2021-03-10 14:30:48.465 INFO 20134 --- [ main] reactor.Flux.Range.1 : | onNext(4)
14:30:48.465640 (main) map: 4 + 10
14:30:53.467300 (main) e
2021-03-10 14:30:53.467 INFO 20134 --- [ main] reactor.Flux.Range.1 : | onNext(5)
14:30:53.468461 (main) map: 5 + 10
14:30:58.473317 (main) e
2021-03-10 14:30:58.473 INFO 20134 --- [ main] reactor.Flux.Range.1 : | onNext(6)
14:30:58.473830 (main) map: 6 + 10
14:31:03.476757 (main) e
2021-03-10 14:31:03.477 INFO 20134 --- [ main] reactor.Flux.Range.1 : | onComplete()
14:31:03.477850 (main) hi
Flux.range(1, 6)
.log()
.map(i -> {
log("map: " + i + " + 10");
try {
Thread.sleep(5000);
}catch(Exception e) {}
return i + 10;
})
.subscribeOn(Schedulers.newElastic("SUB"))
.subscribe(e -> log("e"));
log("hi");
14:32:43.902117 (SUB-2) map: 1 + 10
14:32:43.898682 (main) hi
14:32:48.907498 (SUB-2) e
2021-03-10 14:32:48.907 INFO 20463 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(2)
14:32:48.908243 (SUB-2) map: 2 + 10
14:32:53.908494 (SUB-2) e
2021-03-10 14:32:53.908 INFO 20463 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(3)
14:32:53.909254 (SUB-2) map: 3 + 10
14:32:58.909905 (SUB-2) e
2021-03-10 14:32:58.910 INFO 20463 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(4)
14:32:58.910561 (SUB-2) map: 4 + 10
Flux.range(1, 6)
.log()
.map(i -> {
log("map: " + i + " + 10");
try {
Thread.sleep(5000);
}catch(Exception e) {}
return i + 10;
})
.subscribeOn(Schedulers.newElastic("SUB"))
.blockLast();
log("hi");
2021-03-10 14:33:42.185 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(1)
14:33:42.189677 (SUB-2) map: 1 + 10
2021-03-10 14:33:47.196 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(2)
14:33:47.196895 (SUB-2) map: 2 + 10
2021-03-10 14:33:52.197 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(3)
14:33:52.197841 (SUB-2) map: 3 + 10
2021-03-10 14:33:57.198 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(4)
14:33:57.198830 (SUB-2) map: 4 + 10
2021-03-10 14:34:02.198 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(5)
14:34:02.199265 (SUB-2) map: 5 + 10
2021-03-10 14:34:07.203 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onNext(6)
14:34:07.204528 (SUB-2) map: 6 + 10
2021-03-10 14:34:12.210 INFO 20522 --- [ SUB-2] reactor.Flux.Range.1 : | onComplete()
14:34:12.211557 (main) hi
Disposable
을 사용하여 subscribe()
를 취소subscribe()
메소드는 모두 Disposable
타입을 리턴하고, 여기서 이 인터페이스를 이용해 dispose()
메소드 호출로 구독을 취소할 수 있다.Flux
, Mono
관점에서 취소는 소스가 데이터 생산을 중단한다는 신호다. 하지만 즉각적인 중단을 보장하지는 않는다. 데이터 소스가 취소 명령을 받기 전에 데이터를 생산하고 완료 처리할 수도 있다.스프링 웹플럭스 탄생 배경
논블로킹 웹 스택
"Reactive"
논블로킹
역시 작업을 기다린다기보단 완료되거나 데이터를 사용할 수 있게 되면 반응하므로 "리액티브"의 일종backpressure
이다.2-1. Reactive API
Mono
와 Flux
API 타입을 제공한다.2-2. Programming Models
2-3. Functional Endpoints
HandlerFunction
이 HTTP 요청을 처리한다. HandlerFunction
은 ServerRequest
를 받아 비동기 ServerResponse
(i.e. Mono<ServerResponse>
)를 리턴하는 함수다. HandlerFunction
역할은 어노테이션 프로그래밍 모델로 치면 @RequestMapping
메소드가 하던 일과 동일하다.RouterFunctions.route()
가 제공하는 빌더를 사용할 수 있다.
PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);
RouterFunction
public class PersonHandler {
// ...
public Mono<ServerResponse> listPeople(ServerRequest request) {
// ...
}
public Mono<ServerResponse> createPerson(ServerRequest request) {
// ...
}
public Mono<ServerResponse> getPerson(ServerRequest request) {
// ...
}
}
2-4. Spring MVC vs Webflux?
![image](https://user-images.githubusercontent.com/78422766/110593093-7c0e1880-81be-11eb-8101-817450cf989e.png)
* Spring MVC 어플리케이션에서 외부 서비스를 호출한다면 한번 리액티브 ``WebClient`` 만 사용해 볼 수 있다.
* 논블로킹, 함수형, 선언적 프로그래밍은 러닝커브가 높은 걸 고려하면, 한번에 전환하지 않고 리액티브 ``WebClient``부터 적용해보는 것도 좋은 방법이다.
2-5. Servers
* 스프링 웹플럭스는 톰캣, Jetty, 서블릿 3.1+ 컨테이너에서도, 서블릿 기반이 아닌 Netty나 Undertow에서도 잘 동작한다.
* 스프링 웹플럭스엔 서버 기동이나 중단을 위한 내장 기능이 없고, 스프링 설정과 웹플럭스 구조를 조립해 적은 코드로 손쉽게 어플리케이션을 실행할 수 있다.
* 스프링 부트에선 웹플럭스 스타터가 이 단계를 자동화해준다. 스타터는 기본으로 Netty를 사용하지만, 메이븐 혹은 그라들에서 Jetty, tomcat, undertow로 교체할 수 있다.
* 부트가 Netty를 디폴트로 사용하는 이유는 보통 비동기 논블로킹에 많이 사용하기도 하고, 클라이언트와 서버가 리소스를 공유 할 수 있어서다.
한 컴포넌트가 부하를 이겨내기 힘들 때, 시스템 전체가 합리적인 방법으로 대응해야 한다. 과부하 상태의 컴포넌트에서 치명적인 장애가 발생하거나 제어 없이 메시지를 유실해서는 안 된다. 컴포넌트가 대처할 수 없고 장애가 발생해선 안 되기 때문에 컴포넌트는 상류 컴포넌트들에 자신이 과부하 상태라는 것을 알려 부하를 줄이도록 해야 한다. 이러한 배압은 시스템이 부하로 인해 무너지지 않고 정상적으로 응답할 수 있게 하는 중요한 피드백 방법이다. 배압은 사용자에게까지 전달되어 응답성이 떨어질 수 있지만, 이 메커니즘은 부하에 대한 시스템의 복원력을 보장하고 시스템 자체가 부하를 분산할 다른 자원을 제공할 수 있는지 정보를 제공할 것이다.
Reactive Streams API 구성요소 및 API 명세서이다.
Publisher API 명세서
Subscriber API 명세서
Subscription API 명세서
개요
내용
1. Reactive Streams 복습
2. Reactor(Flux, Mono) 복습
3. Spring WebFlux + Mustache 기반 김치프리미엄 사이트 만들기
4. Docker Image 만들기
5. AWS ec2