so3500 / TIL


2024-01-13 #7

Open so3500 opened 5 months ago

so3500 commented 5 months ago

Kafka documentation: APIs - Streams API

2. APIs

Book

함께 자라기: 애자일로 가는 길 ("Growing Together: The Road to Agile")

so3500 commented 5 months ago

Introduction Streams Creating a Streams Application

The Kafka Streams demo and the app development tutorial demonstrate how to code and run such a streaming application from start to finish.

  1. Run the demo app
  2. Tutorial: write an app

The demo application is designed to operate on an infinite, unbounded stream of data. Because it must assume potentially unbounded input, it cannot know when it has processed "all" the input data. It therefore periodically outputs its current state and results while continuing to process more data.
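To make the "output current state while continuing to process" idea concrete, here is a minimal sketch in plain Java. It does not use the Kafka Streams API; the class name `StreamingWordCountSketch` and the method `process` are hypothetical names made up for this note. As a simplification, it emits an update for every word it sees, whereas the real application emits periodically.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only: a stateful word count over an unbounded input,
// written with plain collections rather than the Kafka Streams API.
public class StreamingWordCountSketch {
    // Running state: word -> latest count seen so far.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input line and returns the updates it causes,
    // each formatted as "word<TAB>count".
    public List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split can yield an empty first token
            }
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + "\t" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        StreamingWordCountSketch app = new StreamingWordCountSketch();
        // The input never "ends"; each new line simply produces more updates.
        for (String line : List.of("all streams lead to kafka", "hello kafka streams")) {
            app.process(line).forEach(System.out::println);
        }
    }
}
```

The second line produces `kafka	2` and `streams	2` rather than a final total, which is the behavior the paragraph above describes: results are always "so far", never "done".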

Step 1: download the code

Step 2: start the Kafka server (using ZooKeeper or KRaft)

Generate a cluster UUID:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Format the log directories:

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Start the Kafka server:

bin/kafka-server-start.sh config/kraft/server.properties

Step 3: prepare the input topic and start the Kafka producer

We create the input topic named streams-plaintext-input and the output topic named streams-wordcount-output.

bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".

We create the output topic with compaction enabled (--config cleanup.policy=compact) because the output stream is a changelog stream (cf. the explanation of the application output below).

The created topics can be described with the same kafka-topics tool:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:cleanup.policy=compact,segment.bytes=1073741824
	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:segment.bytes=1073741824
	Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

Step 4: start the WordCount application

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Now we can start the console producer in a separate terminal to write some input data to this topic:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

We can then inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Step 5: process some data

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka. haha . . . this is my input text.

The first column is the Kafka message key in java.lang.String format and represents a word that is being counted. The second column is the message value in java.lang.Long format and represents the word's latest count.

Now let's continue by writing one more message with ...
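The two columns only print correctly because the consumer is told how to decode each side of the record. A rough sketch of that decoding in plain Java follows, assuming the key is UTF-8 text and the value is an 8-byte big-endian long (which is my understanding of what the String and Long serializers produce). `RecordDecodingSketch` and its methods are hypothetical names, not Kafka classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: mimics decoding a word-count record
// into the "word<TAB>count" line shown by the console consumer.
public class RecordDecodingSketch {
    // Assumption: a count is stored as 8 bytes, big-endian (ByteBuffer's default order).
    public static byte[] encodeCount(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // Assumption: the key is UTF-8 encoded text.
    public static String decodeWord(byte[] key) {
        return new String(key, StandardCharsets.UTF_8);
    }

    public static long decodeCount(byte[] value) {
        if (value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
        }
        return ByteBuffer.wrap(value).getLong();
    }

    // First column: the word (key). Second column: its latest count (value).
    public static String formatRecord(byte[] key, byte[] value) {
        return decodeWord(key) + "\t" + decodeCount(value);
    }

    public static void main(String[] args) {
        byte[] key = "kafka".getBytes(StandardCharsets.UTF_8);
        byte[] value = encodeCount(2L);
        System.out.println(formatRecord(key, value));
    }
}
```

Reading the value bytes as text instead of as a long would show unreadable characters, which is why the consumer command sets a separate deserializer for the value.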

...

Looking beyond the scope of this concrete example, what Kafka Streams is doing here is leveraging the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream). You can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.

[Figures: streams-table-updates-01, streams-table-updates-02]
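The "replay the changelog to rebuild the table" half of the duality can be sketched in a few lines of plain Java. This is only an illustration with collections, not the KTable/KStream API; `TableChangelogSketch` and `replay` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a changelog is an ordered list of (key, value)
// updates; replaying it from beginning to end yields the table's contents.
public class TableChangelogSketch {
    public static Map<String, Long> replay(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> change : changelog) {
            // A later update for the same key overwrites the earlier one.
            table.put(change.getKey(), change.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = List.of(
                Map.entry("kafka", 1L),
                Map.entry("streams", 1L),
                Map.entry("kafka", 2L));
        System.out.println(replay(changelog)); // prints {kafka=2, streams=1}
    }
}
```

Only the latest value per key survives the replay, which also hints at why a compacted topic is enough for a changelog: older updates for a key are not needed to rebuild the table.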

Step 6: tear down the application