
6. Processor API #11


kgneng2 commented 4 years ago
This chapter covers
- Evaluating higher-level abstractions versus more control
- Working with sources, processors, and sinks to create a topology
- Digging deeper into the Processor API with a stock analysis processor
- Creating a co-grouping processor
- Integrating the Processor API and the Kafka Streams API

6.1. The trade-offs of higher-level abstractions vs. more control
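
These notes leave this section empty, so here is a rough illustration of the trade-off (the uppercase example and all names are mine, not the book's): the DSL expresses a transformation in one line, while the Processor API makes every node name, parent-child link, and forward explicit.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
stream.mapValues(v -> v.toUpperCase()).to("output-topic");

// The same flow in the Processor API: more code, but you control node
// naming, wiring, and forwarding directly.
Topology topology = new Topology()
        .addSource("source", "input-topic")
        .addProcessor("uppercase", UppercaseProcessor::new, "source")
        .addSink("sink", "output-topic", "uppercase");

// The hand-written processor behind the "uppercase" node.
public class UppercaseProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(String key, String value) {
        context().forward(key, value.toUpperCase());
    }
}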

6.2. Working with sources, processors, and sinks to create a topology

6.2.1. Adding a source node

topology.addSource(LATEST,
                  purchaseSourceNodeName,
                  new UsePreviousTimeOnInvalidTimestamp(),
                  stringDeserializer,
                  beerPurchaseDeserializer,
                  Topics.POPS_HOPS_PURCHASES.topicName())

6.2.2. Adding a processor node

BeerPurchaseProcessor beerProcessor =
    new BeerPurchaseProcessor(domesticSalesSink, internationalSalesSink);

topology.addSource(LATEST,
                  purchaseSourceNodeName,
                  new UsePreviousTimeOnInvalidTimestamp(),
                  stringDeserializer,
                  beerPurchaseDeserializer,
                  Topics.POPS_HOPS_PURCHASES.topicName())
       .addProcessor(purchaseProcessor,
                     () -> beerProcessor,
                     purchaseSourceNodeName); // establishes the parent-child node relationship
public class BeerPurchaseProcessor extends AbstractProcessor<String, BeerPurchase> {
    private String domesticSalesNode;
    private String internationalSalesNode;

    public BeerPurchaseProcessor(String domesticSalesNode,
                                 String internationalSalesNode) {
        this.domesticSalesNode = domesticSalesNode;
        this.internationalSalesNode = internationalSalesNode;
    }

    @Override
    public void process(String key, BeerPurchase beerPurchase) {
        Currency transactionCurrency = beerPurchase.getCurrency();

        if (transactionCurrency != DOLLARS) {
            BeerPurchase.Builder builder = BeerPurchase.newBuilder(beerPurchase);
            double internationalSaleAmount = beerPurchase.getTotalSale();
            DecimalFormat decimalFormat = new DecimalFormat("###.##");

            builder.currency(DOLLARS);
            builder.totalSale(Double.parseDouble(decimalFormat.format(
                    transactionCurrency.convertToDollars(internationalSaleAmount))));
            BeerPurchase dollarBeerPurchase = builder.build();

            // Non-dollar purchases are converted, then routed to the international sink.
            context().forward(key, dollarBeerPurchase, internationalSalesNode);
        } else {
            // Dollar purchases go straight to the domestic sink.
            context().forward(key, beerPurchase, domesticSalesNode);
        }
    }
}

6.2.3. Adding a sink node

topology.addSource(LATEST,
                  purchaseSourceNodeName,
                  new UsePreviousTimeOnInvalidTimestamp(),
                  stringDeserializer,
                  beerPurchaseDeserializer,
                  Topics.POPS_HOPS_PURCHASES.topicName())
       .addProcessor(purchaseProcessor,
                     () -> beerProcessor,
                     purchaseSourceNodeName)

       .addSink(internationalSalesSink,
                "international-sales",
                stringSerializer,
                beerPurchaseSerializer,
                purchaseProcessor) // same parent: purchaseProcessor

       .addSink(domesticSalesSink,
                "domestic-sales",
                stringSerializer,
                beerPurchaseSerializer,
                purchaseProcessor); // same parent: purchaseProcessor

6.3. Digging deeper into the Processor API with a stock analysis processor

[Image: decision tree]

6.3.1. The stock-performance processor application

Topology topology = new Topology();
String stocksStateStore = "stock-performance-store";  // state store name
double differentialThreshold = 0.02;

KeyValueBytesStoreSupplier storeSupplier =
        Stores.inMemoryKeyValueStore(stocksStateStore);
StoreBuilder<KeyValueStore<String, StockPerformance>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), stockPerformanceSerde);

topology.addSource("stocks-source",
                   stringDeserializer,
                   stockTransactionDeserializer,
                   "stock-transactions")
        .addProcessor("stocks-processor",
                      () -> new StockPerformanceProcessor(stocksStateStore, differentialThreshold),
                      "stocks-source")
        .addStateStore(storeBuilder, "stocks-processor")
        .addSink("stocks-sink",
                 "stock-performance",
                 stringSerializer,
                 stockPerformanceSerializer,
                 "stocks-processor");

init() method

@Override
public void init(ProcessorContext processorContext) {
    super.init(processorContext);
    keyValueStore = (KeyValueStore) context().getStateStore(stateStoreName);

    StockPerformancePunctuator punctuator =
            new StockPerformancePunctuator(differentialThreshold,
                                           context(),
                                           keyValueStore);

    // Every 10 seconds on WALL_CLOCK_TIME: fires on system time, so it does
    // not depend on record timestamps advancing.
    context().schedule(10000, PunctuationType.WALL_CLOCK_TIME, punctuator);
}

Let’s take a moment to discuss the difference between these two PunctuationType settings.

Punctuation semantics

(https://livebook.manning.com/#!/book/kafka-streams-in-action/chapter-6/84)

  1. The StreamTask extracts the smallest timestamp from the PartitionGroup. The PartitionGroup is a set of partitions for a given StreamThread, and it contains all timestamp information for all partitions in the group.
  2. During the processing of records, the StreamThread iterates over its StreamTask object, and each task will end up calling punctuate for each of its processors that are eligible for punctuation. Recall that you collect a minimum of 20 trades before you examine an individual stock’s performance.
  3. If the timestamp from the last execution of punctuate (plus the scheduled time) is less than or equal to the extracted timestamp from the PartitionGroup, then Kafka Streams calls that processor’s punctuate() method.

The key point here is that the application advances timestamps via the TimestampExtractor, so punctuate() calls are consistent only if data arrives at a constant rate. If your flow of data is sporadic, the punctuate() method won't get executed at the regularly scheduled intervals.

In short: stream-time punctuation is tied to data arriving continuously, and timestamps drive it, so if records arrive only sporadically you can't count on it firing on schedule.
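
A minimal sketch of the two scheduling modes, using the book-era schedule(long, PunctuationType, Punctuator) signature (the println bodies are placeholders); this would go inside a processor's init():

// STREAM_TIME: only advances as record timestamps advance, so sporadic
// input delays (or skips) punctuate() calls.
context().schedule(10000L, PunctuationType.STREAM_TIME,
        timestamp -> System.out.println("stream time reached " + timestamp));

// WALL_CLOCK_TIME: fires on system time whether or not records arrive.
context().schedule(10000L, PunctuationType.WALL_CLOCK_TIME,
        timestamp -> System.out.println("wall-clock tick at " + timestamp));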


6.3.2. The process() method

  1. Check the state store to see if you have a corresponding StockPerformance object for the record’s stock ticker symbol.
  2. If the store doesn't contain the StockPerformance object, one is created. Then, the StockPerformance instance adds the current share price and share volume and updates your calculations.
  3. Start performing calculations once you hit 20 transactions for any given stock.


// process() implementation
@Override
public void process(String symbol, StockTransaction transaction) {
    StockPerformance stockPerformance = keyValueStore.get(symbol);

    if (stockPerformance == null) {
        stockPerformance = new StockPerformance();
    }

    stockPerformance.updatePriceStats(transaction.getSharePrice());
    stockPerformance.updateVolumeStats(transaction.getShares());
    stockPerformance.setLastUpdateSent(Instant.now());

    keyValueStore.put(symbol, stockPerformance);
}

The results of process() are only saved to the state store; records are emitted downstream by the Punctuator.punctuate() method.

6.3.3. The punctuator execution

// This just iterates over the key-value store, checks each entry, and forwards it downstream.

@Override
public void punctuate(long timestamp) {
    KeyValueIterator<String, StockPerformance> performanceIterator = keyValueStore.all();

    while (performanceIterator.hasNext()) {
        KeyValue<String, StockPerformance> keyValue = performanceIterator.next();
        String key = keyValue.key;
        StockPerformance stockPerformance = keyValue.value;

        if (stockPerformance != null) {
            if (stockPerformance.priceDifferential() >= differentialThreshold ||
                stockPerformance.volumeDifferential() >= differentialThreshold) {
                context.forward(key, stockPerformance);
            }
        }
    }
    performanceIterator.close();  // always close store iterators to release resources
}

6.4. The co-group processor

6.4.1. Building the co-grouping processor

  1. Define two topics (stock-transactions, events).
  2. Add two processors to consume records from the topics.
  3. Add a third processor to act as an aggregator/co-grouping for the two preceding processors.
  4. Add a state store for the aggregating processor to keep the state for both events.
  5. Add a sink node to write the results to (and/or a printing processor to print results to console).

Defining the source nodes

topology.addSource("Txn-Source",
                  stringDeserializer,
                  stockTransactionDeserializer,
                  "stock-transactions")
       .addSource("Events-Source",
                  stringDeserializer,
                  clickEventDeserializer,
                  "events")

Adding the processor nodes

.addProcessor("Txn-Processor",
              StockTransactionProcessor::new,
              "Txn-Source")

.addProcessor("Events-Processor",
              ClickEventProcessor::new,
              "Events-Source")

.addProcessor("CoGrouping-Processor",
              CogroupingProcessor::new,
              "Txn-Processor",
              "Events-Processor")
public class CogroupingProcessor extends
        AbstractProcessor<String, Tuple<ClickEvent, StockTransaction>> {

    private KeyValueStore<String, Tuple<List<ClickEvent>, List<StockTransaction>>> tupleStore;
    public static final String TUPLE_STORE_NAME = "tupleCoGroupStore";

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        tupleStore = (KeyValueStore) context().getStateStore(TUPLE_STORE_NAME);

        CogroupingPunctuator punctuator = new CogroupingPunctuator(tupleStore, context());
        // Every 15 seconds on STREAM_TIME: the punctuator only fires as data
        // arrives, so with a sporadic flow the interval can stretch past 15
        // seconds (the semantics discussed above).
        context().schedule(15000L, STREAM_TIME, punctuator);
    }

    @Override
    public void process(String key, Tuple<ClickEvent, StockTransaction> value) {
        Tuple<List<ClickEvent>, List<StockTransaction>> cogroupedTuple = tupleStore.get(key);
        if (cogroupedTuple == null) {
            cogroupedTuple = Tuple.of(new ArrayList<>(), new ArrayList<>());
        }

        if (value._1 != null) {
            cogroupedTuple._1.add(value._1);
        }

        if (value._2 != null) {
            cogroupedTuple._2.add(value._2);
        }

        tupleStore.put(key, cogroupedTuple);
    }
}
// The CogroupingPunctuator.punctuate() method
// (class declaration and constructor omitted for clarity)
@Override
public void punctuate(long timestamp) {
    KeyValueIterator<String, Tuple<List<ClickEvent>, List<StockTransaction>>> iterator =
            tupleStore.all();

    while (iterator.hasNext()) {
        KeyValue<String, Tuple<List<ClickEvent>, List<StockTransaction>>> cogrouping =
                iterator.next();

        // If either list contains values, forward the results downstream.
        if (cogrouping.value != null &&
                (!cogrouping.value._1.isEmpty() || !cogrouping.value._2.isEmpty())) {
            List<ClickEvent> clickEvents = new ArrayList<>(cogrouping.value._1);
            List<StockTransaction> stockTransactions = new ArrayList<>(cogrouping.value._2);

            context.forward(cogrouping.key, Tuple.of(clickEvents, stockTransactions));

            // Clear the stored lists so the same events aren't forwarded twice.
            cogrouping.value._1.clear();
            cogrouping.value._2.clear();
            tupleStore.put(cogrouping.key, cogrouping.value);
        }
    }
    iterator.close();
}

Adding the state store and the sink node

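The snippet for these two steps isn't in the notes. A sketch of what they would look like, reusing names from the snippets above; the tuple serde (eventPerformanceTuple), tupleSerializer, and the output topic name are assumptions:

// Step 4: attach the shared tuple store to the co-grouping processor.
KeyValueBytesStoreSupplier storeSupplier =
        Stores.persistentKeyValueStore(CogroupingProcessor.TUPLE_STORE_NAME);
StoreBuilder<KeyValueStore<String, Tuple<List<ClickEvent>, List<StockTransaction>>>>
        storeBuilder = Stores.keyValueStoreBuilder(storeSupplier,
                                                   Serdes.String(),
                                                   eventPerformanceTuple); // assumed serde

topology.addStateStore(storeBuilder, "CoGrouping-Processor")
        // Step 5: sink node writing the co-grouped tuples out
        // ("cogrouped-results" is an assumed topic name).
        .addSink("Tuple-Sink",
                 "cogrouped-results",
                 stringSerializer,
                 tupleSerializer,
                 "CoGrouping-Processor");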

6.5. Integrating the Processor API and the Kafka Streams API
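
The notes stop here, but the gist of this section is dropping from the DSL into the Processor API with transform() (or process() for terminal nodes). A hypothetical sketch, not the book's code; the transformer, store, threshold, and topic names are mine:

KStream<String, StockTransaction> stockStream =
        builder.stream("stock-transactions",
                       Consumed.with(Serdes.String(), stockTransactionSerde));

stockStream.transform(() -> new StockVolumeTransformer(), "volume-store")
           .to("high-volume-stocks",
               Produced.with(Serdes.String(), stockTransactionSerde));

// Illustrative transformer: keeps a running share count per symbol in a
// state store and forwards only symbols past a volume threshold
// (returning null drops the record).
public class StockVolumeTransformer implements
        Transformer<String, StockTransaction, KeyValue<String, StockTransaction>> {

    private KeyValueStore<String, Long> volumeStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        volumeStore = (KeyValueStore<String, Long>) context.getStateStore("volume-store");
    }

    @Override
    public KeyValue<String, StockTransaction> transform(String symbol, StockTransaction txn) {
        Long runningTotal = volumeStore.get(symbol);
        long total = (runningTotal == null ? 0L : runningTotal) + txn.getShares();
        volumeStore.put(symbol, total);
        return total > 1_000_000 ? KeyValue.pair(symbol, txn) : null;
    }

    @Override
    public void close() { }
}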


Summary

kgneng2 commented 4 years ago

Chapter 6: Kafka Streams

Let's dig into the Processor API.

6.1

ZMartKafkaStreamsApp: Kafka Streams API
ZMartProcessorApp: Processor API

6.2: explained by walking through PopsHopsApplication.java

6.3: digging deeper into the requirements

6.3.2

  1. Check the state store for an existing StockPerformance object.
  2. If there isn't one, create it, then add the current share price and volume and update the calculations.
  3. Run the calculations once 20 trades have accumulated for a given stock.


6.4

Goal: combine the two kinds of events every N seconds, but without waiting on a stream whose records haven't arrived.