LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License
470 stars 75 forks source link

High cpu consumption if kafka is down #87

Closed lichutin-st closed 2 years ago

lichutin-st commented 3 years ago

Description

I noticed that stream app consumes a lot of cpu if kafka is not accessible. It looks like confluent consumer ignores "timeout" parameter after throwing first "KafkaException" and returns 0 records immediately. That causes an infinite loop without any pauses.

image

I experimented a bit and it seems that upgrading "confluent-kafka-dotnet" package to the latest version changes the situation, but not very significantly. Instead of immediately raising CPU consumption we get the same effect in 3-5-10 minutes.

NuGet version: 1.1.5. Kafka version: latest confluent kafka

How to reproduce

1) Start kafka 2) Start application 3) Stop kafka 4) Wait a bit

I believe it should have the same behavior for any topology

builder.Stream("test-input", new StringSerDes(), new StringSerDes())
                .To<StringSerDes, StringSerDes>("test-output");

Configuration:

            var streamConfig = new StreamConfig {
                ApplicationId = "someid",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                SchemaRegistryUrl = "http://localhost:8081",
                AutoRegisterSchemas = true
            };

            streamConfig.AddConsumerConfig("allow.auto.create.topics", "true");

            streamConfig.InnerExceptionHandler = (ex) => {
                logger.Error("Exception inside Kafka Streams", ex);
                return ExceptionHandlerResponse.CONTINUE;
            };

            streamConfig.DeserializationExceptionHandler = (context, consumed, ex) => {
                logger.Error("Exception at deserialization inside Kafka Streams", ex);
                return ExceptionHandlerResponse.FAIL;
            };

            streamConfig.ProductionExceptionHandler = (delivery) => {
                logger.Error("Exception at producing inside Kafka Streams");
                return ExceptionHandlerResponse.FAIL;
            };

Checklist

Please provide the following information:

LGouellec commented 3 years ago

Hi @lichutin-st, I will try to reproduce in local environment and create a fix around this issue. Thanks :)