sclasen / akka-kafka

185 stars · 62 forks

StateTimeout with long processing #31

Closed salex89 closed 9 years ago

salex89 commented 9 years ago

Hello,

I'm using Kafka as a queue for some IO operations in a worker, and I have to assume the operations could sometimes last up to 30-60 seconds. I've noticed I'm seeing a lot of StateTimeout messages from akka-kafka in the log. I tried fiddling with the commit interval/timeout but had no real luck. The behaviour changes a bit if I put the StreamFSM.Processed before or after the actual IO, but the "error" is the same either way, just at different moments (this is expected). How should I handle this situation?

sclasen commented 9 years ago

@salex89 I am assuming you are using the batch connector here?

can you give me an example of the log line you are seeing?

sclasen commented 9 years ago

Also, if it is this line

https://github.com/sclasen/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/BatchActors.scala#L139

Then currently you will get 1 warning per second, which is just a warning, and probably ok..

Feel free to send a PR to make the 1 second interval configurable if you are up for it!

https://github.com/sclasen/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/BatchActors.scala#L137

https://github.com/sclasen/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/BatchActors.scala#L119

https://github.com/sclasen/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/Actors.scala#L147

salex89 commented 9 years ago

Hi, sorry for waiting.

No, I'm not using the batch connector, I'm using the "regular" one. I was in doubt about which to use because I couldn't find any criteria on which to decide.

This is the message I am seeing: [WARN] [03/09/2015 10:47:52.702] [mca-worker-akka.actor.default-dispatcher-7] [akka://mca-worker/user/$c] state=Committing msg=StateTimeout drained=0 streams=1

At some point after the operation finishes and the sender ! StreamFSM.Processed is sent, the message stops showing. Which is expected in some way, based on the documentation (you wrote that the consumer will hang if all messages are not committed). But again, I would like this to be a legal state, because the processing might just take a bit longer. Here is my code sample (I'm storing in Cassandra):

def receive = {
  case x: Any =>
    val msg = x.asInstanceOf[MessageAndMetadata[String, String]]
    val topic = msg.topic
    topic match {
      case `activity` =>
        val json = Json.parse(msg.message())
        val builder = QueryBuilder.insertInto(keyspace, "physical_activity")...
        session.execute(builder)
        Thread.sleep(25000) // simulating a longer write, because this may take longer in other cases; the messages start appearing after about 5 seconds
        sender ! StreamFSM.Processed
    }
}

sclasen commented 9 years ago

Ok, what commit config are you using?


salex89 commented 9 years ago

I'm using the default one, which I believe is: commitConfig = CommitConfig(Some(5 seconds), Some(10000), 5 seconds) .

When I increase it, the message shows up later (again expected, because the consumer checks less often). However, here is what happens if the commit interval is 30 seconds and the work takes 25:

Three messages are consumed -> the first one is processed -> the second one starts being processed. Commit time comes: the first is committed, the second is still being processed -> the notification appears for messages 2 and 3. Commit time comes again: message 2 is already processed and message 3 has started -> message 2 is committed. Now the warning appears only for message 3.

If you consider this benign, I'll just let it pass. I'm more "worried" about how it will affect the Kafka queue and what will happen if I have (though not likely in this case) more consumers in the same group.

Also, this introduces some latency, because new messages flow in at 30-second intervals, so the latency is around 15 seconds (on average) plus processing time.

Should I switch to the batch consumer?

sclasen commented 9 years ago

The log warnings are benign if they eventually stop. They are just there to say "hey, I'm still trying to commit", which usually lasts only a short interval. Though not in your case.

Perhaps you don't want to commit after an interval, but instead after some number of messages?

CommitConfig(None, Some(numOfMsgs), 5 seconds)

Since your processing takes a long time, it seems you don't need to commit as frequently (or at all, based on time).

Note that numOfMsgs is not a hard limit; the connector will start the commit process after numOfMsgs, but a few more messages will likely be processed (and committed properly).
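Under those assumptions, a count-based configuration might look like this (a sketch; the field order follows the default config quoted above, and 100 is an arbitrary message count, not a recommendation):

```scala
import scala.concurrent.duration._
import com.sclasen.akka.kafka.CommitConfig

// No interval-based commits (None): a commit is triggered only after
// roughly 100 in-flight messages, with 5 seconds allowed for the commit
// itself. Field order as in the default quoted earlier:
// (commitInterval, commitAfterMsgCount, commitTimeout)
val commitConfig = CommitConfig(None, Some(100), 5 seconds)
```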

If you can deal with a batch of messages rather than single ones, batch might make sense. Basically, the difference is that you will be sent a Seq[MyMessageType] instead of just MyMessageType, and you respond to the sender with a BatchConnectorFSM.BatchProcessed.
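A minimal sketch of what the batch variant of the worker above might look like, based only on what this thread states (the BatchWorker class name and the exact batch collection type are assumptions, not the library's verified API):

```scala
import akka.actor.Actor
import com.sclasen.akka.kafka.BatchConnectorFSM
import kafka.message.MessageAndMetadata

// Hypothetical batch handler: the connector sends a whole batch of
// messages, and the actor replies once with BatchProcessed instead of
// one StreamFSM.Processed per message.
class BatchWorker extends Actor {
  def receive = {
    case batch: Seq[MessageAndMetadata[String, String]] @unchecked =>
      batch.foreach { msg =>
        // write each message to Cassandra here
      }
      sender ! BatchConnectorFSM.BatchProcessed
  }
}
```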


salex89 commented 9 years ago

Ok, thank you very much :+1: . Looks like everything is ok, I'll play a bit with the numbers and see what happens.

Btw. I really like the project, I'll keep an eye on it.