confluentinc / demo-scene


source code for spring-kafka learn-kafka course #207

Closed: gAmUssA closed this issue 3 years ago

gAmUssA commented 3 years ago

Includes three projects: spring-kafka-ccloud (basic producer, consumer, streams, and Admin API), spring-kafka-avro (Avro versions of the producer and consumer), and spring-kafka-scs (Spring Cloud Stream).

survivant commented 3 years ago

I added a line in the processor to print the value that was received:

KTable<String, Long> wordCounts = textLines
        //.print(Printed.toSysOut())
        .filter((key, value) -> {System.out.println("    Process Key=" + key + "  value=" + value); return true;})
        // Split each text line, by whitespace, into words.  The text lines are the message
        // values, i.e. we can ignore whatever data is in the message keys and thus invoke
        // `flatMapValues` instead of the more generic `flatMap`.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        // We use `groupBy` to ensure the words are available as message keys
        .groupBy((key, value) -> value, Grouped.with(stringSerde, stringSerde))
        // Count the occurrences of each word (message key).
        .count(Materialized.as("counts"));
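
As an aside, `KStream#peek` is purpose-built for this kind of side-effect-only logging and passes every record through unchanged, so the always-true `filter` could be written as:

KTable<String, Long> wordCounts = textLines
        // peek() exists for side effects only, which makes the debugging
        // intent clearer than a filter that always returns true
        .peek((key, value) -> System.out.println("    Process Key=" + key + "  value=" + value))
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -> value, Grouped.with(stringSerde, stringSerde))
        .count(Materialized.as("counts"));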

and I wanted to add a second consumer, one that consumes the messages on the topic "hobbit":

@Component
class ConsumerHobbit {

  @KafkaListener(topics = {"hobbit"}, groupId = "spring-boot-kafka2")
  public void consume(ConsumerRecord<Object, Object> record) {
    System.out.println("hobbit Consumer received = " + record.value() + " with key " + record.key());
  }
}

but it doesn't work. I get this kind of error:


java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition hobbit-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
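
The last line points at the root cause: this listener inherited the application's default value deserializer (a LongDeserializer, presumably configured for the word-count output topic), while the records on "hobbit" are Strings. Separately, the IllegalStateException suggests wrapping the deserializers in Spring Kafka's ErrorHandlingDeserializer so a bad record is handed to the error handler instead of failing the poll loop; a minimal sketch of that configuration (the String delegates are assumptions):

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// the delegates do the actual deserialization; String is an assumption here
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);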

My goal is to create a POC with a few consumers with different group IDs. They all need to receive all the messages (mostly System.out for now) with POJOs (without Avro). I could use String for now and adapt the code later.

With those features, I think the demo will be more complete.
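
For the POJO-without-Avro goal, Spring Kafka ships a JSON deserializer that needs no schema registry; a minimal consumer-side sketch (MyPojo and the com.example package are placeholders):

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// JsonDeserializer refuses to instantiate classes outside trusted packages
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
// type to materialize when the record carries no type headers
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, MyPojo.class);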

survivant commented 3 years ago

Oh, I got the hobbit consumer to work.

@Configuration
public class HobbitConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${hobbit.topic.group.id}")
    private String hobbitGroupId;

    public ConsumerFactory<Integer, String> hobbitConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "HobbitConsumer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, hobbitGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> hobbitKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(hobbitConsumerFactory());
        return factory;
    }

}

@Component
class ConsumerHobbit {

  @KafkaListener(topics = {"hobbit"}, groupId = "spring-boot-kafka2", containerFactory = "hobbitKafkaListenerContainerFactory")
  public void consume(ConsumerRecord<Object, Object> record) {
    System.out.println("hobbit Consumer received = " + record.value() + " with key " + record.key());
  }
}
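
This works because the dedicated ConsumerFactory pins this listener's value deserializer to StringDeserializer instead of the application-wide default (the LongDeserializer the stack trace above was complaining about). Note that JsonDeserializer.TRUSTED_PACKAGES only takes effect when a JsonDeserializer is actually configured; with StringDeserializer it is ignored.
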
survivant commented 3 years ago

Could it be possible to add a stream processor with a windowing example to this demo?

Something like this? (P.S. When I call the IQ API, I get this error: org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, countsWindow, may have migrated to another instance.)

package com.example.kafkaeventalarm.stream;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.PreDestroy;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Service;

import com.example.kafkaeventalarm.model.Order;
import com.example.kafkaeventalarm.stream.serdes.SerdeFactory;

@Service
public class KafkaStreamOrderProcessorWindow {
    private final Logger logger = LoggerFactory.getLogger(KafkaStreamOrderProcessorWindow.class);

    @Value("${order.window.topic.name}")
    private String inputTopicWindow;

    @Value("${order.stream.window.output.name}")
    private String orderStreamWindowOutput;

    private KafkaStreams streams;

    @Qualifier("OrderStreamProcessorWindow")
    @Autowired
    StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @Autowired
    public void process(@Qualifier("OrderStreamProcessorWindow") StreamsBuilder builder) {

        Map<String, Object> serdeProps = new HashMap<>();
        Serde<Order> orderSerde = SerdeFactory.createSerde(Order.class, serdeProps);

        // Serializers/deserializers (serdes) for the String keys and Long counts
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // Construct a `KStream` from the input topic; message keys are Strings and values are Orders
        KStream<String, Order> textLines = builder.stream(inputTopicWindow, Consumed.with(stringSerde, orderSerde));

        // with a WINDOW
        textLines
                .filter((key, value) -> {System.out.println("KafkaStreamOrderProcessorWindow Key=" + key + "  value=" + value); return true;})
                .selectKey((key, value) -> value.getStatus())
                .groupBy((s, order) -> order.getStatus(), Grouped.with(stringSerde, orderSerde))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1L)))
                .count(Materialized.as("countsWindow"))
                .toStream()
                .map((Windowed<String> key, Long count) -> new KeyValue<>(key.key(), count))
                .to(orderStreamWindowOutput, Produced.with(stringSerde, longSerde));

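        // NOTE: the injected StreamsBuilderFactoryBean already builds and starts
        // its own KafkaStreams from this builder, so the instance created below
        // is a second one under the same application.id; the two instances split
        // the store's partitions between them, which is a likely cause of the
        // "may have migrated to another instance" error when querying.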
        streams = new KafkaStreams(builder.build(), streamsBuilderFactoryBean.getStreamsConfiguration());
        // Clean local store between runs
        streams.cleanUp();
        streams.start();

    }

    // Example: Wait until the store of type T is queryable.  When it is, return a reference to the store.
    public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                                  final QueryableStoreType<T> queryableStoreType,
                                                  final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
                ignored.printStackTrace();
                // store not yet ready for querying
                Thread.sleep(1000);
            }
        }
    }

    // you need to have a WINDOW ...
    public ReadOnlyKeyValueStore<String, Long> getInteractiveQueryCountLastMinute() throws Exception {
        //return waitUntilStoreIsQueryable(orderStreamWindowOutput, QueryableStoreTypes.keyValueStore(), streams);

        return streams.store("countsWindow", QueryableStoreTypes.keyValueStore());
    }

    @PreDestroy
    public void destroy() {
        streams.close();
    }
}
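
On the InvalidStateStoreException: "countsWindow" is materialized by a windowed count, so it is a window store and cannot be fetched with QueryableStoreTypes.keyValueStore(). A minimal sketch of querying it as a window store (the "NEW" status key is a placeholder; extra imports needed: java.time.Instant, org.apache.kafka.streams.state.ReadOnlyWindowStore, org.apache.kafka.streams.state.WindowStoreIterator):

ReadOnlyWindowStore<String, Long> store =
        streams.store("countsWindow", QueryableStoreTypes.windowStore());
Instant to = Instant.now();
Instant from = to.minus(Duration.ofMinutes(1));
// fetch all windows for one key within the time range
try (WindowStoreIterator<Long> it = store.fetch("NEW", from, to)) {
    while (it.hasNext()) {
        KeyValue<Long, Long> entry = it.next(); // key is the window start, in epoch millis
        System.out.println("window @" + entry.key + " count=" + entry.value);
    }
}

With the factory-bean setup, the instance to query would be the one Spring manages (streamsBuilderFactoryBean.getKafkaStreams()) rather than a hand-built second KafkaStreams.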