Let's create a module bootique-kafka-streams to support setting up and running Kafka streams. It will need to address streams configuration as well as shutdown strategy.
Configuration... While org.apache.kafka.streams.StreamsConfig somewhat overlaps with producer and consumer configs, it is much more complicated (internally it creates "adminClient", "consumer", "globalConsumer", "producer" and "restoreConsumer"). So we will use a separate top level config with the above labels for the corresponding subconfigs:
kafkastreams:
  clusters:
    # Single cluster is by definition a default cluster
    # TODO: Should we go further and make "127.0.0.1:9092" the default if nothing is set?
    cluster1: 127.0.0.1:9092
  adminClient:
  consumer:
  globalConsumer:
  producer:
  restoreConsumer:
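One way the per-client subconfigs above could eventually be folded into a single property set for StreamsConfig is by key prefix. The following is only a sketch: the class `SubConfigFlattener` and its `flatten` method are hypothetical, and the prefix strings are assumed to match the prefix constants in org.apache.kafka.streams.StreamsConfig. Mapping the "consumer" label to the generic `consumer.` prefix is likewise an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Bootique or Kafka.
class SubConfigFlattener {

    // YAML label -> property key prefix. The prefix values are assumed to match
    // the prefix constants defined by org.apache.kafka.streams.StreamsConfig.
    static final Map<String, String> PREFIXES = new LinkedHashMap<>();

    static {
        PREFIXES.put("adminClient", "admin.");
        PREFIXES.put("consumer", "consumer.");
        PREFIXES.put("globalConsumer", "global.consumer.");
        PREFIXES.put("producer", "producer.");
        PREFIXES.put("restoreConsumer", "restore.consumer.");
    }

    // Copies every subconfig entry into one flat Properties object,
    // prefixing each key according to the client it belongs to.
    static Properties flatten(String bootstrapServers, Map<String, Map<String, String>> subConfigs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);

        subConfigs.forEach((label, values) -> {
            String prefix = PREFIXES.get(label);
            if (prefix == null) {
                throw new IllegalArgumentException("Unknown subconfig: " + label);
            }
            values.forEach((key, value) -> props.setProperty(prefix + key, value));
        });

        return props;
    }
}
```

With this shape, an "acks: all" entry under "producer" would end up as the "producer.acks" property, leaving the other clients untouched.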
Startup/shutdown strategy... Per the Kafka tutorials, each "streams" object starts in the background and unblocks the calling thread; its "close()" method must be called later. This is not unlike, say, Jetty, the difference being that we will support starting multiple streams with different topologies.
Bootique injectable KafkaStreams factory singleton may look somewhat similar to our unit test runners. We'll wrap KafkaStreams in KafkaStreamsRunner to include a reference to ShutdownManager to close the streams when the app is going down. E.g.:
@Inject
KafkaStreamsFactory f;
// 1. create wrapped KafkaStreams, but do not run ... the caller will manage startup.
// Shutdown can be managed by the caller or will happen via `ShutdownManager`
KafkaStreamsRunner s1 = f.topology(t).cluster(c).props(p).create();
// registers self with ShutdownManager, starts streams and immediately unblocks
s1.start();
// get access to the actual streams object
KafkaStreams ks1 = s1.getStreams();
// shutdown explicitly
s1.close();
// 2. A shortcut returning started `KafkaStreamsRunner`
f.topology(t).cluster(c).props(p).start();
// 3. Default cluster and properties
f.topology(t).start();
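The wrapper described above could be sketched roughly as follows. This is not the actual implementation: `SketchShutdownManager` and `SketchStreams` are simplified, hypothetical stand-ins for Bootique's ShutdownManager and for KafkaStreams, used only so the example is self-contained. It shows the runner registering itself on start and making close() safe to call both explicitly and from the shutdown hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, hypothetical stand-in for Bootique's ShutdownManager.
class SketchShutdownManager {
    private final List<AutoCloseable> resources = new ArrayList<>();

    synchronized void addShutdownHook(AutoCloseable resource) {
        resources.add(resource);
    }

    // Closes every registered resource; a failure in one does not stop the rest.
    synchronized void shutdown() {
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                // ignored in this sketch
            }
        }
        resources.clear();
    }
}

// Hypothetical stand-in for the two KafkaStreams methods the runner relies on.
interface SketchStreams {
    void start();
    void close();
}

class SketchStreamsRunner implements AutoCloseable {
    private final SketchStreams streams;
    private final SketchShutdownManager shutdownManager;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();

    SketchStreamsRunner(SketchStreams streams, SketchShutdownManager shutdownManager) {
        this.streams = streams;
        this.shutdownManager = shutdownManager;
    }

    // Registers self with the shutdown manager, then starts the streams.
    // Repeated calls are no-ops.
    void start() {
        if (started.compareAndSet(false, true)) {
            shutdownManager.addShutdownHook(this);
            streams.start();
        }
    }

    SketchStreams getStreams() {
        return streams;
    }

    // Closes the underlying streams at most once, whether the call comes
    // from the caller or from the shutdown manager.
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            streams.close();
        }
    }
}
```

Guarding close() with a flag means an explicit close followed by application shutdown does not close the streams twice.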
Initially we'll support only a subset of config properties in YAML and will add more as we go. Specifically, none of the internal client configs ("adminClient", "consumer", "globalConsumer", "producer" and "restoreConsumer") can be configured in YAML at first.
The workaround for configuring the above is to pass properties to the builder (KafkaStreamsFactory.topology(t).properties(p).run())
Config priority/override rules:
Configs lower on the list override those higher on the list:
Configs from YAML. Those are the default properties for all streams that can be overridden during each instance creation.
Configs passed to the builder's properties(Properties) method
Configs set via "named" builder methods like applicationId(String), etc.
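The override order above can be illustrated with a small merge routine. This is a sketch only: `LayeredStreamsProps` and `resolve` are hypothetical names, and just one "named" builder method (applicationId) is modeled, mapped to Kafka's "application.id" key.

```java
import java.util.Properties;

// Hypothetical helper illustrating the override order; not part of Bootique.
class LayeredStreamsProps {

    static Properties resolve(Properties yamlDefaults, Properties builderProps, String applicationId) {
        Properties merged = new Properties();

        // Lowest priority: defaults from YAML shared by all streams
        merged.putAll(yamlDefaults);

        // Next: properties passed explicitly to the builder
        merged.putAll(builderProps);

        // Highest priority: values set via "named" builder methods
        if (applicationId != null) {
            merged.setProperty("application.id", applicationId);
        }

        return merged;
    }
}
```

Because later putAll/setProperty calls overwrite earlier entries with the same key, the order of the three steps is what encodes the priority.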