private final String topic;
private final Properties props;
public KafkaExample(String brokers, String username, String password) {
this.topic = username + "-default";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, username, password);
String serializer = StringSerializer.class.getName();
String deserializer = StringDeserializer.class.getName();
props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("group.id", username + "-consumer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", deserializer);
props.put("value.deserializer", deserializer);
props.put("key.serializer", serializer);
props.put("value.serializer", serializer);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config", jaasCfg);
}
public void consume() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%s [%d] offset=%d, key=%s, value=\"%s\"\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
}
}
public void produce() {
Thread one = new Thread() {
public void run() {
try {
Producer<String, String> producer = new KafkaProducer<>(props);
int i = 0;
while(true) {
Date d = new Date();
producer.send(new ProducerRecord<>(topic, Integer.toString(i), d.toString()));
Thread.sleep(1000);
i++;
}
} catch (InterruptedException v) {
System.out.println(v);
}
}
};
one.start();
}
public static void main(String[] args) {
String brokers = "velomobile-01.srvs.cloudkafka.com:9094,velomobile-02.srvs.cloudkafka.com:9094,velomobile-03.srvs.cloudkafka.com:9094";
String username = "username";
String password = "password";
KafkaExample c = new KafkaExample(brokers, username, password);
c.produce();
//System.out.println("---------- produced ----------");
//c.consume();
}
}
when i ran it as java application in eclipse, it is showing this in console
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "Thread-0" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321)
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:181)
at practice.KafkaExample$1.run(KafkaExample.java:62)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:73)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:80)
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:271)
... 2 more
Caused by: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:289)
at org.apache.kafka.common.security.kerberos.Login.(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 5 more
i used the code available at https://github.com/CloudKarafka/java-kafka-example replaced the brokers, username, password values with my cloudkarafka details.
following is that program
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays; import java.util.Date; import java.util.Properties;
public class KafkaExample {
}
when i ran it as java application in eclipse, it is showing this in console
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Exception in thread "Thread-0" org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321)
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:181)
at practice.KafkaExample$1.run(KafkaExample.java:62)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:73)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:80)
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:271)
... 2 more
Caused by: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:289)
at org.apache.kafka.common.security.kerberos.Login.(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 5 more