so3500 / TIL

2024-01-19 #10

Apache Kafka Streams

TUTORIAL: WRITE A KAFKA STREAMS APPLICATION

How to run the app?

Step 1: Download the code

Step 2: Start the Kafka server

Kafka with ZooKeeper...

Kafka with KRaft

# Generate a Cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format Log Directories
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

# Start the Kafka Server
bin/kafka-server-start.sh config/kraft/server.properties

Step 3: Prepare input topic and start Kafka producer

bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".

bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

# describe
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
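
The topics can also be created programmatically. Below is a minimal sketch (the class name CreateTopics is mine, not part of the tutorial) using the Kafka AdminClient against the broker started in Step 2; the output topic gets cleanup.policy=compact just like the CLI command above.

package org.example;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // input topic: 1 partition, replication factor 1
            NewTopic input = new NewTopic("streams-plaintext-input", 1, (short) 1);
            // output topic: a changelog, so enable log compaction
            NewTopic output = new NewTopic("streams-wordcount-output", 1, (short) 1)
                .configs(Map.of("cleanup.policy", "compact"));
            admin.createTopics(List.of(input, output)).all().get();
        }
    }
}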

Step 4: Start the Wordcount Application

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

# start producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

# start consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Step 5: Process some data

# input some data to producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

# output from consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
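
The console producer above can also be replaced with a few lines of Java. Here is a minimal sketch (the class name ProduceLine is mine) that publishes the same text line to streams-plaintext-input with a plain KafkaProducer; because streams-wordcount-output is a changelog stream, sending further lines later simply emits updated counts for any repeated words.

package org.example;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceLine {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // close() flushes any buffered records before returning
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("streams-plaintext-input", "all streams lead to kafka"));
        }
    }
}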

Pipe

package org.example;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class Pipe {
    public static void main(String[] args) {

        Properties props = new Properties();
        // distinguishes this application from other applications talking to the same Kafka cluster
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        // a list of host/port pairs used to establish the initial connection to the Kafka cluster
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // default serialization/deserialization classes for record key-value pairs
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        /* define the computational logic of the Streams application */
        final StreamsBuilder builder = new StreamsBuilder();
        // create a source stream that continuously generates records from its source Kafka topic; the records are `String`-typed key-value pairs
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.to("streams-pipe-output");
        // or, concatenated: builder.stream("streams-plaintext-input").to("streams-pipe-output");

        // the constructed topology has two processor nodes: a source node `KSTREAM-SOURCE-0000000000` and a sink node `KSTREAM-SINK-0000000001`
        // KSTREAM-SOURCE-0000000000 continuously reads records from the Kafka topic `streams-plaintext-input` and pipes them to its downstream node `KSTREAM-SINK-0000000001`
        // KSTREAM-SINK-0000000001 writes each received record, in order, to another Kafka topic, `streams-pipe-output`
        // in the describe() output, the `-->` and `<--` arrows indicate the downstream and upstream processor nodes of each node
        // this simple topology has no global state stores associated with it (state stores come up in the following sections)
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // construct the Streams client from the two components: the `Properties` instance and the `Topology` object
        final KafkaStreams streams = new KafkaStreams(topology, props);

        // add a shutdown hook with a countdown latch to capture a user interrupt
        // and close the client upon termination of this program
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            // trigger the execution of this client.
            // The execution won't stop until `close()` is called on this client.
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
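
For reference, topology.describe() for this Pipe topology prints something like the following (exact formatting differs between Kafka versions), showing the source and sink nodes discussed in the comments above:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
      <-- KSTREAM-SOURCE-0000000000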

LineSplit

package org.example;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

public class LineSplit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // since each of the source stream's records is a `String`-typed key-value pair,
        // let's treat the value string as a text line and split it into words with a `flatMapValues` operator
        final StreamsBuilder builder = new StreamsBuilder();

        // input stream
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        // process each record from the source stream in order, break its value string into a list of words,
        // and produce each word as a new record to the output stream.
        // this is a stateless operator that does not need to keep track of previously received records or processed results.
        source.flatMapValues((ValueMapper<String, Iterable<String>>)value -> Arrays.asList(value.split("\\W+")))
            .to("streams-linesplit-output");

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        // a new processor node KSTREAM-FLATMAPVALUES-0000000001 is injected into the topology between the original source and sink nodes;
        // it takes the source node as its parent and the sink node as its child.
        // each record fetched by the source node first traverses the newly added KSTREAM-FLATMAPVALUES-0000000001 node to be processed,
        // and one or more new records are generated as a result.
        // they then continue down to the sink node to be written back to Kafka.
        // like the source and sink, this processor is stateless.

        try (KafkaStreams streams = new KafkaStreams(topology, props)) {
            final CountDownLatch latch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            // trigger the execution of this client
            streams.start();

            try {
                latch.await();
            } catch (InterruptedException e) {
                System.exit(1);
            }
            System.exit(0);
        }

    }
}
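
The split logic can be checked without a running broker. Below is a minimal sketch (the class name LineSplitDriverCheck is mine, and it assumes the kafka-streams-test-utils artifact is on the classpath) that drives the same topology with TopologyTestDriver.

package org.example;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class LineSplitDriverCheck {
    public static void main(String[] args) {
        // rebuild the same topology as LineSplit above
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .to("streams-linesplit-output");
        Topology topology = builder.build();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, String> input =
                driver.createInputTopic("streams-plaintext-input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output =
                driver.createOutputTopic("streams-linesplit-output", new StringDeserializer(), new StringDeserializer());

            // one input line should fan out into one output record per word
            input.pipeInput("all streams lead to kafka");
            System.out.println(output.readValuesToList()); // [all, streams, lead, to, kafka]
        }
    }
}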

WordCount

package org.example;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;

public class WordCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        // in order to count the words, first modify the flatMapValues operator to lower-case all of them.
        // to do the counting aggregation we have to key the stream on the value string, i.e. the word,
        // and then count the occurrences, which generates a running count for each grouped key:
        source.flatMapValues((ValueMapper<String, Iterable<String>>)value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            // generate a new grouped stream, which can then be aggregated by a count operator,
            .groupBy((key, value) -> value)
            // materialize the result into a KeyValueStore named "counts-store".
            // The materialized store is always of type <Bytes, byte[]> as this is the format of the innermost store.
            // "counts-store" can be queried in real time (see the interactive-query sketch after this class)
            .count(Materialized.as("counts-store"))
            // we can also write the counts KTable's changelog stream back into another Kafka topic, say 'streams-wordcount-output'.
            // Because the result is a changelog stream, the output topic 'streams-wordcount-output' should be configured with log compaction enabled.
            // Note that the value type is now Long rather than String, so the default serialization classes are no longer viable for writing it to Kafka.
            // We need to provide overridden serdes for the Long value type, otherwise a runtime exception will be thrown:
            .toStream()
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        System.out.println(topology.describe());

        try (KafkaStreams streams = new KafkaStreams(topology, props)) {
            final CountDownLatch latch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            // trigger the execution of this client
            streams.start();

            try {
                latch.await();
            } catch (InterruptedException e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }
}
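
As noted in the comments above, the "counts-store" materialized by Materialized.as(...) can be queried in real time through Kafka Streams interactive queries. Here is a minimal sketch (the helper class and method names are mine) against a running KafkaStreams instance, e.g. called from another thread of the WordCount application once it has started:

package org.example;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CountsStoreQuery {
    // look up the current count of a word from the local "counts-store";
    // the instance must be in the RUNNING state before the store is queryable
    static Long countOf(KafkaStreams streams, String word) {
        ReadOnlyKeyValueStore<String, Long> counts = streams.store(
            StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.keyValueStore()));
        return counts.get(word); // null if the word has not been seen yet
    }
}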