Kafka Junit provides helpers for starting and tearing down a Kafka broker during tests.
Please note that version 3.x.x drops Java 7 support and contains breaking API changes.
Releases are available on Maven Central.
Snapshot versions containing builds from the latest master
are available in the Sonatype snapshots repo.
http://charithe.github.io/kafka-junit/
Create an instance of the rule in your test class and annotate it with @Rule. This starts the broker before each test invocation and stops it afterwards.
@Rule
public KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());
To spin up the broker at the beginning of a test suite and tear it down at the end, use @ClassRule.
@ClassRule
public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());
kafkaRule can be referenced from within your test methods to obtain information about the Kafka broker.
@Test
public void testSomething() throws ExecutionException, InterruptedException {
    // Convenience methods to produce and consume messages
    kafkaRule.helper().produceStrings("my-test-topic", "a", "b", "c", "d", "e");
    // get() blocks until the messages arrive and throws checked exceptions
    List<String> result = kafkaRule.helper().consumeStrings("my-test-topic", 5).get();

    // or use the built-in producers and consumers
    KafkaProducer<String, String> producer = kafkaRule.helper().createStringProducer();
    KafkaConsumer<String, String> consumer = kafkaRule.helper().createStringConsumer();

    // Alternatively, the Zookeeper connection String and the broker port can be retrieved to generate your own config
    String zkConnStr = kafkaRule.helper().zookeeperConnectionString();
    int brokerPort = kafkaRule.helper().kafkaPort();
}
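Generating your own config from the broker port, as the last comment in the example mentions, could look like the following minimal sketch. It uses only java.util.Properties and the standard Kafka client property names. The class BrokerConfigSketch, its method names, and the assumption that the broker is reachable on localhost are illustrative and not part of Kafka Junit.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Junit: builds client configs from a broker port.
final class BrokerConfigSketch {
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    private BrokerConfigSketch() {}

    // Assumption: the ephemeral broker listens on localhost.
    static String bootstrapServers(int brokerPort) {
        if (brokerPort < 1 || brokerPort > 65535) {
            throw new IllegalArgumentException("Invalid port: " + brokerPort);
        }
        return "localhost:" + brokerPort;
    }

    // Minimal producer config for String keys and values.
    static Properties producerProps(int brokerPort) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers(brokerPort));
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    // Minimal consumer config; reads from the earliest offset so that
    // messages produced before the consumer subscribed are still seen.
    static Properties consumerProps(int brokerPort, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers(brokerPort));
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        return props;
    }
}
```

In a test, the result could be passed straight to a client constructor, for example new KafkaProducer<>(BrokerConfigSketch.producerProps(kafkaRule.helper().kafkaPort())).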
EphemeralKafkaBroker contains the core logic used by the JUnit rule and can be used independently.
KafkaHelper provides convenience methods for working with an EphemeralKafkaBroker.
JUnit 5 does not support Rules; it uses the JUnit 5 Extension Model instead.
If you are using JUnit 5, use KafkaJunitExtension, which provides a Kafka broker that is started and stopped for each test.
The extension is configured with the optional class-level annotation @KafkaJunitExtensionConfig and can inject KafkaHelper and EphemeralKafkaBroker instances into constructor and method parameters.
@ExtendWith(KafkaJunitExtension.class)
@KafkaJunitExtensionConfig(startupMode = StartupMode.WAIT_FOR_STARTUP)
class MyTestClass {

    @Test
    void testSomething(KafkaHelper kafkaHelper) throws ExecutionException, InterruptedException {
        // Convenience methods to produce and consume messages
        kafkaHelper.produceStrings("my-test-topic", "a", "b", "c", "d", "e");
        List<String> result = kafkaHelper.consumeStrings("my-test-topic", 5).get();
        assertThat(result).containsExactlyInAnyOrder("a", "b", "c", "d", "e");

        // or use the built-in producers and consumers
        KafkaProducer<String, String> producer = kafkaHelper.createStringProducer();
        KafkaConsumer<String, String> consumer = kafkaHelper.createStringConsumer();

        // Alternatively, the Zookeeper connection String and the broker port can be retrieved to generate your own config
        String zkConnStr = kafkaHelper.zookeeperConnectionString();
        int brokerPort = kafkaHelper.kafkaPort();
    }
}
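The examples call get() on the value returned by consumeStrings, which waits until that future completes. If fewer messages arrive than requested, an unbounded wait can stall the test run, so a bounded wait is often preferable. The sketch below illustrates the idea with the standard Future.get(timeout, unit) overload. A CompletableFuture stands in for the future returned by consumeStrings, and the class BoundedWaitSketch, the helper awaitOrFail, and the timeout values are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper, not part of Kafka Junit.
final class BoundedWaitSketch {
    private BoundedWaitSketch() {}

    // Waits at most `timeout` for the future; any failure becomes an AssertionError
    // so the calling test fails with a readable message instead of hanging.
    static <T> T awaitOrFail(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting for messages that never arrived
            throw new AssertionError("Timed out after " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            throw new AssertionError("Consuming failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new AssertionError("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a future that has already received its messages.
        Future<List<String>> done =
                CompletableFuture.completedFuture(Arrays.asList("a", "b", "c"));
        System.out.println(awaitOrFail(done, 1, TimeUnit.SECONDS));

        // Stand-in for a future that never completes.
        Future<List<String>> never = new CompletableFuture<>();
        try {
            awaitOrFail(never, 100, TimeUnit.MILLISECONDS);
        } catch (AssertionError expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

With Kafka Junit this might be used as List<String> result = BoundedWaitSketch.awaitOrFail(kafkaHelper.consumeStrings("my-test-topic", 5), 10, TimeUnit.SECONDS), assuming the returned object implements java.util.concurrent.Future, as the get() calls in the examples suggest.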
Refer to the Javadocs and unit tests for more usage examples.