LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License

Operation not valid in state Ready #32

Closed: koshdim closed this issue 4 years ago

koshdim commented 4 years ago

Description

I created an application to try your package, and from time to time I get this exception: Streamiz.Kafka.Net.Errors.StreamsException: 'Operation not valid in state Ready'. Here is the log, which might be helpful:

170579 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-2 since 100,0099ms has elapsed (commit interval is 100ms)
170583 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0081ms has elapsed (commit interval is 100ms)
170586 [data-ingestion-appkstreamtestin-stream-thread-4] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0041ms has elapsed (commit interval is 100ms)
170586 [data-ingestion-appkstreamtestin-stream-thread-3] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0632ms has elapsed (commit interval is 100ms)
170588 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0215ms has elapsed (commit interval is 100ms)
170609 [data-ingestion-appkstreamtestin-stream-thread-7] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0051ms has elapsed (commit interval is 100ms)
170615 [data-ingestion-appkstreamtestin-stream-thread-4] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-4] State transition from RUNNING to PARTITIONS_REVOKED
170617 [data-ingestion-appkstreamtestin-stream-thread-3] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-3] State transition from RUNNING to PARTITIONS_REVOKED
170617 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 101,8294ms has elapsed (commit interval is 100ms)
170617 [data-ingestion-appkstreamtestin-stream-thread-7] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-7] State transition from RUNNING to PARTITIONS_REVOKED
170618 [data-ingestion-appkstreamtestin-stream-thread-4] INFO Streamiz.Kafka.Net.KafkaStream - stream-application[data-ingestion-appKStreamTestIn] State transition from RUNNING to REBALANCING
170620 [data-ingestion-appkstreamtestin-stream-thread-4] INFO Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition revocation took 00:00:00.0052987 ms
        Current suspended active tasks:

170620 [data-ingestion-appkstreamtestin-stream-thread-3] INFO Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition revocation took 00:00:00.0053174 ms
        Current suspended active tasks:

170620 [data-ingestion-appkstreamtestin-stream-thread-7] INFO Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition revocation took 00:00:00.0025206 ms
        Current suspended active tasks:

170640 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0111ms has elapsed (commit interval is 100ms)
170686 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-2 since 100,0122ms has elapsed (commit interval is 100ms)
170687 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,007ms has elapsed (commit interval is 100ms)
170690 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,004ms has elapsed (commit interval is 100ms)
170719 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0039ms has elapsed (commit interval is 100ms)
170754 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0124ms has elapsed (commit interval is 100ms)
170787 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0117ms has elapsed (commit interval is 100ms)
170787 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-2 since 100,3638ms has elapsed (commit interval is 100ms)
170795 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0073ms has elapsed (commit interval is 100ms)
170808 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Suspending
170814 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Comitting
170820 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,005ms has elapsed (commit interval is 100ms)
170822 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - stream-task[0|2] Flushing all stores registered in the state manager
170835 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Flusing producer
170841 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.RecordQueue - stream-task[0|2] - recordQueue [record-queue-KStreamTestIn-0-2]  cleared !
170841 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Closing producer
170861 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0118ms has elapsed (commit interval is 100ms)
170891 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0116ms has elapsed (commit interval is 100ms)
170904 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,005ms has elapsed (commit interval is 100ms)
170918 [data-ingestion-appkstreamtestin-stream-thread-2] ERROR Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] Encountered the following unexpected Kafka exception during processing, tis usually indicate Streams internal errors:
Confluent.Kafka.KafkaException: Operation not valid in state Ready
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
   at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()
170968 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0092ms has elapsed (commit interval is 100ms)
170925 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,7812ms has elapsed (commit interval is 100ms)
170996 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0136ms has elapsed (commit interval is 100ms)
171005 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0077ms has elapsed (commit interval is 100ms)
171010 [data-ingestion-appkstreamtestin-stream-thread-2] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] Shutting down
171010 [data-ingestion-appkstreamtestin-stream-thread-2] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] State transition from RUNNING to PENDING_SHUTDOWN
171012 [data-ingestion-appkstreamtestin-stream-thread-2] INFO Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Closing
171012 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Suspending
171013 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Comitting
171013 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - stream-task[0|2] Flushing all stores registered in the state manager
171013 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Flusing producer
171038 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.RecordQueue - stream-task[0|2] - recordQueue [record-queue-KStreamTestIn-0-2]  cleared !
171042 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Closing producer
171042 [data-ingestion-appkstreamtestin-stream-thread-2] ERROR Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] Failed to close stream thread due to the following error:
System.NullReferenceException: Object reference not set to an instance of an object.
   at Streamiz.Kafka.Net.Processors.StreamTask.Commit(Boolean startNewTransaction)
   at Streamiz.Kafka.Net.Processors.StreamTask.Suspend()
   at Streamiz.Kafka.Net.Processors.StreamTask.Close()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Close()
   at Streamiz.Kafka.Net.Processors.StreamThread.Close(Boolean cleanUp)

How to reproduce

I don't know how to reproduce it, but it occurs fairly regularly. My config:

using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "my-app",
    BootstrapServers = _bootstrapServers,
    NumStreamThreads = 2,
    Guarantee = ProcessingGuarantee.EXACTLY_ONCE,
    MessageSendMaxRetries = 100,
};


LGouellec commented 4 years ago

Hi @koshdim ,

You set NumStreamThreads to 2, but your logs show more threads than that (at least these eight): data-ingestion-appkstreamtestin-stream-thread-1, data-ingestion-appkstreamtestin-stream-thread-4, data-ingestion-appkstreamtestin-stream-thread-3, data-ingestion-appkstreamtestin-stream-thread-5, data-ingestion-appkstreamtestin-stream-thread-7, data-ingestion-appkstreamtestin-stream-thread-6, data-ingestion-appkstreamtestin-stream-thread-0, data-ingestion-appkstreamtestin-stream-thread-2.

I also see a rebalance just before the exception. Is that rebalance expected? Did you change the number of partitions of a topic, or start another Kafka Streams application?

Thanks.

koshdim commented 4 years ago

Oh, sorry: I posted the config after changing the thread count to 2, but I was able to reproduce the issue with 2 threads as well. The number of partitions didn't change. I was experimenting with running two streams at once in the same application (each with a different ApplicationId), and with stopping and restarting this application multiple times. Do Kafka Streams instances conflict somehow if they run on the same physical machine?

LGouellec commented 4 years ago

So you have a .NET application that contains two KafkaStream instances, each with a unique ApplicationId, and you repeatedly stop and restart this application to test resilience. Is that correct?

Do you call stream.Close() on each instance when your application stops?

koshdim commented 4 years ago

Do you call stream.Close() on each instance when your application stops?

Not always: sometimes it crashed, and sometimes I just stopped debugging. Could there be leftovers after the application is closed? If so, how do I clean them up?

LGouellec commented 4 years ago

That is probably the cause. I need to change the public KafkaStream API, especially the Close method.

In production, stopping the stream properly is mandatory.

koshdim commented 4 years ago

How do I close it properly if the application crashed? I assumed everything is wiped out when the process dies.

LGouellec commented 4 years ago

Normally, yes: if the process dies, all of its resources are freed. Some good rules:

  1. Surround your KafkaStream instance with a try/catch and call Close in the finally block.
  2. When you reproduce the issue, could you take a memory dump of the process and attach it to this issue? I may be able to analyze it and fix the problem.
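A minimal sketch of rule 1, assuming the Streamiz API used in this thread (a non-blocking stream.Start() and a stream.Close() method; later releases changed the start/close API, so check the version you use). Because Start() returns immediately, the try block also has to wait until shutdown is requested; otherwise finally would call Close() right away. The topic names come from the logs above; the localhost:9092 broker address and the Ctrl+C (Console.CancelKeyPress) shutdown trigger are assumptions for illustration.

```csharp
using System;
using System.Threading;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

public static class Program
{
    public static void Main()
    {
        // Settings from the issue; the broker address is an assumption.
        var config = new StreamConfig<StringSerDes, StringSerDes>
        {
            ApplicationId = "my-app",
            BootstrapServers = "localhost:9092",
            NumStreamThreads = 2,
            Guarantee = ProcessingGuarantee.EXACTLY_ONCE,
        };

        // Pass-through topology between the topics seen in the logs.
        var builder = new StreamBuilder();
        builder.Stream<string, string>("KStreamTestIn").To("KStreamTestOut");
        var topology = builder.Build();

        var stream = new KafkaStream(topology, config);
        var shutdownRequested = new ManualResetEventSlim(false);

        // On Ctrl+C, cancel the default process termination and signal Main instead,
        // so that the finally block below gets a chance to run.
        Console.CancelKeyPress += (sender, e) =>
        {
            e.Cancel = true;
            shutdownRequested.Set();
        };

        try
        {
            stream.Start();           // non-blocking: processing runs on background stream threads
            shutdownRequested.Wait(); // keep Main inside the try block until shutdown is requested
        }
        finally
        {
            stream.Close();           // ask the library to stop its stream threads and Kafka clients
        }
    }
}
```

This only covers orderly shutdowns: a process that crashes hard or is killed by stopping the debugger never reaches the finally block, which matches the scenario described above.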

koshdim commented 4 years ago

Hi @LGouellec,

  1. What exactly should I surround with try/catch? The stream.Start() call is non-blocking.
  2. I tried to reproduce it again and got several other exceptions. I'm sorry, but I will stop investigating Streams and try to achieve what I need with plain Kafka transactions instead. In case it is useful to you, here is some information about the exceptions:

a)

    
    Streamiz.Kafka.Net.Errors.StreamsException
    HResult=0x80131500
    Message=Collection was modified; enumeration operation may not execute.
    Source=Streamiz.Kafka.Net
    StackTrace:
    at Streamiz.Kafka.Net.Processors.StreamThread.Run()
    at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
    at System.Threading.ThreadHelper.ThreadStart()
    
    This exception was originally thrown at this call stack:
    [External Code]

Inner Exception 1: InvalidOperationException: Collection was modified; enumeration operation may not execute.

b) 

    Streamiz.Kafka.Net.Errors.StreamsException
    HResult=0x80131500
    Message=Operation not valid in state FatalError
    Source=Streamiz.Kafka.Net
    StackTrace:
    at Streamiz.Kafka.Net.Processors.StreamThread.Run()
    at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
    at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
    at System.Threading.ThreadHelper.ThreadStart()

    This exception was originally thrown at this call stack:
    [External Code]

Inner Exception 1: KafkaException: Operation not valid in state FatalError

And the debug log for case b):

16337 [27] DEBUG Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter - Log producer data-ingestion-appkstreamtestin-stream-thread-0-0-0-producer#producer-5 - [thrd:main]: TxnCoordinator/3: Failed to add partition "KStreamTestOut" [2] to transaction: Broker: Producer attempted an operation with an old epoch
16339 [27] DEBUG Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter - Log producer data-ingestion-appkstreamtestin-stream-thread-0-0-0-producer#producer-5 - [thrd:main]: Fatal transaction error: Failed to add partitions to transaction: Broker: Producer attempted an operation with an old epoch (INVALID_PRODUCER_EPOCH)
16348 [27] DEBUG Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter - Log producer data-ingestion-appkstreamtestin-stream-thread-0-0-0-producer#producer-5 - [thrd:main]: Fatal error: Broker: Producer attempted an operation with an old epoch: Failed to add partitions to transaction: Broker: Producer attempted an operation with an old epoch
16352 [25] ERROR Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter - Error producer data-ingestion-appkstreamtestin-stream-thread-0-0-0-producer#producer-5 - Failed to add partitions to transaction: Broker: Producer attempted an operation with an old epoch
16355 [data-ingestion-appkstreamtestin-stream-thread-0] ERROR Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-0] Encountered the following unexpected Kafka exception during processing, tis usually indicate Streams internal errors:
Confluent.Kafka.KafkaException: Operation not valid in state FatalError
   at Confluent.Kafka.Impl.SafeKafkaHandle.SendOffsetsToTransaction(IEnumerable`1 offsets, IConsumerGroupMetadata groupMetadata, Int32 millisecondsTimeout)
   at Confluent.Kafka.Producer`2.SendOffsetsToTransaction(IEnumerable`1 offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout)
   at Streamiz.Kafka.Net.Processors.StreamTask.Commit(Boolean startNewTransaction)
   at Streamiz.Kafka.Net.Processors.StreamTask.Commit()
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()
16408 [data-ingestion-appkstreamtestin-stream-thread-0] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-0] Shutting down
16409 [data-ingestion-appkstreamtestin-stream-thread-0] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-0] State transition from RUNNING to PENDING_SHUTDOWN
16411 [data-ingestion-appkstreamtestin-stream-thread-0] INFO Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|0] Closing
16411 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|0] Suspending
16411 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|0] Comitting
16412 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - stream-task[0|0] Flushing all stores registered in the state manager
16412 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|0] Flusing producer
16415 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.Internal.RecordQueue - stream-task[0|0] - recordQueue [record-queue-KStreamTestIn-0-0] cleared !
16415 [data-ingestion-appkstreamtestin-stream-thread-0] ERROR Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-0] Failed to close stream thread due to the following error:
Confluent.Kafka.KafkaException: Operation not valid in state FatalError
   at Confluent.Kafka.Impl.SafeKafkaHandle.AbortTransaction(Int32 millisecondsTimeout)
   at Confluent.Kafka.Producer`2.AbortTransaction(TimeSpan timeout)
   at Streamiz.Kafka.Net.Processors.StreamTask.Suspend()
   at Streamiz.Kafka.Net.Processors.StreamTask.Close()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Close()
   at Streamiz.Kafka.Net.Processors.StreamThread.Close(Boolean cleanUp)