LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License

StreamsException : Local: Erroneous state #376

Open rao-mayur opened 4 hours ago

rao-mayur commented 4 hours ago

Description

We encountered a StreamsException with the message "Local: Erroneous state" during processing. Although there is a stack trace, nothing in it indicates what caused the error. It happens when the processor tries to produce messages to the topic. Our InnerExceptionHandler returns CONTINUE, so the stream thread restarts, and we see the container restart repeatedly for a while, which leaves the application unstable. After some time the error goes away and the processor becomes stable again.

Do you have any insight into this? What may be causing the stream thread to hit the "Local: Erroneous state" exception? Any other configuration settings to try, or things to look at, would be helpful.

Here is the exception stack trace:

{
  "error.class": "Streamiz.Kafka.Net.Errors.StreamsException",
  "error.message": "Error encountered trying to send record to topic topicName-changelog [stream-task[2|0] ] : Local: Erroneous state",
  "error.stack": "\t at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.SendInternal[K,V](String topic, K key, V value, Headers headers, Nullable1 partition, Int64 timestamp, ISerDes1 keySerializer, ISerDes1 valueSerializer) \n\t at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.Send[K,V](String topic, K key, V value, Headers headers, Int32 partition, Int64 timestamp, ISerDes1 keySerializer, ISerDes1 valueSerializer) \n\t at Streamiz.Kafka.Net.ProcessorContext.Log(String storeName, Bytes key, Byte[] value, Int64 timestamp) \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action actionToMeasure, Sensor sensor) \n\t at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore2.Put(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.KStreamReduceProcessor2.Process(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Forward(IEnumerable1 processors, Action1 action) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Forward(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.SourceProcessor2.Process(K key, V value) \n\t at Streamiz.Kafka.Net.Processors.AbstractProcessor2.Process(ConsumeResult2 record) \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action) \n\t at Streamiz.Kafka.Net.Processors.StreamTask.Process() \n\t at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)",
  "level": "ERROR",
  "message": "Failed to process stream task 2-0 due to the following error:"
}

How to reproduce

This happens in a topology that uses ForeachAsync(), inside which we perform gRPC calls. Here is a sample:

public void TestTopology()
{
    var streamBuilder = new StreamBuilder();
    var testStream = streamBuilder.Stream<string, string>("test-topic");
    testStream.ForeachAsync(async (record, _) =>
    {
        var value = JsonConvert.DeserializeObject(record.Value);

        // perform grpc call with value
    }, null, null, "testForEach");
}

Config:

var streamConfig = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "id",
    BootstrapServers = "broker",
    SchemaRegistryUrl = "SchemaRegistryUrl",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    AutoRegisterSchemas = false,
    UseLatestVersion = true,
    InnerExceptionHandler = (_) =>
    {
        return ExceptionHandlerResponse.CONTINUE;
    }
};

LGouellec commented 3 hours ago

Hey @rao-mayur,

Can you share your LKC in Confluent Cloud? In the meantime, can you enable librdkafka debug logging so we can understand why the internal Kafka producer raises "Local: Erroneous state"?

config.Debug = "broker,topic,msg";
config.Logger = LoggerFactory.Create(b =>
{
    b.SetMinimumLevel(LogLevel.Debug);
    b.AddConsole();
});
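
For reference, here is a rough sketch of how these two settings might be folded into the StreamConfig from the report. It assumes that `config` above refers to that same StreamConfig instance, that the Microsoft.Extensions.Logging.Console package is referenced, and that the placeholder values ("id", "broker") are replaced with real ones. The schema registry settings from the report are left out for brevity.

```csharp
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

// Sketch only: the reporter's configuration plus the debug settings suggested above.
var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "id",          // placeholder from the report
    BootstrapServers = "broker",   // placeholder from the report
    InnerExceptionHandler = _ => ExceptionHandlerResponse.CONTINUE,

    // librdkafka debug contexts suggested in this comment
    Debug = "broker,topic,msg",

    // Console logger at Debug level so the librdkafka output is not filtered out
    Logger = LoggerFactory.Create(b =>
    {
        b.SetMinimumLevel(LogLevel.Debug);
        b.AddConsole();
    })
};
```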

Thanks