The examples in this repository demonstrate how to use the Kafka Consumer, Producer, and Streaming APIs with a Kafka on HDInsight cluster.
There are two projects included in this repository:
Producer-Consumer: This contains a producer and consumer that use a Kafka topic named test.
Streaming: This contains an application that uses the Kafka Streams API (in Kafka 0.10.0 or higher) to read data from the test topic, split the data into words, and write a count of words into the wordcounts topic.
NOTE: Both projects assume Kafka 0.10.0, which is available with Kafka on HDInsight cluster version 3.6.
To run the consumer and producer example, use the following steps:
Fork/Clone the repository to your development environment.
Install Java JDK 8 or higher. This was tested with Oracle Java 8, but should also work with other distributions such as OpenJDK.
Install Maven.
Assuming Java and Maven are both on the path and JAVA_HOME is set correctly, use the following commands to build the consumer and producer example:
cd Producer-Consumer
mvn clean package
A file named kafka-producer-consumer-1.0-SNAPSHOT.jar is now available in the target directory.
Use SCP to upload the file to the Kafka cluster:
scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.
Use SSH to connect to the cluster:
ssh SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net
Use the following commands in the SSH session to get the Zookeeper hosts and Kafka brokers for the cluster. You need this information when working with Kafka. The first command installs jq, which makes it easier to parse the JSON returned from Ambari. Before running the commands, set PASSWORD to the login (admin) password for the cluster and CLUSTERNAME to the name of the Kafka on HDInsight cluster.
sudo apt -y install jq
export KAFKAZKHOSTS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
export KAFKABROKERS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Use the following to verify that the environment variables have been correctly populated:
echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
echo '$KAFKABROKERS='$KAFKABROKERS
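If the Ambari request fails (for example, because of a wrong password), these variables can end up empty and later commands fail with less obvious errors. A minimal sketch of a guard, assuming a hypothetical helper named require_var that is not part of this repository:

```shell
# Hypothetical helper (not part of this repository): succeed only when the
# named environment variable is set to a non-empty value.
require_var() {
    name="$1"
    # Indirectly expand the variable whose name was passed in.
    eval "value=\"\${$name:-}\""
    if [ -z "$value" ]; then
        echo "$name is empty; re-run the export commands above" >&2
        return 1
    fi
    echo "$name=$value"
}
```

Running require_var KAFKAZKHOSTS && require_var KAFKABROKERS prints both values, or stops with a message at the first variable that is empty.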
The following is an example of the contents of $KAFKAZKHOSTS:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
The following is an example of the contents of $KAFKABROKERS:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
NOTE: This information may change as you perform scaling operations on the cluster, because scaling adds and removes worker nodes. Always retrieve the Zookeeper and broker information before working with Kafka.
IMPORTANT: You don't have to provide all broker or Zookeeper nodes. A connection to one broker or Zookeeper node can be used to learn about the others. In this example, the list of hosts is trimmed to two entries.
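The export commands above build each list in two steps: jq appends the port to every host name and joins the results with commas, and cut keeps only the first two entries. The sketch below reproduces that shape in plain shell so the effect is visible without a cluster. The function names and the zk0/zk1/zk2 host names are illustrative assumptions, not values from a real cluster.

```shell
# Illustrative only: mimic the host:port,host:port list that the jq
# expression produces, using made-up host names.
join_hosts() {
    port="$1"
    shift
    # Append the port to each host, then join the lines with commas.
    printf "%s:${port}\n" "$@" | paste -s -d ',' -
}

# Keep only the first two comma-separated entries, as cut does above.
trim_hosts() {
    printf '%s\n' "$1" | cut -d',' -f1,2
}

all_hosts=$(join_hosts 2181 zk0 zk1 zk2)
echo "$all_hosts"          # zk0:2181,zk1:2181,zk2:2181
trim_hosts "$all_hosts"    # zk0:2181,zk1:2181
```

Changing -f1,2 to -f1-3 would keep three entries instead of two.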
This example uses a topic named test. Use the following to create this topic:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
Use the producer-consumer example to write records to the topic:
java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
A counter displays how many records have been written.
Use the producer-consumer example to read the records that were just written:
java -jar kafka-producer-consumer.jar consumer test $KAFKABROKERS
This returns a list of the random sentences, along with a count of how many are read.
NOTE: The streaming example expects that you have already set up the test topic from the previous section.
On your development environment, change to the Streaming directory and use the following to create a jar for this project:
mvn clean package
Use SCP to copy the kafka-streaming-1.0-SNAPSHOT.jar file to your HDInsight cluster:
scp ./target/kafka-streaming-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-streaming.jar
Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.
Once the file has been uploaded, return to the SSH connection to your HDInsight cluster and use the following commands to create the wordcounts and wordcount-example-Counts-changelog topics:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
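Together with test, this walkthrough creates three topics with identical settings. The sketch below prints the corresponding commands for review rather than running them, using only the flags shown above. The function name topic_create_cmd is a hypothetical helper, and the ZOOKEEPER_HOSTS placeholder is an assumption used when KAFKAZKHOSTS is not set.

```shell
# Hypothetical helper: print (do not run) the kafka-topics.sh command used in
# this walkthrough for a given topic name and Zookeeper host list.
topic_create_cmd() {
    echo "/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create" \
         "--replication-factor 2 --partitions 8 --topic $1" \
         "--zookeeper $2"
}

# Print one command per topic used by the two examples.
for topic in test wordcounts wordcount-example-Counts-changelog; do
    topic_create_cmd "$topic" "${KAFKAZKHOSTS:-ZOOKEEPER_HOSTS}"
done
```

Once the printed commands look right, paste them into the SSH session to create the topics.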
Use the following command to start the streaming process in the background:
java -jar kafka-streaming.jar $KAFKABROKERS 2>/dev/null &
While it is running, use the producer to send messages to the test topic:
java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS &>/dev/null &
Use the following to view the output that is written to the wordcounts topic:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
NOTE: To view the data, you must tell the consumer to print the key (which contains the word) and which deserializers to use for the key and the value.
The output is similar to the following:
dwarfs 13635
ago 13664
snow 13636
dwarfs 13636
ago 13665
a 13803
ago 13666
a 13804
ago 13667
ago 13668
jumped 13640
jumped 13641
a 13805
snow 13637
Use Ctrl + C to exit the consumer, then use the fg command to bring the streaming background task to the foreground. Use Ctrl + C to exit it as well.
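The sample output from the wordcounts topic lists the same word several times with an increasing count, which suggests that updated counts are written as records arrive rather than once at the end. The following awk sketch imitates that running count on two made-up sentences. It is only an illustration, not the Kafka Streams code in the Streaming project; the function name running_word_count and the rule of lowercasing and splitting on non-alphanumeric characters are assumptions.

```shell
# Illustration only: emit "word count" every time a word's count changes,
# similar in shape to the records seen on the wordcounts topic.
running_word_count() {
    awk '{
        line = tolower($0)
        n = split(line, words, /[^a-z0-9]+/)
        for (i = 1; i <= n; i++) {
            if (words[i] == "") continue   # skip empty fields at line edges
            count[words[i]]++
            print words[i], count[words[i]]
        }
    }'
}

# Made-up input sentences; the word "the" appears twice, so it is printed
# once with a count of 1 and again with a count of 2.
printf '%s\n' "snow white and the seven dwarfs" "the cow jumped" | running_word_count
```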