lensesio / kafka-topics-ui

Web Tool for Kafka Topics |
https://lenses.io

Kafka "Topic is empty" showing in UI even though the producer code compiles successfully #119

Closed TheBhaskarDas closed 6 years ago

TheBhaskarDas commented 6 years ago

I am trying to write and run my own producer code, so I wrote it in IntelliJ and it compiled successfully ("process finished with exit code 0"). But no message is showing in the UI; it says the topic is empty. Meanwhile, if I produce messages from a terminal inside docker, the topic behaves properly. Please help me fix this issue. Thanks in advance.

(screenshot)

Code:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();

        // kafka bootstrap server
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        // producer acks
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "3");
        properties.setProperty("linger.ms", "1");

        // fully qualified because this class shadows kafka-clients' KafkaProducer
        Producer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(properties);

        for (int key = 0; key < 10; key++) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("second_topic", Integer.toString(key),
                            "message that has key: " + Integer.toString(key));
            producer.send(producerRecord);
        }
        producer.close();
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.dataoverflow.kafka</groupId>
    <artifactId>kafka-dataoverflow</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

</project>
ValentinTrinque commented 6 years ago

You may want to check the related issues.

andmarios commented 6 years ago

Hi @TheBhaskarDas, your issue is probably a docker/kafka networking one.

I guess you expose port 9092 from fast-data-dev and run your application locally, right?

Please try to add -e ADV_HOST=127.0.0.1 to the docker run command. This way Kafka will advertise its address as localhost and your application should be able to connect to it successfully.

andmarios commented 6 years ago

Oh, a small correction: I noticed you access Kafka at 192.168.99.100. Probably you are running docker on macOS?

In such case, please try to start fast-data-dev like this:

docker run --net=host -e ADV_HOST=192.168.99.100 landoop/fast-data-dev

Then set your application to look for the broker at 192.168.99.100:9092.
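The corresponding client-side change is small. A minimal sketch of the producer configuration, assuming the docker-machine IP above (the class and method names here are illustrative, and only `bootstrap.servers` differs from the code in the issue):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build producer properties pointing at the docker-machine IP rather than
    // 127.0.0.1, since the broker runs inside the VM, not on the Windows host.
    static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties p = producerProperties("192.168.99.100:9092");
        System.out.println(p.getProperty("bootstrap.servers"));
    }
}
```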

TheBhaskarDas commented 6 years ago

Hi @andmarios, if my issue is a docker/kafka networking one, then why are the topics able to load into the Kafka broker from the local cmd terminal? Please check the image below. (screenshot)

TheBhaskarDas commented 6 years ago

Hi @andmarios, I am using Windows 10. I want to show you an update to my code.

package com.dataoverflow.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();

        // kafka bootstrap server
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // note: "metadata.broker.list" is a legacy (Scala) producer setting;
        // the Java producer ignores it and only uses "bootstrap.servers"
        properties.setProperty("metadata.broker.list", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        // producer acks
        properties.setProperty("acks", "1");
        properties.setProperty("timeout.ms", "6000");

        properties.setProperty("retries", "3");
        properties.setProperty("linger.ms", "1");
        // specify buffer sizes in config
        // properties.put("batch.size", 16384);
        // properties.put("buffer.memory", 33554432);
        properties.put("metadata.broker.list", "localhost:9092, broker1:9092");

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);

        for (int key = 0; key < 10; key++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("second_topic", Integer.toString(key),
                            "message that has key: " + Integer.toString(key));
            producer.send(record, new MyProducerCallback());
            System.out.println("AsynchronousProducer call completed");
        }
        producer.close();
    }
}

class MyProducerCallback implements Callback{

    public  void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            System.out.println("AsynchronousProducer failed with an exception");
            // prints the array reference ([Ljava.lang.StackTraceElement;@...),
            // as seen in the console output; e.printStackTrace() would print
            // the actual stack trace
            System.out.println(e.getStackTrace());
        }
        else
            System.out.println("AsynchronousProducer call Success:");
    }
}

Output Console:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
AsynchronousProducer failed with an exception
[Ljava.lang.StackTraceElement;@445b84c0
AsynchronousProducer call completed
AsynchronousProducer failed with an exception
[Ljava.lang.StackTraceElement;@63d4e2ba
AsynchronousProducer call completed
AsynchronousProducer failed with an exception
[Ljava.lang.StackTraceElement;@7bb11784
AsynchronousProducer call completed
AsynchronousProducer failed with an exception
[Ljava.lang.StackTraceElement;@34033bd0
AsynchronousProducer call completed
AsynchronousProducer failed with an exception
[Ljava.lang.StackTraceElement;@17d99928
AsynchronousProducer call completed

So when I try to push messages from an external IDE, the topic does not load into the Kafka broker; the topic stays empty. This is my problem. I have tried from both Eclipse and IntelliJ.

andmarios commented 6 years ago

The reason this works is that you run it within the fast-data-dev container.

Each docker container has its own network namespace, with its own IP address and hostname. The way Kafka works, when your application connects to the broker, the broker sends back its address (an address it autodetected), and the client then disconnects from the bootstrap server and connects to that new address.

Without -e ADV_HOST=127.0.0.1, the broker returns the container hostname, something like d23d320508fa:9092. This address can only be resolved within the container, so kafka-console-producer can find it, but your application cannot. :)
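This failure mode can be reproduced without Kafka at all: a container-ID-style hostname simply does not resolve from the host's network namespace. A minimal sketch (the class name is illustrative, and the hostname is suffixed with the reserved `.invalid` TLD so it is guaranteed not to resolve):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AdvertisedHostDemo {
    // Returns true if the hostname resolves from this machine, false otherwise.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for an advertised container hostname like d23d320508fa:
        // resolvable inside the container, but not from the host.
        String advertised = "d23d320508fa.invalid";
        System.out.println(advertised + " resolves: " + resolves(advertised));
    }
}
```

This is exactly why the client's bootstrap connection succeeds but the follow-up connection to the advertised address fails.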

andmarios commented 6 years ago

This is an error from the docker daemon. Maybe something went wrong with your docker setup? It could be its disk getting full or something.

The same command without the --net=host part, would be: docker run -e ADV_HOST=192.168.99.100 -p 3030:3030 -p 9092:9092 -p 8081-8083:8081-8083 landoop/fast-data-dev

TheBhaskarDas commented 6 years ago

When I run this command, it drops me directly (without showing any stack trace like before) into the root@fast-data-dev/$ prompt. A ping to 192.168.99.100 responds in under a second, but the Landoop Web UI at 192.168.99.100:3030 is not responding ("refused to connect" in the browser). Docker is running properly, but the Web UI is not opening.

C:\Users\Bhaskar Das>docker run --rm -it -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev bash
root@fast-data-dev / $

andmarios commented 6 years ago

That way you don't run the docker image as it is set up; instead you run the command bash inside it. So Kafka, Connect, the web UI, etc. never start.

You should run the docker image normally:

docker run ... landoop/fast-data-dev

Then exec into it to run commands:

docker ps
docker exec -it [CONTAINER_ID] bash

TheBhaskarDas commented 6 years ago

I am sorry, but I have been trying to run the command as you said. (screenshot)

But it gets stuck at the last line and I am unable to enter the shell (I mean where I would be able to create/list/delete topics).

So my question is: how do I get a shell where I can run the kafka-topics... command?

andmarios commented 6 years ago

This isn't about fast-data-dev, but rather about docker. You would run docker exec... in a new terminal. The normal run command of most docker images isn't meant to drop you into a shell; it is meant to start the program(s) in the image and keep them running.

TheBhaskarDas commented 6 years ago

Ok, great! I have done it! Thanks for your kind help. I followed the steps as you said, and it works once I changed properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); to properties.setProperty("bootstrap.servers", "192.168.99.100:9092"); in my code. Now the Kafka Topics UI in your Landoop Kafka Web UI shows the messages: they are pushed, loaded into the broker, and displayed in the UI. I am happy now. :) (screenshot)

Meanwhile, I have noticed one problem; can you please tell me the solution?

(screenshot)

(screenshot)

The error in detail: (screenshot)

andmarios commented 6 years ago

This is nothing alarming; it probably fails due to ADV_HOST. You can safely ignore it. :)

TheBhaskarDas commented 6 years ago

Thanks!

krishanranditha commented 6 years ago

For Kafka REST Proxy 3.2.x you should set consumer.request.timeout.ms=30000. Without this option, Kafka REST Proxy will fail to return messages for large topics. Add consumer.request.timeout.ms=30000 to /etc/kafka-rest/kafka-rest.properties.
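A minimal sketch of that change, appended to the file named above:

```properties
# /etc/kafka-rest/kafka-rest.properties
# Raise the consumer request timeout so REST Proxy can return
# messages from large topics (value from the comment above).
consumer.request.timeout.ms=30000
```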