yeomko22 / TIL

Today I learned
1 stars 0 forks source link

kafka streams #111

Open yeomko22 opened 2 years ago

yeomko22 commented 2 years ago

kafka streams

14

카프카 스트림즈 애플리케이션

topology

15

스트림즈 DSL 데이터 처리 예시

프로세서 API로 구현하는 데이터 처리 예시

yeomko22 commented 2 years ago

StreamsDSL

KStream

KTable

GlobelKTable

yeomko22 commented 2 years ago

streams application example

public class SimpleStreamApplication {
    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVER = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> streamLog = builder.stream(STREAM_LOG);
        streamLog.to(STREAM_LOG_COPY);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
yeomko22 commented 2 years ago

streamsDSL filter

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> streamLog = builder.stream(STREAM_LOG);
        KStream<String, String> filteredStream = streamLog.filter((key, value) -> value.length() > 5);
        filteredStream.to(STREAM_LOG_COPY);
yeomko22 commented 2 years ago

streamsDSL KTable, KStream join

        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
        orderStream.join(addressTable,
                (order, address) -> order + " send to " + address)
                .to(ORDER_JOIN_STREAM);
yeomko22 commented 2 years ago

streamsDSL - GlobalKTable과 KStream join

코파티셔닝이 되어 있지 않은 토픽을 조인해야할 때 가능한 두 가지 방법

GlobalKTable join example

        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
        orderStream.join(addressGlobalTable,
                        (orderKey, orderValue) -> orderKey,
                        (order, address) -> order + " send to " + address)
                        .to(ORDER_JOIN_STREAM);
yeomko22 commented 2 years ago

ProcessorAPI

        Topology topology = new Topology();
        topology.addSource("Source", STREAM_LOG)
                .addProcessor("Process", () -> new FilterProcessor(),
                        "Source")
                .addSink("Sink",
                        STREAM_LOG_FILTER,
                        "Process");