danielwegener / logback-kafka-appender

Logback appender for Apache Kafka

Kafka producer is not able to update metadata #44

Closed: vajralavenkat closed this issue 7 years ago

vajralavenkat commented 7 years ago

Hi,

I am getting the exception below with a cluster of 3 Kafka brokers and 1 ZooKeeper node:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.

Kafka Appender config in logback.xml:

<!-- This is the kafkaAppender -->
    <appender name="KAFKA"
        class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <!-- This is the default encoder; it encodes every log message to a UTF-8 encoded string -->
        <encoder
            class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{yyyy-MM-dd HH:mm:ss} </pattern>
            </layout>
        </encoder>
        <topic>mytopic</topic>
        <keyingStrategy
            class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
        <deliveryStrategy
            class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <!-- each <producerConfig> translates to a regular kafka-client config entry (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9094,localhost:9092,localhost:9093</producerConfig>
        <producerConfig>max.block.ms=10000</producerConfig>
        <producerConfig>reconnect.backoff.ms=100</producerConfig>
        <producerConfig>retry.backoff.ms=100</producerConfig>
        <producerConfig>linger.ms=100</producerConfig>
        <producerConfig>batch.size=2</producerConfig>
        <producerConfig>block.on.buffer.full=false</producerConfig>
        <producerConfig>buffer.memory=2147483647</producerConfig>
    </appender>

producerConfig: {block.on.buffer.full=false, value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, max.block.ms=10000, reconnect.backoff.ms=100, batch.size=2, bootstrap.servers=localhost:9094,localhost:9092,localhost:9093, retry.backoff.ms=100, buffer.memory=2147483647, key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, linger.ms=100}

Using Kafka and kafka-clients version 0.10.1.1 in the Gradle build file:

    runtime('org.apache.kafka:kafka_2.11:0.10.1.1') {
        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
        exclude group: 'log4j', module: 'log4j'
    }
    runtime('org.apache.kafka:kafka-clients:0.10.1.1')

Using the following classes:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

C:\kafka_2.11-0.10.1.1\bin\windows>kafka-topics.bat --describe --zookeeper localhost:2181/kafka --topic mytopic
Topic:mytopic   PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: mytopic  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2

Please let me know for any additional details. Any help is highly appreciated.

Regards, Venkat

vajralavenkat commented 7 years ago

Hi,

I have created a sample Maven project and am getting the same issue.

Logs:
2017-02-15 12:15:22 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.s.c.e.PropertySourcesPropertyResolver  [getProperty 81] - Searching for key 'context.listener.classes' in [commandLineArgs] 
2017-02-15 12:15:22 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [run 123] - Kafka producer started 
2017-02-15 12:15:22 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] org.jboss.logging    [logProvider 160] - Logging Provider: org.jboss.logging.Slf4jLoggerProvider found via system property 
2017-02-15 12:15:32 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [run 123] - Exception occurred during message send: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.
2017-02-15 12:15:32 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [debug 131] - Exception occurred during message send: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.
2017-02-15 12:15:32 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [invoke0 -2] - Exception occurred during message send: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.
2017-02-15 12:15:42 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [run 123] - Exception occurred during message send: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.
2017-02-15 12:15:42 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [debug 131] - Exception occurred during message send: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.
2017-02-15 12:15:42 DEBUG [APP: TRACKING_ID: ACCOUNT_NUM:] o.a.k.clients.producer.KafkaProducer [invoke0 -2] - Exception occurred during message send: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.

POM:

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.comcast.activation.logappender1</groupId>
    <artifactId>kafkaappender1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>KafkaLogAppender1</name>
    <description>Project for Spring Boot Rest Service</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.8.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.1.0</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.6</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.6</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Java file:

package com.comcast.activation.logappender1;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@RestController
@RequestMapping(path = "/KafkaLogger")
public class KafkaLogAppender1Application {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLogAppender1Application.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaLogAppender1Application.class, args);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @RequestMapping(path = "/greeting", method = RequestMethod.GET)
    public String sayHello() {
        String responseMsg = "response from micro service.";
        LOGGER.debug(" sayHello(): Hello demoapp1.");
        return "Hello demoapp2: " + responseMsg;
    }

    @RequestMapping(path = "/exception", method = RequestMethod.GET)
    public String sayException() {
        String message = "sayException method excution started";
        LOGGER.debug(" sayException(): Hello demoapp1.");
        throw new RuntimeException(message);
    }
}

Logback Config:

    <appender name="KAFKA"
        class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder
            class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{yyyy-MM-dd HH:mm:ss} </pattern>
            </layout>
        </encoder>
        <topic>mytopic</topic>
        <keyingStrategy
            class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
        <deliveryStrategy
            class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=localhost:9094,localhost:9092,localhost:9093</producerConfig>
        <producerConfig>max.block.ms=10000</producerConfig>
        <producerConfig>reconnect.backoff.ms=100</producerConfig>
        <producerConfig>retry.backoff.ms=100</producerConfig>
        <producerConfig>linger.ms=100</producerConfig>
        <producerConfig>batch.size=2</producerConfig>
        <producerConfig>block.on.buffer.full=false</producerConfig>
        <producerConfig>buffer.memory=2147483647</producerConfig>

        <!-- this is the fallback appender if kafka is not available. -->
        <appender-ref ref="STDOUT" />
    </appender>

    <root level="DEBUG">
        <appender-ref ref="KAFKA" />
    </root>

Am I missing any producer config property?

Regards, Venkat

danielwegener commented 7 years ago

Hi Venkat. Sorry, my delay on GitHub is currently a bit high. First thing to try: please set the root level to INFO (or a less verbose level); this might be related to #36.
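
For reference, a minimal sketch of that change against the config posted above (only the root level changes):

    <root level="INFO">
        <appender-ref ref="KAFKA" />
    </root>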

Can you confirm that the kafka cluster is healthy - i.e. can you produce messages using the kafka example producers?

danielwegener commented 7 years ago

Oh, and by the way, why are you forcing the kafka-clients version to 0.10.1.1? I have no idea if they are API-wise compatible. Even if you are running a 0.10.x cluster, the appender should work fine with its own kafka-clients 0.9.0.0 (see https://kafka.apache.org/protocol.html#protocol_compatibility).

danielwegener commented 7 years ago

Possibly related: #41

vajralavenkat commented 7 years ago

Hi Daniel Wegener,

I am able to produce and consume messages using the Kafka producer and consumer scripts (the .bat examples from the Kafka installation) without any issue.

I tried kafka-clients 0.9.0.0 with Kafka 0.10.1.1; when I set the root log level to TRACE or DEBUG, I get the same error:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

When I set the root level to INFO or higher, the application is able to produce messages to Kafka and consume them successfully.

Can you please guide me on how to overcome this limitation when using DEBUG or TRACE?

Regards, Venkat

danielwegener commented 7 years ago

You could try to raise the logger level for org.apache.kafka to something equal to or higher than INFO (such that o.a.k.* loggers only log at INFO and above, while everything else still logs at DEBUG).
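
A minimal sketch of that override in your logback.xml (everything else stays as posted):

    <!-- Cap the kafka-clients loggers so the producer's own DEBUG output
         is not routed back into the Kafka appender it is writing for. -->
    <logger name="org.apache.kafka" level="INFO" />

    <root level="DEBUG">
        <appender-ref ref="KAFKA" />
    </root>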

vajralavenkat commented 7 years ago

Hi Daniel Wegener,

After setting the logging level of org.apache.kafka to WARN, the application is able to send messages to the Kafka brokers without any issue at the DEBUG root level.

Thank you so much for immediate assistance.

Regards, Venkat

danielwegener commented 7 years ago

Duplicate of #33

aaquibkhwaja commented 7 years ago

@vajralavenkat Where and how did you set this property? On the client side or the Kafka server side?

danielwegener commented 6 years ago

On the client side: in the logback configuration of the application that uses the logback-kafka-appender (the Kafka producer recursively uses slf4j, and hence logback, to log its attempts to connect to the broker).

abhiverma23 commented 6 years ago

Hi @danielwegener, I set the org.apache.kafka logging level to WARN and then tried a root logging level of TRACE; I am getting an issue:

2018-04-13 11:29:08.303 [xyz-service,,, , ] DEBUG [main] c.s.j.i.DefaultMBeanServerInterceptor - JMX.mbean.registered kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.304 [xyz-service,,, , ] DEBUG [main] c.s.j.m.Repository - name = kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.305 [xyz-service,,, , ] DEBUG [main] c.s.j.m.Repository - name = kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.305 [xyz-service,,, , ] DEBUG [main] c.s.j.m.Repository - name = kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.305 [xyz-service,,, , ] DEBUG [main] c.s.j.i.DefaultMBeanServerInterceptor - Send delete notification of object kafka.producer:client-id=producer-1,type=producer-metrics
2018-04-13 11:29:08.306 [xyz-service,,, , ] DEBUG [main] c.s.j.i.DefaultMBeanServerInterceptor - JMX.mbean.unregistered kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.306 [xyz-service,,, , ] DEBUG [main] c.s.j.i.DefaultMBeanServerInterceptor - ObjectName = kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.307 [xyz-service,,, , ] DEBUG [main] c.s.j.m.Repository - name = kafka.producer:type=producer-metrics,client-id=producer-1
2018-04-13 11:29:08.307 [xyz-service,,, , ] DEBUG [main] c.s.j.i.DefaultMBeanServerInterceptor - Send create notification of object kafka.producer:client-id=producer-1,type=producer-metrics

The logs above keep coming continuously for me.

If I set the root logging level to DEBUG, it works fine for me.

danielwegener commented 6 years ago

Well, there is obviously something used by the Kafka producer that logs at the TRACE level but is not part of the org.apache.kafka package (something from c.s.j.i.). We cannot prevent the kafka-producer (and the libraries it calls) from logging to itself (which is why you see the continuous stream) without completely repackaging (shading) it together with its dependencies. But that would also mean we'd need to re-release logback-kafka-appender for each new producer version (which is why we have not done it yet). Therefore, you'll probably need to set the com.sun.jmx.interceptor logger package (and probably others) to DEBUG level too, or simply use a less verbose level for the root logger (TRACE on root is a rather wild setting anyway).
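
A sketch of those overrides for a TRACE root (using the broader com.sun.jmx prefix so the c.s.j.m. Repository entries are covered too is an assumption; more packages may need the same treatment):

    <logger name="org.apache.kafka" level="WARN" />
    <!-- JMX internals (the c.s.j.i. / c.s.j.m. entries above) log below
         DEBUG, so cap them as well when the root logger is on TRACE: -->
    <logger name="com.sun.jmx" level="DEBUG" />

    <root level="TRACE">
        <appender-ref ref="KAFKA" />
    </root>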

kelbyloden commented 5 years ago

I just wasted about 3 hours on this before stumbling on this issue thread. Please add the line below to the sample logback.xml configuration file, with a quick note on why it's needed, to save folks some hassle. Thanks!

<logger name="org.apache.kafka" level="warn"/>

kelbyloden commented 5 years ago

Oh, and I don't mean to come across as a complainer. This is an excellent solution that you have been nice enough to provide to everyone. Adding that one line with a comment to the example will make it more hassle-free when someone changes the log level to DEBUG.

OneCricketeer commented 4 years ago

I don't understand why changing the log level fixes the problem.

It would make more sense that you restarted the producer client, and that is what really fixed it.