Open han1448 opened 2 years ago
Serdes
public class TweetSerdes implements Serde<Tweet> {
@Override
public Serializer
@Override
public Deserializer
Filtering Data
Predicate<byte[], Tweet> englishTweets =
(key, tweet) -> tweet.getLang().equals("en"); 1
Predicate<byte[], Tweet> nonEnglishTweets = (key, tweet) -> !tweet.getLang().equals("en"); 2
KStream<byte[], Tweet>[] branches = filtered.branch(englishTweets, nonEnglishTweets); 1
KStream<byte[], Tweet> englishStream = branches[0]; 2
KStream<byte[], Tweet> nonEnglishStream = branches[1]; 3
- Translating Data
- map (1:1)
- mapValues (rekey - 키를 변경 하는 경우가 아니면 mapValues 가 효휼적)
- flatMap (0..1..N)
- flatMapValues
- Merging Streams
- Branching 과 반대로 N개의 stream 을 하나로 합침
- SQL의 union query 와 동일하다고 보면 됨
```java
KStream<byte[], Tweet> merged = englishStream.merge(translatedStream);
이전의 데이터를 query 할 수 있으며 aggregation 등의 작업을 할 수 있음
RocksDB 사용
StreamsConfig.STATE_DIR_CONFIG
의 기본값은 /tmp/kafka-streams 이지만 /tmp 를 사용하지 않는 것이 좋음selectKey()
KTable 은 오직 groupBy 만 지원
KTable<String, HighScores> highScores =
grouped.aggregate(
highScoresInitializer,
highScoresAdder,
Materialized.<String, HighScores, KeyValueStore<Bytes, byte[]>> 1
as("leader-boards") 2
.withKeySerde(Serdes.String()) 3
.withValueSerde(JsonSerdes.HighScores()));
QueryableStoreTypes.keyValueStore()
QueryableStoreTypes.timestampedKeyValueStore()
QueryableStoreTypes.windowStore()
QueryableStoreTypes.timestampedWindowStore()
QueryableStoreTypes.sessionStore()
ReadOnlyKeyValueStore<String, HighScores> stateStore =
streams.store(
StoreQueryParameters.fromNameAndType(
"leader-boards",
QueryableStoreTypes.keyValueStore()));
// Point lookup HighScores highScores = stateStore.get(key);
// All entries KeyValueIterator<String, HighScores> range = stateStore.all();
// Range scans KeyValueIterator<String, HighScores> range = stateStore.range(1, 7); 1
while (range.hasNext()) { KeyValue<String, HighScores> next = range.next(); 2
String key = next.key;
HighScores highScores = next.value; 3
// do something with high scores object
}
range.close(); 4
- Local Queries
- `GlobalKTable` 이 아닌 `KTable` 인 경우는 partition 된 일부의 데이터만 조회 가능
- 하지만 remote query 를 할수 있는 방법을 제공
- Remote Queries
- Full state 검색을 위해서는 아래의 조건을 만족해야 함
- 어떤 인스턴스가 어떤 데이터를 가지고 있는 discovery 가 필요 `queryMetadataForKey()`
- 외부 통신을 위한 RPC 또는 REST service 필요
- 클라이언트에도 서버와 통신을 위한 RPC 또는 REST service 필요
- kafak streams 는 이런 built-in lib 를 지원해주지 않아서 직접 lib 를 제공해줘야 함
<img width="500" alt="스크린샷 2022-05-19 오전 11 34 25" src="https://user-images.githubusercontent.com/4098287/169192069-b97f6efa-1631-4bcf-8627-e514643d181f.png">
Event time
Ingestion time
Processing time
log.message.timestamp.type (broker level) CreateTime(event)
, LogAppendTime(ingestion)
message.timestamp.type (topic level)
window mark 를 위한 timestamp 추출
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
설정을 통해 변경
FailOnInvalidTimestamp
LogAndSkipOnInvalidTimestamp
WallclockTimestampExtractor
Tumbling windows
TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(5));
Hopping windows
TimeWindows hoppingWindow = TimeWindows
.of(Duration.ofSeconds(5))
.advanceBy(Duration.ofSeconds(4));
Session windows
SessionWindows sessionWindow = SessionWindows.with(Duration.ofSeconds(5));
Sliding join windows
Delayed message
TimeWindows tumblingWindow =
TimeWindows
.of(Duration.ofSeconds(60))
.grace(Duration.ofSeconds(5));
Suppression
suppress
는 window 처리에서 최종 결과만 emitSuppressed.untilWindowCloses
Suppressed.untilTimeLimit
Buffer Config
BufferConfig.maxBytes()
: 버퍼 메모리를 byte 로 설정BufferConfig.maxRecords()
: 버퍼를 key 갯수 기준으로 설정BufferConfig.unbounded()
: 버퍼를 heap 메모리를 제한없이 사용 (OOM 발생 할 수 있음)Buffer Full Strategies
shutDownWhenFull
: 메모리 다 차면 application shutdownemitEarlyWhenFull
: 메모리 다 차면 중간 계산된 결과값 emitStreamsConfig.STATE_DIR_CONFIG
설정을 통해 데이터 저장 경로 지정 .
└── dev-consumer -----> application ID
├── 0_0 -----> task ID [<sub-topology-id>_<partition>]
│ ├── .lock
│ └── pulse-counts
├── 0_1
│ ├── .lock
│ └── pulse-counts
├── 0_2
│ ├── .lock
│ └── pulse-counts
├── 0_3
│ ├── .checkpoint -----> change log topic 의 offset 정보
│ ├── .lock
│ └── pulse-counts -----> 실제 데이터 저장, materializing state 이름으로 생성
│ └── ...
├── 1_0
│ ├── ...
// set ephemeral store
// 이 경우 disk fail 시 복구불가
Materialized.as("pulse-counts").withLoggingDisabled();
Map<String, String> topicConfigs =
Collections.singletonMap("min.insync.replicas", "2");
KTable<Windowed
- 이미 생성된 changelog topic 설정 수정은 kafka cli 로 직접 토픽을 수정해야 함
## Standby Replicas
- state store 의 downtime 시간을 줄이는 방법 중 하나로 여분의 instance 를 미리 올려놓는 방법이 있음
- `NUM_STANDBY_REPLICAS_CONFIG` 로 설정 가능
## Rebalancing: Enemy of the State (Store)
- 기본적으로 changelog topic 에 데이터를 저장하다보니 데이터가 많을 경우 application 의 state 재설정에 많은 시간이 걸릴 수 있음
- consumer rebalancing 경우 consumer group 모든 application 이 재설정 됨
- rebalancing 에 관련 broeker/consumer 명칭
- `group coordinator`: consumer group 을 관리하는 broker
- `group leader`: consumer group 에서 partition 할당을 결정하는 consumer
## Preventing State Migration
- rebalancing 이 발생했을 때 대량의 state 를 복구 비용을 최소화하는 방법들
- Sticky Assignment
- `StickyTaskAssignor`
- rebalancing 발생 시 task 할당을 이전에 state 를 소유한 동일한 instance 에 할당하는 정책
- 동일한 application 에 reassign 되므로 state 를 re-initialize 할 필요가 없음
<img width="272" alt="스크린샷 2022-05-20 오전 11 58 00" src="https://user-images.githubusercontent.com/4098287/169441472-bc045053-9702-44a0-a661-855488e68cd5.png">
- Static Membership
- application 이 다양한 상황에 inactive/active 상태로 되는 경우 coordinator 는 새로운 member 로 인식하고 새로 reassign 함
- 이런 rebalance 를 최소화하기 위해 member id 를 고정하는 방법이 있음
- `group.instance.id = app-1` 하드 코딩을 해야하는 단점
- 좀 긴 session timeout 설정과 같이 사용 함
- kafka 2.3 버전 이상에서만 동작
- Eager rebalancing
- 모든 application 이 rebalance 에 참여
- 전체 프로세스 멈춤
- replayed/rebuilt 비용 발생
- Incremental Cooperative Rebalancing
- kafka 2.4 이상에서 도입된 새로운 rebalancing protocol
- rebalancing 에 참여한 consumer 의 task 를 다른 instance에 reassign
- 그 외 instance 는 reassign 에 참여하지 않음
<img width="406" alt="스크린샷 2022-05-20 오후 3 19 08" src="https://user-images.githubusercontent.com/4098287/169465201-e01b1352-dd14-4836-aee0-89a506c2c162.png">
- Tombstones
- null 로 state 를 저장하면 kafka streams 가 물리삭제를 진행
- disk 공간 절약
- Aggressive topic compaction
- topic 은 partition 으로 나누어 지고 각 partition 은 segment (broker side) 라는 작은 파일로 관리
- tombstones 또는 log compaction 같은 작업은 이 segment 사이즈가 작을 수록 효과적임
- `segment.bytes`
- `segment.ms`
- `min.cleanable.dirty.ratio`
- `max.compaction.lag.ms`
- Fixed-size LRU cache
- LRU size 만큼 state store 를 사용
- LRU 가 10개 만이라고 해도 reinitialize 시 전체 토픽의 데이터를 replay 가 필요게 단점
```java
KeyValueBytesStoreSupplier storeSupplier = Stores.lruMap("counts", 10);
Getting Started with Kafka Streams
Processor Topologies
Tasks and Stream Threads
num.stream.threads
로 어플리케이션에서 사용할 thread 수를 조절High-Level DSL Versus Low-Level Processor API
Streams(a record stream) and Tables (a changelog stream)
2 개의 데이터 구조에 따라 사용하는 topic 설정/구조가 달라짐
cleanup.policy
는 기본적으로compact
KStream, KTable, GlobalKTable
high-level DSL 에서 사용가능