4. streams and state #10

Open kgneng2 opened 4 years ago

kgneng2 commented 4 years ago

This chapter covers

- Applying stateful operations to Kafka Streams
- Using state stores for lookups and remembering previously seen data
- Joining streams for added insight
- How time and timestamps drive Kafka Streams

State is nothing more than the ability to recall information you’ve seen before and connect it to current information.

4.1 Thinking of events

4.1.1 Streams need state

4.2 Applying stateful operations to Kafka streams


4.2.1 The transformValues processor


4.2.2 Stateful customer rewards

public class RewardAccumulator {

    private String customerId;
    private double purchaseTotal;
    private int currentRewardPoints;

  //details left out for clarity
}

The updated version needs two more fields:

public class RewardAccumulator {

    private String customerId;
    private double purchaseTotal;
    private int currentRewardPoints;
    private int daysFromLastPurchase; // added
    private long totalRewardPoints; // added

  //details left out for clarity
}

Specifically, you’ll take two main steps:

  1. Initialize the value transformer.
  2. Map the Purchase object to a RewardAccumulator using state.

4.2.3 Initializing the value transformer

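public class PurchaseRewardTransformer implements ValueTransformer<Purchase, RewardAccumulator> {

    private KeyValueStore<String, Integer> stateStore; // built in section 4.3.3
    private final String storeName;
    private ProcessorContext context;

    // The store name is passed in by the ValueTransformerSupplier (see 4.2.5).
    public PurchaseRewardTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Look up the store that was registered under storeName.
        stateStore = (KeyValueStore<String, Integer>) this.context.getStateStore(storeName);
    }

    // transform(...) is shown in section 4.2.4
}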

4.2.4. Mapping the Purchase object to a RewardAccumulator using state

@Override
public RewardAccumulator transform(Purchase value) {
    RewardAccumulator rewardAccumulator = RewardAccumulator.builder(value).build();

    // 1. Check for points accumulated so far by customer ID.
    Integer accumulatedSoFar = stateStore.get(rewardAccumulator.getCustomerId());

    if (accumulatedSoFar != null) {
        // 2. Sum the points for the current transaction and present the total.
        rewardAccumulator.addRewardPoints(accumulatedSoFar);
    }

    // 3. Save the new total points by customer ID in the local state store.
    stateStore.put(rewardAccumulator.getCustomerId(), rewardAccumulator.getTotalRewardPoints());

    return rewardAccumulator;
}
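
The read-add-write cycle inside transform() can be tried without a Kafka cluster. The following is a minimal plain-Java sketch, not the Kafka Streams API: a HashMap stands in for the KeyValueStore, and the class and method names (RewardStateSketch, accumulate) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the transform() logic. A HashMap stands in for the
// Kafka Streams KeyValueStore; all names here are hypothetical.
class RewardStateSketch {

    // customerId -> total reward points accumulated so far
    private final Map<String, Integer> store = new HashMap<>();

    // Returns the customer's running total after adding the points
    // earned by the current purchase.
    int accumulate(String customerId, int currentPoints) {
        Integer accumulatedSoFar = store.get(customerId); // null on the first purchase
        int total = currentPoints + (accumulatedSoFar != null ? accumulatedSoFar : 0);
        store.put(customerId, total); // remember the new total for the next purchase
        return total;
    }

    public static void main(String[] args) {
        RewardStateSketch sketch = new RewardStateSketch();
        System.out.println(sketch.accumulate("cust-1", 10)); // first purchase
        System.out.println(sketch.accumulate("cust-1", 5));  // added to the stored total
        System.out.println(sketch.accumulate("cust-2", 7));  // separate customer, separate total
    }
}
```

The sketch only works because every purchase of a customer reaches the same map, which is the reason the next section repartitions by customer ID.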

Repartitioning the data


Repartitioning in Kafka Streams

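RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();

// through() writes to the intermediate topic and reads it back as a new KStream.
// It was deprecated in Kafka 2.6 in favor of repartition().
KStream<String, Purchase> transByCustomerStream =
    purchaseKStream.through("customer_transactions",
                            Produced.with(stringSerde, purchaseSerde, streamPartitioner));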

Using a StreamPartitioner

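public class RewardsStreamPartitioner implements StreamPartitioner<String, Purchase> {

    @Override
    public Integer partition(String key, Purchase value, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even when hashCode() is negative;
        // the plain % operator would return a negative partition in that case.
        return Math.floorMod(value.getCustomerId().hashCode(), numPartitions);
    }
}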
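
> WARNING
Repartitioning for its own sake duplicates data and adds processing overhead.
map(), transform(), and flatMap() can change the key and therefore trigger automatic repartitioning.
mapValues(), transformValues(), and flatMapValues() leave the key untouched, so they are safe to use at any time.
Avoid repartitioning wherever the logic allows.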
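
A partition number derived from hashCode() has to fall in the range 0 to numPartitions - 1, and the same customer ID always has to map to the same partition. The sketch below, in plain Java with hypothetical names (PartitionSketch, naivePartition, safePartition, partitionFor), shows why a bare % is risky for negative hash values and how Math.floorMod avoids the problem.

```java
// Plain-Java illustration of hash-based partition selection.
// Class and method names are hypothetical.
class PartitionSketch {

    // Plain remainder: in Java the result takes the sign of the dividend,
    // so a negative hash gives a negative (unusable) partition number.
    static int naivePartition(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Math.floorMod always lands in [0, numPartitions) for a positive divisor.
    static int safePartition(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    // Same customer ID in, same partition out: this is what lets one task's
    // state store see every purchase of a given customer.
    static int partitionFor(String customerId, int numPartitions) {
        return safePartition(customerId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(naivePartition(-7, 4)); // -3
        System.out.println(safePartition(-7, 4));  // 1
        System.out.println(partitionFor("customer-42", 4) == partitionFor("customer-42", 4)); // true
    }
}
```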

4.2.5. Updating the rewards processor

KStream<String, RewardAccumulator> statefulRewardAccumulator =
 transByCustomerStream.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), 
        rewardsStateStoreName); // ValueTransformerSupplier
statefulRewardAccumulator.to("rewards",
                              Produced.with(stringSerde,
                                       rewardAccumulatorSerde));

Keys and state are closely tied together, and we have now seen how a state store is used.

4.3. Using state stores for lookups and previously seen data

4.3.1 Data locality

4.3.2 Failure recovery and fault tolerance


4.3.3. Using state stores in Kafka Streams

String rewardsStateStoreName = "rewardsPointsStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName); //in-memory k/v store.

StoreBuilder<KeyValueStore<String, Integer>> storeBuilder =
 Stores.keyValueStoreBuilder(storeSupplier,
                                Serdes.String(),
                                Serdes.Integer());

builder.addStateStore(storeBuilder); // builder is the StreamsBuilder

4.3.4. Additional key/value store suppliers

4.3.5. StateStore fault tolerance

4.3.6. Configuring changelog topics

4.4. Joining streams for added insight

4.4.1. Data setup

Predicate<String, Purchase> coffeePurchase = (key, purchase) ->
 purchase.getDepartment().equalsIgnoreCase("coffee");

Predicate<String, Purchase> electronicPurchase = (key, purchase) ->
 purchase.getDepartment().equalsIgnoreCase("electronics");

final int COFFEE_PURCHASE = 0;
final int ELECTRONICS_PURCHASE = 1;

KStream<String, Purchase>[] branchedTransactions =
 transactionStream.branch(coffeePurchase, electronicPurchase); //create branch


The stream is split into two branches so that they can be joined.

4.4.2. Generating keys containing customer IDs to perform joins

KStream<String, Purchase>[] branchesStream =
  transactionStream.selectKey((k,v)->
  v.getCustomerId()).branch(coffeePurchase, electronicPurchase);

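When a method that can produce a new key (selectKey, map, or transform) is called, Kafka Streams sets an internal boolean flag to true. The flag marks the stream as needing repartitioning, which is then handled automatically once a downstream operation such as a join depends on the key. In the code above, selectKey is the call that triggers this automatic repartitioning.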
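
The flag mechanism can be pictured with a toy model. This is only a sketch of the bookkeeping, not the Kafka Streams implementation; the class RepartitionFlagSketch and its methods are hypothetical stand-ins for the DSL operations of the same name.

```java
// Toy model of the "repartition required" flag. Each operation returns a new
// stream object, as the real DSL does; all names here are hypothetical.
class RepartitionFlagSketch {

    private final boolean repartitionRequired;

    private RepartitionFlagSketch(boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    // A stream read straight from a topic has not changed its key.
    static RepartitionFlagSketch fromSource() {
        return new RepartitionFlagSketch(false);
    }

    // Key-changing operation: the resulting stream is flagged.
    RepartitionFlagSketch selectKey() {
        return new RepartitionFlagSketch(true);
    }

    // Value-only operation: the flag is carried over unchanged.
    RepartitionFlagSketch mapValues() {
        return new RepartitionFlagSketch(repartitionRequired);
    }

    // A join consults the flag to decide whether to repartition first.
    boolean needsRepartitionBeforeJoin() {
        return repartitionRequired;
    }

    public static void main(String[] args) {
        System.out.println(fromSource().mapValues().needsRepartitionBeforeJoin());             // false
        System.out.println(fromSource().selectKey().mapValues().needsRepartitionBeforeJoin()); // true
    }
}
```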

4.4.3. Constructing the join


Joining purchase records

public class PurchaseJoiner implements ValueJoiner<Purchase, Purchase, CorrelatedPurchase> {

    @Override
    public CorrelatedPurchase apply(Purchase purchase, Purchase otherPurchase) {

        CorrelatedPurchase.Builder builder = CorrelatedPurchase.newBuilder();

        Date purchaseDate = purchase != null ? purchase.getPurchaseDate() : null;
        Double price = purchase != null ? purchase.getPrice() : 0.0;
        String itemPurchased = purchase != null ? purchase.getItemPurchased() : null;

        Date otherPurchaseDate = otherPurchase != null ? otherPurchase.getPurchaseDate() : null;
        Double otherPrice = otherPurchase != null ? otherPurchase.getPrice() : 0.0;
        String otherItemPurchased = otherPurchase != null ? otherPurchase.getItemPurchased() : null;

        List<String> purchasedItems = new ArrayList<>();

        if (itemPurchased != null) {
            purchasedItems.add(itemPurchased);
        }

        if (otherItemPurchased != null) {
            purchasedItems.add(otherItemPurchased);
        }

        String customerId = purchase != null ? purchase.getCustomerId() : null;
        String otherCustomerId = otherPurchase != null ? otherPurchase.getCustomerId() : null;

        builder.withCustomerId(customerId != null ? customerId : otherCustomerId)
                .withFirstPurchaseDate(purchaseDate)
                .withSecondPurchaseDate(otherPurchaseDate)
                .withItemsPurchased(purchasedItems)
                .withTotalAmount(price + otherPrice);

        return builder.build();
    }
}

Implementing the join

        KStream<String, Purchase> coffeeStream = branchesStream[COFFEE_PURCHASE];
        KStream<String, Purchase> electronicsStream = branchesStream[ELECTRONICS_PURCHASE];

        ValueJoiner<Purchase, Purchase, CorrelatedPurchase> purchaseJoiner = new PurchaseJoiner(); // init value joiner
        JoinWindows twentyMinuteWindow =  JoinWindows.of(60 * 1000 * 20);

        KStream<String, CorrelatedPurchase> joinedKStream = coffeeStream.join(electronicsStream,
                                                                              purchaseJoiner,
                                                                              twentyMinuteWindow,
                                                                              Joined.with(stringSerde,
                                                                                          purchaseSerde,
                                                                                          purchaseSerde)); // construct join

        joinedKStream.print(Printed.<String, CorrelatedPurchase>toSysOut().withLabel("joined KStream"));
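
To make the window semantics concrete, here is a plain-Java sketch of an inner join over two in-memory lists. It is a simplified model, not how Kafka Streams executes joins: it assumes two records match when they share a key and their timestamps differ by at most the window size in either direction. The names WindowedJoinSketch, Event, and innerJoin are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a windowed inner join; all names are hypothetical.
class WindowedJoinSketch {

    // Minimal purchase event: key, purchased item, event time in milliseconds.
    static final class Event {
        final String customerId;
        final String item;
        final long timestampMs;

        Event(String customerId, String item, long timestampMs) {
            this.customerId = customerId;
            this.item = item;
            this.timestampMs = timestampMs;
        }
    }

    // Emits one result per (left, right) pair with the same key whose
    // timestamps are at most windowMs apart, before or after.
    static List<String> innerJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.customerId.equals(r.customerId);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    joined.add(l.customerId + ":" + l.item + "+" + r.item);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        long twentyMinutesMs = 20 * 60 * 1000L;
        List<Event> coffee = Arrays.asList(
                new Event("c1", "coffee", 0L),
                new Event("c2", "coffee", 0L));
        List<Event> electronics = Arrays.asList(
                new Event("c1", "laptop", 10 * 60 * 1000L),  // 10 minutes later: inside the window
                new Event("c2", "phone", 30 * 60 * 1000L));  // 30 minutes later: outside the window
        System.out.println(innerJoin(coffee, electronics, twentyMinutesMs)); // [c1:coffee+laptop]
    }
}
```

Only customer c1 produces a joined record, because the c2 purchases are more than twenty minutes apart.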

Co-partitioning

4.4.4. Other join options

Outer Join

Left-outer join

4.5 Timestamps in Kafka Streams


4.5.1 Provided TimestampExtractor implementations

/* @see FailOnInvalidTimestamp
 * @see LogAndSkipOnInvalidTimestamp
 * @see UsePreviousTimeOnInvalidTimestamp
 * @see WallclockTimestampExtractor
 */
@InterfaceStability.Evolving
abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {

    /**
     * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
     *
     * @param record a data record
     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
     * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
     */
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long timestamp = record.timestamp();

        if (timestamp < 0) {
            return onInvalidTimestamp(record, timestamp, previousTimestamp);
        }

        return timestamp;
    }

    /**
     * Called if no valid timestamp is embedded in the record meta data.
     *
     * @param record a data record
     * @param recordTimestamp the timestamp extracted from the record
     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
     * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
     */
    public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                            final long recordTimestamp,
                                            final long previousTimestamp);
}
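
The abstract class above is a template: valid timestamps pass through, and each subclass decides what to do with a negative one. The plain-Java sketch below models that split without the Kafka classes. The three policies approximate, as far as I understand them, FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, and UsePreviousTimeOnInvalidTimestamp; the names TimestampPolicySketch, InvalidTimestampPolicy, FAIL, SKIP, and USE_PREVIOUS are hypothetical.

```java
// Plain-Java model of the extract()/onInvalidTimestamp() split.
// Not the Kafka classes; all names here are hypothetical.
class TimestampPolicySketch {

    // Decides what to return when the record carries a negative timestamp.
    interface InvalidTimestampPolicy {
        long onInvalid(long recordTimestamp, long previousTimestamp);
    }

    // Same shape as extract() above: valid timestamps pass through,
    // negative ones are handed to the policy.
    static long extract(long recordTimestamp, long previousTimestamp, InvalidTimestampPolicy policy) {
        if (recordTimestamp < 0) {
            return policy.onInvalid(recordTimestamp, previousTimestamp);
        }
        return recordTimestamp;
    }

    // Fail fast on any invalid timestamp.
    static final InvalidTimestampPolicy FAIL = (ts, prev) -> {
        throw new IllegalStateException("invalid timestamp: " + ts);
    };

    // Return the negative value unchanged so the caller drops the record.
    static final InvalidTimestampPolicy SKIP = (ts, prev) -> ts;

    // Reuse the previous valid timestamp; fail if there is none yet.
    static final InvalidTimestampPolicy USE_PREVIOUS = (ts, prev) -> {
        if (prev < 0) {
            throw new IllegalStateException("no previous timestamp available");
        }
        return prev;
    };

    public static void main(String[] args) {
        System.out.println(extract(100L, 50L, FAIL));        // 100
        System.out.println(extract(-1L, 50L, USE_PREVIOUS)); // 50
        System.out.println(extract(-1L, 50L, SKIP));         // -1
    }
}
```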

4.5.2. WallclockTimestampExtractor

4.5.3. Custom TimestampExtractor

4.5.4. Specifying a TimestampExtractor

public class TransactionTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        Purchase purchasePurchaseTransaction = (Purchase) record.value();
        return purchasePurchaseTransaction.getPurchaseDate().getTime();
    }
}

props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TransactionTimestampExtractor.class);

Summary

kgneng2 commented 4 years ago

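Chapter 4 Kafka Streams presentation

4

The chapter puts heavy emphasis on state. By its definition, state is nothing more than recalling information seen before and connecting it to current information.

4.1.1

Sometimes it is easy to see what is going on, but making a good decision usually requires some additional context. In stream processing, that added context is called state.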

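4.2 Example

The chapter 3 example masked the data and then computed rewards, but each reward covered only a single transaction.

4.2.1 transformValues() is used: data is saved in a local store by key and used during processing.

4.2.2 The RewardAccumulator from chapter 3 is now changed.

Added fields: days since the last purchase, and total reward points.

There are two steps: initialize the value transformer, then map using state.

4.2.3 Initialization first

  1. Create PurchaseRewardTransformer (a ValueTransformer) and override init(). The ProcessorContext passed in carries the stream-processing metadata: applicationId, topic, partition, taskId, and so on.

  2. Purchase -> RewardAccumulator

4.2.4 To look up a customer's purchase information for rewards, the chapter says the records must sit in the same partition. Placing all records with the same customerId in the same partition matters because a state store is assigned per StreamTask; if a customer's records are scattered across partitions, no single store holds them all. Repartitioning is therefore required, and through() keeps the topology simple.

4.2.5 Now update the rewards processor.

This shows that keys are very closely tied to state, and how a state store is used.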

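4.3 How to use state stores

4.3.1 Data locality is critical for performance. A remote store is inevitably slower.

4.3.2 Recovery

Each local store is backed up to an internal topic. Because every store is separate, a failure in one does not affect the others.

However, backing up a state store is costly.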
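
Recovery from the backup topic amounts to replaying the recorded changes in order. The sketch below models that idea in plain Java, assuming a changelog is a list of key/value updates where a null value deletes the key. It is not the Kafka Streams restore code, and the names ChangelogRestoreSketch, Change, and restore are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of rebuilding a local store from its changelog.
// All names here are hypothetical.
class ChangelogRestoreSketch {

    // One changelog entry; a null value marks a deletion (tombstone).
    static final class Change {
        final String key;
        final Integer value;

        Change(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the entries in order; the last write per key wins.
    static Map<String, Integer> restore(List<Change> changelog) {
        Map<String, Integer> store = new HashMap<>();
        for (Change change : changelog) {
            if (change.value == null) {
                store.remove(change.key);
            } else {
                store.put(change.key, change.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
                new Change("cust-1", 10),
                new Change("cust-2", 3),
                new Change("cust-1", 15),    // overwrites the earlier total
                new Change("cust-2", null)); // deletes cust-2
        System.out.println(restore(changelog)); // {cust-1=15}
    }
}
```

The cost mentioned above shows up here as well: every update to the store also becomes a write to the changelog, and a restore has to read the log back.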

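4.3.3 Using state stores: the usage looks straightforward.

4.4 Joining streams

The goal is to give a coffee coupon to customers who buy both electronics and coffee. The streams are keyed by customerId, which seems to serve as something like a foreign key for the join.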

RocksDB: a DB library … a database that can be created locally. Think of it simply as one HBase table: a sorted, indexed table with very fast writes. MyRocks: built on MySQL, with a MySQL shell and RocksDB as the storage engine. Very good as a local DB.

To do: find the intermediate topic.