sclasen / akka-kafka


StateTimeout after 1 message is processed #33

Closed - romangarcia closed this issue 9 years ago

romangarcia commented 9 years ago

I have a simple pipeline using the non-batch consumer. I extract the message in ExtractActor and pass it along to TransformActor, which generates N possible transformations; these are passed along to LoadActor, which performs the load (into an Elasticsearch cluster). Once all loads are confirmed, the StreamFSM.Processed message is sent to the original "sender" (the one that sent the message to ExtractActor). I send one single message, it gets passed through the whole pipeline - all transformations applied, stored in the database, etc. - and its StreamFSM.Processed message is sent. However, after this I start receiving a StateTimeout warning, one per second.

The problem is not the warning itself, but the fact that I don't get any more messages consumed after this!

I use 4 Kafka partitions. This is the warning:

    2015-03-30 12:15:10,568 WARN com.sclasen.akka.kafka.ConnectorFSM Risk-Events-ETL-akka.actor.default-dispatcher-140 - state=Committing msg=StateTimeout drained=3 streams=4
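
For reference, the per-message contract akka-kafka expects from the receiver is a single StreamFSM.Processed reply to the sender for each delivered message; a minimal receiver, essentially the Printer example from the project README, looks like this:

    import akka.actor.Actor
    import com.sclasen.akka.kafka.StreamFSM

    // akka-kafka delivers each decoded message to this actor and waits for
    // exactly one StreamFSM.Processed reply per message before the stream
    // can report itself drained during a commit.
    class Printer extends Actor {
      def receive = {
        case msg: Any =>
          println(msg)
          sender ! StreamFSM.Processed
      }
    }

The drained=3 streams=4 figures in the warning suggest the connector is stuck in Committing because one of the four streams never reported itself drained, which is consistent with the Processed acks not lining up one-to-one with the delivered messages.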

romangarcia commented 9 years ago

My ConsumerProps:

    AkkaConsumerProps.forContext[String, RiskEventMessage](
      context,
      zkConnect,
      topic = cfg.topicName,
      group = cfg.group,
      streams = cfg.streams,
      keyDecoder = new StringDecoder(),
      msgDecoder = new RiskEventDecoder(),
      receiver = context.actorOf(Props(new EventsExtractActor(transformers, storage)), cfg.topicName + ...),
      commitConfig = CommitConfig(commitInterval = Option(10 seconds), commitTimeout = Timeout(10 seconds))
    )
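
For context, props like these are typically handed to an AkkaConsumer and started, per the library README; a sketch, where consumerProps is just an illustrative name for the value above:

    import com.sclasen.akka.kafka.AkkaConsumer

    // consumerProps is the AkkaConsumerProps value shown above
    val consumer = new AkkaConsumer(consumerProps)
    consumer.start()  // returns a Future[Unit] that completes once the connector has started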

sclasen commented 9 years ago

Hi @romangarcia, can you turn on debug logging and send along a more detailed log?

Thanks!

romangarcia commented 9 years ago

Ignore this issue; it was a bug on my side (FYI, I was sending more than one Processed per message).

mjuchli commented 8 years ago

I have the same issue. Can you tell me more about "more than one Processed per message"?

Here's my setup:

    new AkkaConsumer(AkkaConsumerProps.forSystem(
      system = system,
      zkConnect = config.getString("host") + ":" + config.getInt("port"),
      topic = config.getString("topic"),
      group = config.getString("group"),
      streams = config.getInt("streams"),
      keyDecoder = new DefaultDecoder(),
      msgDecoder = new AvroMessageDecoderTagMessage,
      receiver = receiveActor,
      commitConfig = CommitConfig(commitInterval = Option(10 seconds), commitTimeout = Timeout(10 seconds))
    ))

And the log:

    [INFO] [09/30/2015 10:32:11.324] [ForkJoinPool-2-worker-1] [akka://ClientFilterProducer-system/user/matcher] Matches are: List(551905273a76e9.35011041, 5524e6e97c7999.14011172)
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Handling 1 events
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Getting broker partition info for topic clientFilter
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Partition [clientFilter,0] has leader 0
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Broker partitions registered for topic: clientFilter are 0
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Sending 1 messages with no compression to [clientFilter,0]
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Producer sending messages with correlation id 116 for topics [clientFilter,0] to broker 0 on 10.0.1.50:9092
    DEBUG ClientFilterProducer-system-akka.actor.default-dispatcher-7 - Producer sent messages with correlation id 116 for topics [clientFilter,0] to broker 0 on 10.0.1.50:9092
    DEBUG main-SendThread(10.0.1.50:2181) - Got ping response for sessionid: 0x1501d1702b20026 after 1ms
    [WARN] [09/30/2015 10:32:12.204] [ClientFilterProducer-system-akka.actor.default-dispatcher-6] [akka://ClientFilterProducer-system/user/$b] state=Committing msg=StateTimeout drained=0 streams=1
    [WARN] [09/30/2015 10:32:13.223] [ClientFilterProducer-system-akka.actor.default-dispatcher-11] [akka://ClientFilterProducer-system/user/$b] state=Committing msg=StateTimeout drained=0 streams=1
    DEBUG main-SendThread(10.0.1.50:2181) - Got ping response for sessionid: 0x1501d1702b20026 after 0ms
    [WARN] [09/30/2015 10:32:14.244] [ClientFilterProducer-system-akka.actor.default-dispatcher-11] [akka://ClientFilterProducer-system/user/$b] state=Committing msg=StateTimeout drained=0 streams=1
    [WARN] [09/30/2015 10:32:15.265] [ClientFilterProducer-system-akka.actor.default-dispatcher-4] [akka://ClientFilterProducer-system/user/$b] state=Committing msg=StateTimeout drained=0 streams=1
    DEBUG main-SendThread(10.0.1.50:2181) - Got ping response for sessionid: 0x1501d1702b20026 after 0ms
    [INFO] [09/30/2015 10:32:15.858] [ClientFilterProducer-system-akka.actor.default-dispatcher-2] [akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/2] Message [akka.io.Tcp$Close$] from Actor[akka://ClientFilterProducer-system/user/IO-HTTP/group-0/0#216242784] to Actor[akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/2#806914612] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [09/30/2015 10:32:15.898] [ClientFilterProducer-system-akka.actor.default-dispatcher-2] [akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/1] Message [akka.io.Tcp$Close$] from Actor[akka://ClientFilterProducer-system/user/IO-HTTP/group-0/2#748834853] to Actor[akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/1#131581177] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [INFO] [09/30/2015 10:32:16.197] [ClientFilterProducer-system-akka.actor.default-dispatcher-12] [akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/3] Message [akka.io.Tcp$Close$] from Actor[akka://ClientFilterProducer-system/user/IO-HTTP/group-0/3#-667583740] to Actor[akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/3#1622696826] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [WARN] [09/30/2015 10:32:16.283] [ClientFilterProducer-system-akka.actor.default-dispatcher-12] [akka://ClientFilterProducer-system/user/$b] state=Committing msg=StateTimeout drained=0 streams=1
    [INFO] [09/30/2015 10:32:16.324] [ClientFilterProducer-system-akka.actor.default-dispatcher-3] [akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/0] Message [akka.io.Tcp$Close$] from Actor[akka://ClientFilterProducer-system/user/IO-HTTP/group-0/1#-1568985896] to Actor[akka://ClientFilterProducer-system/system/IO-TCP/selectors/$a/0#1895105001] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    [WARN] [09/30/2015 10:32:17.305] [ClientFilterProducer-system-akka.actor.default-dispatcher-7] [akka://ClientFilterProducer-system/user/$b] state=Committing msg=StateTimeout drained=0 streams=1
    DEBUG main-SendThread(10.0.1.50:2181) - Got ping response for sessionid: 0x1501d1702b20026 after 0ms

mjuchli commented 8 years ago

@sclasen any idea?

romangarcia commented 8 years ago

@backender In my case, I believe, I was "telling" the "sender" actor the Processed message multiple times per message. That is: a message arrived, I did the processing (indexing multiple documents into an Elasticsearch cluster), and, as per the contract, I sent the Processed message to the sender - but more than once. This created some sort of inconsistency which made the system fail.
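
A rough sketch of the corrected pattern for that case (all names are illustrative, not from the original code): track the number of pending load confirmations and reply with a single Processed to the StreamFSM that delivered the message only once they have all arrived.

    import akka.actor.{Actor, ActorRef}
    import com.sclasen.akka.kafka.StreamFSM

    // Illustrative messages: LoadRequested records how many Elasticsearch index
    // operations were started for one consumed message and who to ack when done;
    // LoadConfirmed is sent once per completed index operation.
    case class LoadRequested(count: Int, origin: ActorRef)
    case object LoadConfirmed

    class LoadAggregator extends Actor {
      private var pending = 0
      private var origin: ActorRef = context.system.deadLetters

      def receive = {
        case LoadRequested(count, from) =>
          pending = count
          origin = from
        case LoadConfirmed =>
          pending -= 1
          if (pending == 0) origin ! StreamFSM.Processed // exactly one ack per consumed message
      }
    }

This keeps the one-Processed-per-message contract even when a single Kafka message fans out into several downstream operations.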

fabiofumarola commented 8 years ago

@backender I got the same issue. I'm sure that the problem, as @backender said, is related to the fact that a StreamFSM.Processed is sent multiple times. What I did was move the sender ! StreamFSM.Processed to the beginning and save the failed messages to another Kafka topic.
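
A sketch of that ack-first workaround (process and publishToFailureTopic are hypothetical helpers, not part of akka-kafka): acknowledge the message immediately, then do the work, and divert failures to a separate topic.

    import akka.actor.Actor
    import com.sclasen.akka.kafka.StreamFSM
    import scala.util.control.NonFatal

    class AckFirstReceiver(process: Any => Unit,
                           publishToFailureTopic: Any => Unit) extends Actor {
      def receive = {
        case msg =>
          sender ! StreamFSM.Processed                        // exactly one ack, sent up front
          try process(msg)                                    // e.g. index into Elasticsearch
          catch { case NonFatal(e) => publishToFailureTopic(msg) }
      }
    }

The trade-off is at-most-once processing: offsets can be committed even when the work fails, so failed messages have to be replayed from the failure topic.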