bakdata / streams-bootstrap

Utility functions and base classes for Kafka Streams applications
MIT License

streams-bootstrap

streams-bootstrap provides base classes and utility functions for Kafka Streams applications.

It provides a common way to

Visit our blog post and demo for an overview and a demo application.
The common configuration and deployments on Kubernetes are supported by the Streams Explorer, which makes it possible to explore and monitor data pipelines in Apache Kafka.

Getting Started

You can add streams-bootstrap via Maven Central.

Gradle

implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap-cli', version: '3.0.0'

With Kotlin DSL

implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = "3.0.0")

Maven

<dependency>
    <groupId>com.bakdata.kafka</groupId>
    <artifactId>streams-bootstrap-cli</artifactId>
    <version>3.0.0</version>
</dependency>

For other build tools or versions, see the latest release on MvnRepository.

Usage

Kafka Streams

Create a subclass of KafkaStreamsApplication and implement createApp(), which returns your StreamsApp. In the StreamsApp, implement buildTopology(), getUniqueAppId() and defaultSerializationConfig(). You define the topology of your application in buildTopology().

import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class MyStreamsApplication extends KafkaStreamsApplication {
    public static void main(final String[] args) {
      startApplication(new MyStreamsApplication(), args);
    }

    @Override
    public StreamsApp createApp() {
      return new StreamsApp() {
        @Override
        public void buildTopology(final TopologyBuilder builder) {
          final KStream<String, String> input = builder.streamInput();

          // your topology

          input.to(builder.getTopics().getOutputTopic());
        }

        @Override
        public String getUniqueAppId(final StreamsTopicConfig topics) {
          return "streams-bootstrap-app-" + topics.getOutputTopic();
        }

        @Override
        public SerdeConfig defaultSerializationConfig() {
          return new SerdeConfig(StringSerde.class, StringSerde.class);
        }

        // Optionally you can define custom Kafka properties
        @Override
        public Map<String, Object> createKafkaProperties() {
          return Map.of(
                  // your config
          );
        }
      };
    }
}
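The value returned by getUniqueAppId() presumably becomes the Kafka Streams application.id (an assumption based on the method name; check the streams-bootstrap sources to confirm). Kafka Streams uses the application.id as the consumer group id and as a prefix for internal topics; for example, a state store's changelog topic is named `<application.id>-<storeName>-changelog`. The id should therefore be unique per application and stable across redeployments. The dependency-free sketch below mirrors the naming scheme from the example above. The class `AppIdSketch`, the topic `orders-enriched` and the store `order-counts` are hypothetical and not part of streams-bootstrap.

```java
import java.util.Objects;

// Hypothetical helper, not part of streams-bootstrap or Kafka Streams.
public final class AppIdSketch {
    private AppIdSketch() {
    }

    // Same scheme as getUniqueAppId() in the example: fixed prefix plus output topic.
    static String uniqueAppId(final String outputTopic) {
        Objects.requireNonNull(outputTopic, "outputTopic");
        return "streams-bootstrap-app-" + outputTopic;
    }

    // Kafka Streams names changelog topics "<application.id>-<storeName>-changelog".
    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(final String[] args) {
        final String appId = uniqueAppId("orders-enriched");
        // prints "streams-bootstrap-app-orders-enriched"
        System.out.println(appId);
        // prints "streams-bootstrap-app-orders-enriched-order-counts-changelog"
        System.out.println(changelogTopic(appId, "order-counts"));
    }
}
```

Because the output topic is part of the id, two deployments of the same code writing to different output topics get separate consumer groups and separate internal topics. Renaming the output topic, however, changes the id, so the application starts with fresh state.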

The following configuration options are available:

Additionally, the following commands are available:

Kafka producer

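Create a subclass of KafkaProducerApplication and implement createApp(), which returns your ProducerApp. In the ProducerApp, implement buildRunnable(), which contains your producing logic, and defaultSerializationConfig().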

import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducerApplication extends KafkaProducerApplication {
    public static void main(final String[] args) {
      startApplication(new MyProducerApplication(), args);
    }

    @Override
    public ProducerApp createApp() {
      return new ProducerApp() {
        @Override
        public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
          return () -> {
            try (final Producer<Object, Object> producer = builder.createProducer()) {
              // your producer
            }
          };
        }

        @Override
        public SerializerConfig defaultSerializationConfig() {
          return new SerializerConfig(StringSerializer.class, StringSerializer.class);
        }

        // Optionally you can define custom Kafka properties
        @Override
        public Map<String, Object> createKafkaProperties() {
          return Map.of(
                  // your config
          );
        }
      };
    }
}
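As an example of filling the `// your config` placeholder, a producer that favors delivery guarantees might return standard Kafka producer settings such as the following. The chosen values are illustrative assumptions, not streams-bootstrap defaults. The keys are written as string literals so that the sketch compiles without kafka-clients, and the class `ProducerPropertiesSketch` is hypothetical.

```java
import java.util.Map;

// Hypothetical stand-in for the body of createKafkaProperties() in a ProducerApp.
public final class ProducerPropertiesSketch {
    private ProducerPropertiesSketch() {
    }

    static Map<String, Object> createKafkaProperties() {
        return Map.of(
                "acks", "all",              // wait for all in-sync replicas to acknowledge
                "enable.idempotence", true, // avoid duplicate records when the producer retries
                "linger.ms", 20             // wait up to 20 ms to batch records before sending
        );
    }

    public static void main(final String[] args) {
        // Map.of does not guarantee iteration order, so the print order may vary.
        createKafkaProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application, prefer the constants from `org.apache.kafka.clients.producer.ProducerConfig` (for example `ProducerConfig.ACKS_CONFIG`) over string literals, so that typos are caught at compile time.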

The following configuration options are available:

Additionally, the following commands are available:

Helm Charts

To configure and deploy your applications on Kubernetes, you can use our Helm charts.

To configure your streams app, you can use the values.yaml as a starting point. We also provide a chart to clean your streams app.

To configure your producer app, you can use the values.yaml as a starting point. We also provide a chart to clean your producer app.

Development

If you want to contribute to this project, clone the repository and build it with Gradle. All dependencies are declared in the Gradle files, so there are no external prerequisites.

> git clone git@github.com:bakdata/streams-bootstrap.git
> cd streams-bootstrap && ./gradlew build

Please note that we have code styles for Java. They are based on the Google style guide, with some small modifications.

Contributing

We are happy if you want to contribute to this project. If you find any bugs or have suggestions for improvements, please open an issue. We are also happy to accept your PRs. Just open an issue beforehand and let us know what you want to do and why.

License

This project is licensed under the MIT license. Have a look at the LICENSE for more details.