ddth-kafka

DDTH's Kafka utility library to simplify Apache Kafka usage.

Project home: https://github.com/DDTH/ddth-kafka

ddth-kafka requires Java 11+ since v2.0.0; for Java 8, use v1.3.x.

Installation

Latest release version: 2.0.0. See RELEASE-NOTES.md.

Maven dependency:

<dependency>
    <groupId>com.github.ddth</groupId>
    <artifactId>ddth-kafka</artifactId>
    <version>2.0.0</version>
</dependency>

Usage

IMPORTANT!

ddth-kafka:2.0.0 uses org.apache.kafka:kafka-clients:2.2.1 and is tested against KafkaServer_2.12-2.2.1. It may not work with older Kafka brokers; upgrade your Kafka broker cluster if needed.

Create and initialize KafkaClient instance

import com.github.ddth.kafka.KafkaClient;
import java.util.Properties;

String bootstrapServers = "localhost:9092;node2:port;node3:port";
KafkaClient kafkaClient = new KafkaClient(bootstrapServers);

//custom configurations for Kafka producers
//Properties customProducerProps = ...
//kafkaClient.setProducerProperties(customProducerProps);

//custom configurations for Kafka consumers
//Properties customConsumerProps = ...
//kafkaClient.setConsumerProperties(customConsumerProps);

kafkaClient.init();

KafkaClient uses a default configuration set for Kafka producers and consumers. The configuration can be overridden via KafkaClient.setProducerProperties(Properties) and KafkaClient.setConsumerProperties(Properties). Custom configurations are merged with the defaults: where a custom setting exists, it overrides the default value.

See the Kafka documentation for producer and consumer configs.
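
For example, a minimal sketch of overriding a couple of settings before calling init() (the keys shown are standard kafka-clients configs chosen for illustration; whether they suit your setup is an assumption):

import com.github.ddth.kafka.KafkaClient;
import java.util.Properties;

Properties customProducerProps = new Properties();
customProducerProps.put("compression.type", "gzip");   //compress outgoing messages

Properties customConsumerProps = new Properties();
customConsumerProps.put("max.poll.records", "100");    //limit records fetched per poll

KafkaClient kafkaClient = new KafkaClient("localhost:9092");
kafkaClient.setProducerProperties(customProducerProps);
kafkaClient.setConsumerProperties(customConsumerProps);
kafkaClient.init();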

Send/Publish messages

import com.github.ddth.kafka.KafkaClient;
import com.github.ddth.kafka.KafkaMessage;

//prepare the message to be sent
String topic = "topic-name";
String content = "This is a message";
//or: byte[] content = { 0, 2, 3, 4, 5, 6, 7, 8, 9 };
KafkaMessage msg = new KafkaMessage(topic, content);
// or to specify message key for partitioning: KafkaMessage msg = new KafkaMessage(topic, key, content);

//send message synchronously, using the default producer type
KafkaMessage result = kafkaClient.sendMessage(msg);
//or use a specific producer type
KafkaMessage result = kafkaClient.sendMessage(KafkaClient.ProducerType.ALL_ACKS, msg);

//send message asynchronously
Future<KafkaMessage> result = kafkaClient.sendMessageAsync(msg);
//or: Future<KafkaMessage> result = kafkaClient.sendMessageAsync(KafkaClient.ProducerType.NO_ACK, msg);

//send message asynchronously, and receive raw result from Kafka's Java client
Future<org.apache.kafka.clients.producer.RecordMetadata> result = kafkaClient.sendMessageRaw(msg);
//some overloaded methods:
Future<org.apache.kafka.clients.producer.RecordMetadata> result = kafkaClient.sendMessageRaw(KafkaClient.ProducerType.LEADER_ACK, msg);
org.apache.kafka.clients.producer.Callback callback = ...
Future<org.apache.kafka.clients.producer.RecordMetadata> result = kafkaClient.sendMessageRaw(msg, callback);
Future<org.apache.kafka.clients.producer.RecordMetadata> result = kafkaClient.sendMessageRaw(KafkaClient.ProducerType.LEADER_ACK, msg, callback);

//send a batch of messages
List<KafkaMessage> buffer = ...
List<KafkaMessage> result = kafkaClient.sendBulk(buffer.toArray(KafkaMessage.EMPTY_ARRAY));
//or: List<KafkaMessage> result = kafkaClient.sendBulk(KafkaClient.ProducerType.ALL_ACKS, buffer.toArray(KafkaMessage.EMPTY_ARRAY));
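
As a small illustration of the callback-based overloads above, the standard kafka-clients Callback interface can be implemented with a lambda (onCompletion(RecordMetadata, Exception) is its single method; the handling below is only a sketch):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.Future;

Callback callback = (RecordMetadata metadata, Exception exception) -> {
    if (exception != null) {
        //the send failed
        exception.printStackTrace();
    } else {
        //the send succeeded; metadata holds topic/partition/offset
        System.out.println("Sent to " + metadata.topic() + "/" + metadata.partition() + " @ " + metadata.offset());
    }
};
Future<RecordMetadata> future = kafkaClient.sendMessageRaw(msg, callback);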

There are 3 producer types:

- KafkaClient.ProducerType.NO_ACK: the producer does not wait for any acknowledgement from the broker.
- KafkaClient.ProducerType.LEADER_ACK: the producer waits for an acknowledgement from the partition leader only.
- KafkaClient.ProducerType.ALL_ACKS: the producer waits for acknowledgements from all in-sync replicas.

KafkaClient.sendBulk(...) does not send messages within a transaction: some messages may fail while others succeed.
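
If every message must be confirmed individually, one option (a sketch using only the async API shown above, not a library-provided feature; buffer is the same message list as in the previous example) is to send each message with sendMessageAsync(...) and wait on the returned futures:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

List<Future<KafkaMessage>> futures = new ArrayList<>();
for (KafkaMessage m : buffer) {
    futures.add(kafkaClient.sendMessageAsync(m));
}
for (Future<KafkaMessage> f : futures) {
    try {
        f.get(); //blocks until the send completes; throws if it failed
    } catch (Exception e) {
        //handle or retry the failed message here
        e.printStackTrace();
    }
}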

Consume messages

String consumerGroupId = "my-group-id";
String topicName = "my-topic";

//consume one single message (one-by-one)
KafkaMessage msg = kafkaClient.consumeMessage(consumerGroupId, topicName);

//consume messages using a message listener
IKafkaMessageListener messageListener = (message) -> System.out.println(message.contentAsString());
boolean consumeFromBeginning = true;
kafkaClient.addMessageListener(consumerGroupId, consumeFromBeginning, topicName, messageListener);
//remove the message listener to stop consuming:
//kafkaClient.removeMessageListener(consumerGroupId, topicName, messageListener);

Consumer group-id:

Do NOT use the same KafkaClient to consume messages both one-by-one and via a message listener. Create one KafkaClient to consume messages one-by-one, and another client (with a different group-id) to consume messages via listeners.
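
A minimal sketch of that setup, assuming both clients point at the same bootstrap servers (the group-ids and topic name are placeholders):

//client #1: consumes messages one-by-one
KafkaClient oneByOneClient = new KafkaClient("localhost:9092");
oneByOneClient.init();
KafkaMessage polled = oneByOneClient.consumeMessage("group-one-by-one", "my-topic");

//client #2, with a different group-id: consumes messages via a listener
KafkaClient listenerClient = new KafkaClient("localhost:9092");
listenerClient.init();
listenerClient.addMessageListener("group-listener", true, "my-topic",
        (message) -> System.out.println(message.contentAsString()));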

Examples

More example code can be found in src/test/java/com/github/ddth/kafka/qnd/.

License

See LICENSE.txt for details. Copyright (c) 2014-2019 Thanh Ba Nguyen.

Third party libraries are distributed under their own licenses.