sclasen / akka-kafka


Race issues with StreamFSM when sending a batch of Processed messages #10

Closed jbweeks closed 10 years ago

jbweeks commented 10 years ago

Most of what we do with akka-kafka involves many thousands of messages per second; these messages are sorted into categories and processed in bulk into various backend sinks.

We started working with the back-pressure support in akka-kafka, and rather than returning a StreamFSM.Processed acknowledgement immediately, we tied the acknowledgements to the success of the backend batch operations.

However, with batch sizes over roughly 10 messages (though our backend batches are typically 20-50k), we start to see the behavior get "racy", e.g. occasionally seeing:

WARN ...connector] unhandled event Processed in state Committing

The higher the batch size, the more likely we are to see this error, which eventually leads to consumer failure due to the unhandled acknowledgements.

(We have an actor handler that, on success, takes the batch size as an argument and then does this:)

  case AckMessage(count) =>
    val connector = consumer.connector
    logger.debug(s"In AckMessage, sending $count acks back to: $connector")
    for (i <- 1 to count) connector ! StreamFSM.Processed

I tried playing with different settings for maxInFlightPerStream, but that didn't seem to help...

Perhaps this could be solved by adding an Int or Long argument to the StreamFSM.Processed message like I did for the relay above?

sclasen commented 10 years ago

@jbweeks that looks a bit unexpected. Are you sending the StreamFSM.Processed messages to the connector? That's what it looks like in your code sample, and it would explain the unexpected log messages too.

Each message needs to be acked to the sender (which is one of the StreamFSM actors underneath the connector).

The individual messages don't need to be tracked in your case, but you would need to track the number of messages sent by each stream and then send a corresponding number of Processed messages back.

Something along these lines could be the receiver actorRef provided to your consumer props:

import akka.actor.{Actor, ActorRef}
import com.sclasen.akka.kafka.StreamFSM

import scala.collection.mutable

class BatcherReceiver extends Actor {

  // count of un-acked messages per stream actor
  val outstanding = new mutable.HashMap[ActorRef, Int]()
  var backend: ActorRef = _ // wire up your backend actor here

  case object AckBatch

  override def receive = {
    case AckBatch =>
      // ack every outstanding message back to the stream that sent it
      outstanding.foreach {
        case (stream, count) =>
          (1 to count).foreach(_ => stream ! StreamFSM.Processed)
      }
      outstanding.clear()
    case x: Any =>
      // forward to the backend and count the message against its stream
      backend ! x
      val out = outstanding.getOrElseUpdate(sender(), 0)
      outstanding += sender() -> (out + 1)
  }
}
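
For context, a receiver like this would be passed as the receiver actorRef in the consumer props. A minimal sketch of the wiring, following the constructor shape in the akka-kafka README (the connection values and stream count here are placeholders):

import akka.actor.{ActorSystem, Props}
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps}
import kafka.serializer.DefaultDecoder

val system = ActorSystem("example")
val receiver = system.actorOf(Props(new BatcherReceiver))

// placeholder connection values; streams is typically one per partition
val consumer = new AkkaConsumer(AkkaConsumerProps.forSystem(
  system = system,
  zkConnect = "localhost:2181",
  topic = "your-topic",
  group = "your-group",
  streams = 4,
  keyDecoder = new DefaultDecoder(),
  msgDecoder = new DefaultDecoder(),
  receiver = receiver
))
consumer.start()
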
jbweeks commented 10 years ago

Whoops. Sorry about sending to the connector. D'oh!

Anyway, I ran a number of tests. The good news is I am not getting the FSM errors anymore. The bad news is they have been replaced with:

...1.connector] state=Committing msg=StateTimeout drained=0 streams=1

...with the same ultimate failure case where I get no more messages from the framework. In my testing this afternoon, I added log messages matching the inputs and outputs with counts, etc.

If I reply one-by-one with StreamFSM.Processed, then everything is fine. If I delay a bit, and loop through in a batch, then it devolves. My batch code is pretty simple (x._2 is the ActorRef of the sender):

  var c = 0
  batch.foreach(x => {
    c += 1
    logger.info(s"sending #$c ack message to sender: ${x._2}")
    x._2 ! StreamFSM.Processed
  })

This produces, e.g.:

...sending #200 ack message to sender: Actor[akka://app.1.connector/stream0#1075936793]

I have maxInFlightPerStream set to 20,000, but I seem to see failures far before that. I also simplified the test case to a single stream and connector. I also set the CommitConfig->commitTimeout to 60 seconds.

If nothing jumps out at you, let me know, then I guess I will try to put together a simplified test case demonstrating the behavior.

sclasen commented 10 years ago

@jbweeks can you try enabling debug logging (akka.loglevel = DEBUG) and let me know what you are seeing? You should see output like the below, so we can get a better idea of the state of the actors.

Also, can you try using this commitConfig:

CommitConfig(commitInterval = None,
             commitAfterMsgCount = Some(<batch size>),
             commitTimeout = Timeout(60 seconds))

...patcher-10] [akka://test/user/testFSM] state=Receiving msg=Received uncommitted=797
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-3] [akka://test/user/testFSM/stream1] stream=stream1 state=Full msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-3] [akka://test/user/testFSM/stream1] stream=stream1 state=Full msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-10] [akka://test/user/testFSM] state=Receiving msg=Received uncommitted=798
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-3] [akka://test/user/testFSM/stream1] stream=stream1 state=Full msg=Processed outstanding=63
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 at=transition from=Processing to=Full
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-3] [akka://test/user/testFSM/stream1] stream=stream1 at=transition from=Full to=Processing
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Full msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Full msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Full msg=Processed outstanding=63
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 at=transition from=Full to=Processing
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-5] [akka://test/user/testFSM] state=Receiving msg=Received uncommitted=799
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 at=transition from=Processing to=Full
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Full msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Full msg=Processed outstanding=63
[DEBUG] [07/23/2014 18:10:59.230] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 at=transition from=Full to=Processing
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-7] [akka://test/user/testFSM] state=Receiving msg=Received uncommitted=800
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 at=transition from=Processing to=Full
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 state=Full msg=Processed outstanding=63
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-9] [akka://test/user/testFSM/stream0] stream=stream0 at=transition from=Full to=Processing
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-14] [akka://test/user/testFSM/stream1] stream=stream1 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-14] [akka://test/user/testFSM/stream1] stream=stream1 state=Processing msg=Continue outstanding=64
[DEBUG] [07/23/2014 18:10:59.231] [test-akka.actor.default-dispatcher-14] [akka://test/user/testFSM/stream1] stream=stream1 at=transition from=Processing to=Full
jbweeks commented 10 years ago

Ok, sent you debug output out of band. I see a strong correlation with the value of "consumer.timeout.ms" -- if I tweak that up or down, I can get immediate failure modes.

I am using your suggested config (above):

val commitConfig = CommitConfig(commitInterval = None,
                              commitAfterMsgCount = Some(settings.akkaKafkaBatchSize),
                              commitTimeout = Timeout(60 seconds))

(where akkaKafkaBatchSize is set to 400)

sclasen commented 10 years ago

Aha, interesting. I think I have a good idea as to what's happening.

So commitAfterMsgCount is (currently) a soft limit. The connector counts the total messages received across the streams, and when we pass commitAfterMsgCount, it issues a commit message.

Concurrently, however, the streams could still be receiving messages, and depending on how your actor throughput is configured, a number of messages past commitAfterMsgCount may be delivered to your receiver.

So I think your next batch has 5 messages sitting in it, which account for the 5 un-acked messages in your debug output.

I think you will have more precise control if you just use

val commitConfig = CommitConfig(commitInterval = None,
                              commitAfterMsgCount = None,
                              commitTimeout = Timeout(60 seconds))

and manually call connector.commit()

I'll think about whether there is a better way, longer term, to accommodate this usage.

jbweeks commented 10 years ago

Great, I'll give the manual connector.commit() a shot, thanks.

Is there a way to send a virtual "mark" to the components handling the incoming stream at the start of my batch processing? If I understand correctly, there will be a set of messages incoming while the batch is being processed, and under this new approach those will be marked as committed in the Kafka partition offset regardless of whether they get processed correctly, no? Ideally, the commit() would cover only processed messages (messages that are part of the batch, not new messages arriving since the batch started processing).

Clear as mud?

sclasen commented 10 years ago

One thing that might be helpful here is if we could accommodate a usage like this:

//in AkkaConsumer
def commit(onDraining: => Unit): Future[Unit]

then, for manually controlled commits with this style of batching:

def whenDraining() {
  batch.save()
}

//in your receiver, after receiving batchSize messages
connector.commit(whenDraining())

After calling commit, your batch may still get a few more messages, but once the callback is fired it won't get any more; you can then run your batch, send all the acks back, and continue. I think this wouldn't be too bad to implement.

jbweeks commented 10 years ago

I assume you mean consumer.commit() not connector.commit()? I don't see a connector.commit() method.

Per your direction, I am now using:

val commitConfig = CommitConfig(commitInterval = None,
                              commitAfterMsgCount = None,
                              commitTimeout = Timeout(60 seconds))

Calling commit() on the consumer prior to sending the StreamFSM.Processed messages works for a while, and then I got the following, after which things failed again:

[akka://clickstream/user/prod.log.clickstream.1.connector] unhandled event Commit in state Committing

See larger context below:

[DEBUG] [07/24/2014 11:28:52.403] [clickstream-akka.actor.default-dispatcher-5] [akka://clickstream/user/prod.log.clickstream.1.connector/stream0] stream=stream0 state=Draining msg=Processed outstanding=392
[DEBUG] [07/24/2014 11:28:52.403] [clickstream-akka.actor.default-dispatcher-5] [akka://clickstream/user/prod.log.clickstream.1.connector/stream0] stream=stream0 state=Draining msg=Processed outstanding=391
14-07-24 11:28:52.420 [clickstream-akka.actor.default-dispatcher-8] INFO  com.ait.clickstream.DB - Batch size 200 persisted successfully, in 17 millis
14-07-24 11:28:52.421 [clickstream-akka.actor.default-dispatcher-8] INFO  com.ait.clickstream.DB - ackMap has size: 1
14-07-24 11:28:52.421 [clickstream-akka.actor.default-dispatcher-2] INFO  com.ait.logging.LogProcessor - In AckMessage, called consumer.commit() and sending 200 acks back to: Actor[akka://clickstream/user/prod.log.clickstream.1.connector/stream0#1612390218]
[WARN] [07/24/2014 11:28:52.421] [clickstream-akka.actor.default-dispatcher-5] [akka://clickstream/user/prod.log.clickstream.1.connector] unhandled event Commit in state Committing
[DEBUG] [07/24/2014 11:28:52.421] [clickstream-akka.actor.default-dispatcher-8] [akka://clickstream/user/prod.log.clickstream.1.connector/stream0] stream=stream0 state=Draining msg=Processed outstanding=390
[DEBUG] [07/24/2014 11:28:52.421] [clickstream-akka.actor.default-dispatcher-8] [akka://clickstream/user/prod.log.clickstream.1.connector/stream0] stream=stream0 state=Draining msg=Processed outstanding=389
[DEBUG] [07/24/2014 11:28:52.421] [clickstream-akka.actor.default-dispatcher-8] [akka://clickstream/user/prod.log.clickstream.1.connector/stream0] stream=stream0 state=Draining msg=Processed outstanding=388
sclasen commented 10 years ago

It looks like you are calling commit() a second time, before the first commit's Future[Unit] has completed? Can you wrap the commit in an Await.result and see what you see?

I think I understand enough of your usage to work on a test case, so I'll do that.
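
(A minimal sketch of the Await suggestion, assuming commit() returns the Future[Unit] discussed above; the 60-second bound matches the commitTimeout used in this thread:)

import scala.concurrent.Await
import scala.concurrent.duration._

// block until the streams drain and offsets are committed, so a second
// Commit is never issued while the first is still in flight
Await.result(consumer.commit(), 60.seconds)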

jbweeks commented 10 years ago

So it appears that the direct manual call to consumer.commit() doesn't complete in any reasonable time for some reason (I am going to go back to the non-batch mode for a while, acking immediately one-by-one -- AKA fire and forget, happy path, assume everything works)...

[ERROR] [07/24/2014 12:10:48.898] [clickstream-akka.actor.default-dispatcher-25]       [akka://clickstream/user/KafkaAccessLogProcessor-prod.log.clickstream-1] Futures timed out after [60 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [60 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
at scala.concurrent.Await$.result(package.scala:107)
at com.ait.logging.LogProcessor$$anonfun$receive$1.applyOrElse(LogProcessor.scala:160)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.ait.logging.LogProcessor.aroundReceive(LogProcessor.scala:41)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
sclasen commented 10 years ago

Yep, this would be because the streams believe there are outstanding messages, and they are still waiting to drain. If you turn on debug logs in that case, I'm sure you will see the streams reporting outstanding messages.

Not surprised that's what we are running into, since I hadn't designed for the batch case :). Thinking about how to allow for it.

jbweeks commented 10 years ago

Hmmm, the weirdest thing. If I specify:

batchTimeout = Timeout(60 seconds)

in my AkkaBatchConsumerProps, on startup, the system acts like it immediately times out and then I have to wait for the timeout to come around again before I begin receiving messages. (I had thought it was broken, then it dawned on me that perhaps I just need to wait for a cycle, and sure enough...)

What is the intent of this parameter? I am assuming it was the maximum time akka-kafka would wait for the client system to process a batch? Sometimes our backend systems are slow, so I wanted a big timeout (assuming that is the purpose of this...)

Also, what happens in case the timeout is breached?

< off to read the source... :) >

Thanks,

jbweeks commented 10 years ago

Looking at the source a bit, I am confused.

(The behavior I am currently seeing is that my message handler is getting called, but a batch is never sent to my "receiver" actor)

In BatchActors, I see that sendBatch() is only ever called in the when(Committing) block, but even though the logs show that akka-kafka has read a full batch, I am not seeing at-drain-finised (sp: should be "finished") in the logs, so my receiver never gets the batch... :(

Interestingly enough, things again work in my little test app, but not in the more complex app with more threads, different memory allocation, timing, etc.

(I am testing this with timeouts at 5 seconds BTW)

Logs once again sent out of band.

-JW

jbweeks commented 10 years ago

OK, thanks Scott -- the latest 0.0.7-SNAPSHOT is now sending me batches. I will give a more definitive update on whether the full back-pressure is working correctly transactionally in a bit, as I get some time to test the state machine more fully under various workloads, batch sizes, timeout values and threading patterns.

jbweeks commented 10 years ago

Good news -- so far the batch functionality has been feature correct in all my testing.

One thing though -- I am observing a performance/throughput issue with the current implementation, and wanted to bounce around a few ideas.

Right now, I am seeing around seven seconds between sending the "BatchProcessed" message for around 20k messages and getting the next batch delivered.

I don't want to overcomplicate the state machine that has been implemented in akka-kafka, but I can think of a couple of possible ways to "prime the pump":

  1. Fetch double the batch size (documented of course due to memory cost), and pass and commit half at a time
  2. Prefetch the next batch immediately after passing the first batch along

Anyway, thanks again. A snippet of logs is below, showing the "starvation" of processing between the ack and the next batch being delivered (likely due to waiting on the network for the Kafka brokers to deliver new messages, as well as for ZK to commit the previous batch):

2014-08-07T16:49:27.862-0700 INFO clickstream-akka.actor.default-dispatcher-50 com.w.logging.LogProcessor Got Ack from: Actor[akka://clickstream/user/ClickstreamDBInsertActor0#358066906] with count: 18121; status: 18121 / 18121}
2014-08-07T16:49:27.862-0700 INFO clickstream-akka.actor.default-dispatcher-43 com.w.ait.clickstream.DB In com.w.ait.clickstream.DB@730081b3, Processing batch of size: 0
2014-08-07T16:49:27.862-0700 INFO clickstream-akka.actor.default-dispatcher-50 com.w.logging.LogProcessor In processAck, sending BatchProcessed to: Actor[akka://clickstream/user/prod.log.wpweb-clickstream.connector1407454809699549000#1259446825]
[INFO] [08/07/2014 16:49:33.850] [clickstream-akka.actor.default-dispatcher-54] [akka://clickstream/user/prod.log.wpweb-clickstream.connector1407454809699549000] at=recieve-timeout outstanding=0 batch-size=17932
2014-08-07T16:49:34.870-0700 INFO clickstream-akka.actor.default-dispatcher-54 AccessLogProcessorApp In Clickstream msgHandler in LogProcessorActor context, got kafka batch of size: 17932
sclasen commented 10 years ago

Good to hear it's working! There are a few places where we could do more timeout tuning internally so you have minimal pausing.

So one thing you should probably do is use a much smaller batchTimeout, maybe 500ms?

Since you are not getting a full batch here, you are waiting 5 seconds after the consumers stop receiving messages before the batch is sent. The other 2 seconds are partly the ZK commit and partly things that can be tweaked internally. Let me know if you don't see the pauses drop significantly with a 500ms batchTimeout.

sclasen commented 10 years ago

@jbweeks published a new snapshot that should reduce the batch pause by 1 sec in most cases

jbweeks commented 10 years ago

Cool -- the tweaks worked. Just picked up your new snapshot and latencies are now down under a second for the pause/starvation. Sent you logs out of band. What is the variable to tweak to get full batches?

Also, what do you think about the viability/level-of-effort of my earlier ideas around eager fetching/parallelism to keep the pump primed and downstream actors busy (because of the real network latency of ZK commits and Kafka fetches)?

sclasen commented 10 years ago

The less-than-full batches you are seeing are because the connector is not receiving a message from any stream for the interval of a batchTimeout.

I am guessing that 'mostly' full batches with a small batchTimeout will provide greater throughput here than always waiting for a full batch.

You may also want to play with the underlying kafka consumer properties, https://github.com/sclasen/akka-kafka#configure, specifically consumer.timeout.ms, which probably shouldn't be larger than batchTimeout. That interval is the max time a StreamFSM will block while checking whether a message is available.
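
(A sketch of that timing relationship; consumer.timeout.ms is the raw kafka consumer property, and the exact way to pass it through is covered at the #configure link above:)

import java.util.Properties
import akka.util.Timeout
import scala.concurrent.duration._

// keep the per-poll block (consumer.timeout.ms) below the batchTimeout,
// so a stream never blocks past the point where a batch should be sent
val batchTimeout = Timeout(500.millis)
val kafkaProps = new Properties()
kafkaProps.put("consumer.timeout.ms", "400")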

I don't think the eager fetching is tenable given the current setup, since by fetching a message you are updating the offset that will be committed when commitOffsets is called on the underlying connector. (You don't/can't pass offsets to that method.)

Also, at these small batch intervals, even a short minor GC could cause the batchTimeout to expire, so maybe add -XX:+PrintGCDetails -XX:+PrintGCDateStamps to your JVM args to see if that is a factor here.

I'll check the logs to see if I can glean any further insight.

sclasen commented 10 years ago

@jbweeks thinking of adding one more bit of functionality before merging the batch branch.

Right now, you need to use @unchecked annotations where you receive the Batch(items) message.

A way around this would be to instead add a MyBatch type param to the Properties and Connector, and provide another handlerFunction to the connector, of type IndexedSeq[Out] => MyBatch, where the type of the items held in MyBatch is invariant.

Thoughts? Do you find using the @unchecked annotation annoying enough that this alternative is preferable?
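
(For reference, a sketch of the @unchecked usage under discussion, assuming the batch arrives as an IndexedSeq inside a receiver actor; MyMessage and processBatch are hypothetical placeholders:)

// hypothetical element type and handler, for illustration only
type MyMessage = String
def processBatch(items: IndexedSeq[MyMessage]): Unit = ???

def receive = {
  case items: IndexedSeq[MyMessage @unchecked] =>
    // the element type is erased at runtime, so the pattern match cannot
    // verify it; @unchecked suppresses the compiler's warning
    processBatch(items)
}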

jbweeks commented 10 years ago

@sclasen I think a bit more typing sanity as you suggest would be good.

jbweeks commented 10 years ago

Ok, this seems to be working great. Good stuff. Closing.

sclasen commented 10 years ago

@jbweeks thanks! I'll ping you when I get some time to add the batchHandler function thing.

sclasen commented 10 years ago

@jbweeks hey, I just published 0.0.7.1-SNAPSHOT to oss.sonatype. It has the batchHandler stuff, which still defaults to the original behavior, but if you provide a batchHandler that returns an invariant type then you won't get those @unchecked warnings.

like so https://github.com/sclasen/akka-kafka/blob/batch-consumer/src/it/scala/com/sclasen/akka/kafka/AkkaBatchConsumerSpec.scala#L112-L131

Would love a +1 before I merge this. I will publish as 0.0.7 once I do.

jbweeks commented 10 years ago

+1

jbweeks commented 10 years ago

Ok, with the latest fix, +1 — seems to be working well now.

Best Regards,

-Jonathan

sclasen commented 10 years ago

Great. 0.0.7 on its way to maven-central. Thanks!

rsarawgi commented 10 years ago

Hi, I'm seeing a similar issue in my batch consumer. Not very frequently, but every 15 mins or so I see messages like these: "Unhandled event BatchProcessed in Receiving state"

I'm using the latest 0.0.7 build from Maven

sclasen commented 10 years ago

Hi @rsarawgi

Does this also hang the consumer? Or do you continue processing?

Any other info you can share on what kind of processing you are doing, the actor you are using to receive the batches, etc. would be helpful.

Thanks!

jbweeks commented 10 years ago

FYI — we are seeing "Unhandled event BatchProcessed in Receiving state" occasionally as well…

-JW

sclasen commented 10 years ago

Aha. I think I may have an idea as to what is happening. I am going to guess that in these cases you are receiving a second, empty batch because your batch processing takes longer than 1 second and the state timeout in the FSM is being hit (this is definitely a bug).

I think I should probably add another state to the BatchConnectorFSM to make this easier to grok.

Thanks for the report!

sclasen commented 10 years ago

@jbweeks @rsarawgi just published 0.0.8-SNAPSHOT, which I hope will resolve this issue. Thanks for the report. Let me know once you've had a chance to test this one out. If it works as expected, I'll cut 0.0.8

Cheers!

rsarawgi commented 10 years ago

@sclasen: The processing continues, so it's not a major issue. But I just wanted to report it to make sure that I'm not doing anything wrong.

jbweeks commented 10 years ago

Will do! Thanks as always for the quick turn-around!

-Jonathan

rsarawgi commented 10 years ago

On it! Thanks

jfoy commented 10 years ago

@sclasen @jbweeks 0.0.8-SNAPSHOT looks like it's doing what we want, and we're no longer seeing errors -- either in our code from empty batches, or in akka-kafka reporting "unhandled event BatchProcessed in state Receiving". Please cut 0.0.8 at your convenience, and we'll ship it to production. Thanks again for your work on this!

sclasen commented 10 years ago

@jbweeks @rsarawgi @jfoy shipped! should be in maven central now. Thanks for the feedback!

rsarawgi commented 10 years ago

Thanks @sclasen