Open hubtwork opened 2 weeks ago
카프카를 활용하면서 내용상으로 중요하고 완벽한 순서를 유지하면서 오랫동안 저장되어 있는 데이터 스트림 처리가 필요했다. 카프카 스트림즈를 사용하면 외부 처리 프레임워크에 의존할 필요 없이 애플리케이션에 이벤트를 읽고, 처리하고, 쓰는 기능을 구현할 수 있다.
우리가 분석하고자 하는 모든 비즈니스 활동을 나타낼 수 있음
스트림 처리는 일반적인 데이터 처리와 매우 비슷하다. 데이터를 읽고, 처리를 한 뒤, 결과물을 기록한다.
스트림 애플리케이션이 시간 윈도우(time window)에 대해 작업을 수행하는 만큼 시간에 대해 공통적인 개념을 가져야 함
카프카 스트림즈는 TimestampExtractor 인터페이스를 사용해 각 이벤트에 시간을 부여한다. 이 인터페이스의 서로 다른 구현체를 사용하여 위 3가지 시간 개념 중 하나를 사용하거나 타임스탬프를 결정하여 완전히 다른 시간 개념을 사용할 수도 있다.
[!NOTE]
카프카 스트림즈가 결과물을 카프카 토픽에 쓸 때 '이벤트에 타임스탬프를 부여하는 규칙'
- 결과 레코드가 입력으로 주어진 레코드에 직접적으로 대응될 경우: 결과 레코드는 입력 레코드와 동일한 타임스탬프를 사용
- 결과 레코드가 집계(aggregation) 연산의 결과물일 경우: 집계에 사용된 레코드 타임스탬프의 최대값을 결과 레코드의 타임스탬프로 사용
- 결과 레코드가 두 스트림을 조인한 결과일 경우: 조인된 두 레코드 타임스탬프 중 큰 쪽의 타임스탬프를 결과 레코드의 타임스탬프로 사용
- 결과 레코드가 스트림과 테이블을 조인한 경우: 스트림 레코드 쪽의 타임스탬프 사용
- 입력과 상관없이 특정한 스케줄에 따라 데이터를 생성한 결과 레코드의 경우: 스트림 처리 애플리케이션의 현재 내부 시각에 따라 결정 (ex: 카프카 스트림즈의 punctuate())
[!NOTE] Timezone 주의 전체 데이터 파이프라인이 표준화된 시간대 하나만 쓰거나, 서로 다른 시간대의 데이터 스트림을 다뤄야 한다면 윈도우에 작업을 수행하기 전에 이벤트 시각을 하나의 시간대로 변환해두어야 한다. 혹은 레코드에 시간대 정보를 저장해 넣는 경우도 있다.
※ 애플리케이션 내부 상태와 외부 상태를 일관적으로 유지해야하는 점을 고려해야 함
스트림은 변경 내역을 저장한다. 변경을 유발하는 이벤트의 연속이다. 예를 들어 재고 변경 스트림을 통해 재고 변경을 구체화할 수 있다.
스트림 작업은 시간을 윈도우라 불리는 구간 단위로 잘라서 처리한다. 두 스트림을 조인하는 작업 역시 윈도우 작업이다.
장애가 발생했을 경우에도 각각의 레코드를 한 번만 처리할 수 있어야 한다. 즉, '정확히 한 번' 보장이 없는 경우 스트림 처리는 정ㅇ확한 결과가 요구되는 상황에서 쓸모가 없다.
카프카 스트림즈는 카프카의 트랜잭션 기능을 사용해서 스트림 처리 애플리케이션에서 '정확히 한 번' 보장을 지원한다.
processing.guarantee=exactly_once
로 설정하여 '정확히 한 번'을 활성화processing.guarantee=exactly_once_beta
를 설정하여 '더 효율적인 정확히 한 번' 구현체를 활성화[!NOTE] 버전 3.0 이후의 처리 보장
processing.guarantee=exactly_once_v2
사용을 권장 (processing.guarantee=exactly_once
와processing.guarantee=exactly_once_beta
는 버전 4.0부터 제거될 예정)
가장 단순한 스트림 처리 패턴으로 이벤트를 개별적으로 처리하는 것이다. map/filter 패턴이라고도 한다.
대부분 윈도우 집계와 같이 정보의 집계에 초점을 맞춘다. 집계를 위해 스트림의 상태(state)를 유지해야 한다.
카프카 파티셔너를 통해 동일한 주식에 대한 모든 이벤트가 동일한 파티션에 쓰여지도록 처리 -> 각각의 애플리케이션 인스턴스는 자신에게 할당된 파티션에 저장된 모든 이벤트를 컨슘 -> 각 애플리케이션 인스턴스는 자신에게 할당된 파티션에 쓰여진 전체 주식 종목의 부분집합에 대한 상태 유지가 가능
전체 집계가 필요한 경우 정보가 서로 다른 인스턴스에 할당된 파티션에 분산되어있을 수 있다.
각 프로세서가 맡고 있는 파티션에 대한 결과물 산출 -> 이 결과물을 별도 토픽의 단일 파티션에 적재 -> 또 다른 프로세서가 결과물을 종합하여 최종 결과물을 산출
데이터베이스에 저장된 데이터를 사용해 검증하거나 정보를 조합해 확장할 수 있다.
이벤트 토픽 발생 -> 데이터베이스의 데이터를 통한 확장(조합) -> 확장(조합)된 새 이벤트를 다른 토픽에 적재
[!NOTE] CDC(Change Data Capture) 데이터베이스 변경 내역을 이벤트 스트림으로 받아오는 것 (카프카 커넥트의 커넥터들은 CDC를 수행하며 데이터베이스 테이블을 변경 이벤트 스트림으로 변환할 수 있음)
두 테이블을 조인하는 것은 윈도우 처리되지 않는 연산이기 때문에 작업이 실행되는 시점의 데이터 상태를 조인한다. 카프카 스트림에서는 동일한 방식으로 파티션된 동일한 키를 가지는 두 테이블에 대해 동등 조인을 수행할 수 있다.
스트림과 스트림을 조인해야할 경우가 있다. 이 조인에는 항상 이동하는 시간 윈도우가 수반되므로 스트리밍 조인을 윈도우 조인이라고도 부른다.
내용A를 담은 스트림과 내용B를 담은 스트림을 조인 -> 내용A와 B를 조합한 결과물 산출을 위해 내용A를 특정 기간을 기준으로 하되 특정 시간 윈도우 범위 안에 있는 내용B 하고 조합 -> 즉, 각 스트림에 대해 같은 길이를 가지는 윈도우를 유지하면서 각 윈도우에 속한 이벤트끼리 맞춰야 함
네트워크 등 여러 이유에 의해 잘못된 시간에 스트림에 도착한 이벤트를 처리하는 Out of sequence 이벤트는 자주 발생할 수 있다.
대응 방법
방법1
사례: 구번전에서 사용하던 이벤트 스트림을 신버전 애플리케이션에서 읽어와 산출된 새로운 결과 스트림을 쓴다. 구버전의 결과를 교체하는 것이 아닌 일정 기간 동안 두 버전의 결과를 비교한 뒤, 특정 시점에 구버전 대신 신버전 결과를 사용하도록 한다.
- 신버전을 새 컨슈머 그룹으로 실행
- 신버전이 입력 토픽의 첫 오프셋부터 처리를 시작하도록 설정하여 입력 스트림의 모든 이벤트에 대한 복사본을 적재
- 신버전이 처리를 계속하고, 신버전이 처리 작업을 따라잡았을 때 새로운 결과 스트림으로 전환
방법2
사례: 기존 스트림 처리 애플리케이션에 버그가 많아서 버그를 고친 뒤 이벤트 스트림을 재처리하여 결과를 재산출 한다.
- 이미 존재하는 애플리케이션을 초기화하여 입력 스트림의 맨 처음부터 다시 처리하도록 처리
- 로컬 상태를 초기화하고 기존 출력 스트림 내용을 삭제
카프카 스트림즈가 스트림 처리 상태를 초기화하기 위한 툴을 제공하지만, 용량이 충분한 경우 방법1이 버전 간 전환이 가능하고, 버전 간 결과물 비교가 가능하며, 정리(clean up) 과정에서 중요한 데이터가 유실되거나 에러가 발생할 위험이 낮아서 더 권장된다.
count(): 결과 산출을 위한 메서드 실행
public class WordCountExample {
public static void main(String[] args) throws Exception{
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+");
KStream counts = source.flatMapValues(value-> Arrays.asList(pattern.split(value.toLowerCase())))
.map((key, value) -> new KeyValue<Object, Object>(value, value))
.filter((key, value) -> (!value.equals("the")))
.groupByKey()
.count().mapValues(value->Long.toString(value)).toStream();
counts.to("wordcount-output");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
mapValues(): 집계 결과를 사용해 최종 결과를 계싼하여 출력 스트림으로 내보냄
public class StockStatsExample {
public static void main(String[] args) throws Exception {
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);
KStream<Windowed<String>, TradeStats> stats = source
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).advanceBy(Duration.ofSeconds(1)))
.<TradeStats>aggregate(() -> new TradeStats(),
(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
.withValueSerde(new TradeStatsSerde()))
.toStream()
.mapValues((trade) -> trade.computeAvgPrice());
stats.to("stockstats-output", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize)));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
}
StreamJoined.with(key, value1, value2): 조인 결과에 대한 Serde를 정의
public class ClickstreamEnrichment {
public static void main(String[] args) throws Exception {
...
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, PageView> views = builder.stream(Constants.PAGE_VIEW_TOPIC, Consumed.with(Serdes.Integer(), new PageViewSerde()));
KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC, Consumed.with(Serdes.Integer(), new ProfileSerde()));
KStream<Integer, Search> searches = builder.stream(Constants.SEARCH_TOPIC, Consumed.with( Serdes.Integer(), new SearchSerde()));
// 스트림-테이블 조인: 정보 확장
KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles,
(page, profile) -> {
if (profile != null)
return new UserActivity(profile.getUserID(), profile.getUserName(), profile.getZipcode(), profile.getInterests(), "", page.getPage());
else
return new UserActivity(-1, "", "", null, "", page.getPage());
});
// 스트림-스트림 조인: 시간 윈도우 기준
KStream<Integer, UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches,
(userActivity, search) -> {
if (search != null)
userActivity.updateSearch(search.getSearchTerms());
else
userActivity.updateSearch("");
return userActivity;
},
JoinWindows.of(Duration.ofSeconds(1)),
StreamJoined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde()));
userActivityKStream.to(Constants.USER_ACTIVITY_TOPIC, Produced.with(Serdes.Integer(), new UserActivitySerde()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
카프카 스트림즈 실행 단계
단위 테스트
통합 테스트
카프카 스트림즈는 하나의 애플리케이션 인스턴스 안에 다수의 스레드가 실행될 수 있게 함으로써 규모 확장과 서로 다른 애플리케이션 인스턴스 간에 부하 분산이 이루어지도록 한다.
태스크간 의존성 발생 케이스
스트림-스트림 조인
리파티셔닝
장애가 발생하기 전 마지막으로 커밋된 오프셋을 카프카에서 가져와 처리하던 스트림의 마지막으로 처리된 지점부터 처리를 재개할 수 있다. 단, 로컬 상태 저장소가 유실됐을 경우 카프카로부터 체인지로그를 읽어와 로컬 상태 저장소를 복구한다.
복구 시간을 줄이는 것은 상태를 복구시키는 시간을 줄이는 문제와 동일하다. 따라서 아래 방법들을 사용한다.
[!NOTE] Kafka Streams - State restoration during workload rebalance
각자 읽어보자.
진짜 재밌게 읽었다. 여러 장점들과 기능을 나열해두어 활용할 사례들이 떠올랐는데, 마지막에 고려해야할 사항이 있어서 다시 한 번 검토해볼 수 있도록 해준 점이 인상 깊었다. 근데 결국 카프카 스트림즈 짱! 이라고 하는 것 같았다.
스트림 처리와 사용사례에 대해 알게 되서 좋은 챕터였다. 꼭 한번에 해내려고 하지 않아도 된다는 것, 큰 문제를 작게 나눠 작은 문제로 만들듯 하나의 페이즈 별로 처리하는 방법들을 고민해보려고 한다. 스트림이라는 개념에서 사실 경계해야 하는 부분은 손실이다. 킹치만 카프카는 다 해준다고 ㄹㅇㅋㅋ 다들 고생했습니다.
지금까지 공부했던 카프카 내용들도 매우 좋았는데, 알고보니 지금까지 공부했던 것들은 기본편이고 카프카 스트림이 고급 심화편이라고 느껴졌다. 카프카를 활용해서 시스템을 확장시키고 이렇게까지 할 수 있다고 보여주는 것 같았다. 카프카 스트림이 아니더라도 엔지니어로서 가져야 할 시각과 방향성도 보여주고 있어 유익했지만, 아직 나에게 고급 심화편은 어렵긴 하다.
이번 랭킹 시스템의 컨슈머 파이프라이닝 설계할 때, 카프카 스트림즈로 제어하면 좀 더 AGG 가 쉽지 않을까? 하고 고민했었는데 ( 물리적 시간이 없어 포기 ) 주식 예제랑 클릭 예제가 딱 적절한 처리를 할 수 있음을 보여주는 예시여서 좋았다. 하지만, 아직까지는 경험치 자체가 적어서 섣불리 시도는 해보기 힘들 것 같고 ( 기존 시스템 설계로 throughput 이 떨어지지도 않는다 ) 추후에 쓸 일이 있으면 poc 로 한번 진행해볼 생각이다.
카프카. 고생했습니다.
이번장은 카프카를 사용할때 무조건 참고해서 볼 만한 내용들이 많았다. 특히 그중 스트림처리에 대해서는 실제 우리가 내부 어플리케이션에서 데이터를 가공하는거와 많이 유사했고 비순차 이벤트, 재처리, 장애 관련해서도 카프카를 사용할때 유심히 봐야한다는걸 느꼈다!
결론 : 결국 카프카가 또 짱이라는 소리를 하는 느낌을 받음ㅋㅋㅋㅋ
카프카 스트림이 데이터를 단순히 소비하는 것을 넘어 데이터의 상태를 관리하고, 여러 스트림을 조합하거나 윈도우 연산을 통해 데이터를 집계할 수 있다는 점이 흥미로웠다. 언제 써볼 수 있을까 싶기는 한데, 언젠가는 경험할 수 있지 않을까 하면서 지금은 그냥 이런게 있구나 정도로 읽고 넘겨야겠다.
다들 고생하셨습니다.
Chapter Ownership : @iamzin