DDTH / ddth-kafka

DDTH's Apache Kafka Libraries and Utilities
MIT License

org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread; #6

Open taufiquealam007 opened 5 years ago

taufiquealam007 commented 5 years ago

Hi,

I am not able to connect to the Kafka broker from a Kafka consumer in Java.

    public SimpleCounterKafkaFileConsumer() {
        Properties properties = new Properties();
        // properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "test-consumer-group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

If I replace zookeeper.connect with bootstrap.servers, it fails with java.lang.IllegalArgumentException: requirement failed: Missing required property 'zookeeper.connect'

Please suggest how to connect to the Kafka broker.

btnguyen2k commented 5 years ago

Hi @taufiquealam007, which version of Kafka server and Kafka lib did you use?

taufiquealam007 commented 5 years ago

Hi,

It's Kafka_2.10-0.8.2.1.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

The error occurs at this line:

    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

btnguyen2k commented 5 years ago

Hi @taufiquealam007, Kafka_2.10-0.8.2.1 is a pretty old version of the Kafka client. The new client is pure Java and is called kafka-clients. You do not need to declare the kafka-clients dependency yourself, because it is already defined in ddth-kafka's POM file (https://github.com/DDTH/ddth-kafka/blob/master/pom.xml), unless you are using a newer version of kafka-clients.
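For reference, here is a rough sketch of the configuration the new Java consumer in kafka-clients expects. It connects through bootstrap.servers rather than zookeeper.connect. The broker address and group id are taken from your snippet. The class name NewConsumerConfigSketch and the method build are hypothetical names used only for illustration. This is not ddth-kafka's API, which is shown in the README and may look different.

```java
import java.util.Properties;

// Hypothetical helper (not part of ddth-kafka or kafka-clients): builds the
// properties that the new pure-Java consumer reads.
final class NewConsumerConfigSketch {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // The new consumer talks to the brokers directly, so it needs
        // bootstrap.servers and no zookeeper.connect entry.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Deserializer classes shipped with kafka-clients.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    // Assumption: with kafka-clients (0.9 or later) on the classpath, these
    // properties would be passed to
    //   new org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>(props)
    // instead of the old ConsumerConfig / Consumer.createJavaConsumerConnector pair.
}
```

Calling NewConsumerConfigSketch.build("localhost:9092", "test-consumer-group") would give the properties for a local single-broker setup. The old ConsumerConfig class from kafka_2.10 still requires zookeeper.connect, which is why replacing it there fails.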

You may want to see the example in README file: https://github.com/DDTH/ddth-kafka/blob/master/README.md

Regards, Thanh Nguyen