sclasen / akka-kafka

Unused state does not try again #23

Open kuhnen opened 9 years ago

kuhnen commented 9 years ago

For some reason, the first time the StreamFSM tries to get messages from Kafka it does not find any and goes to the Unused state forever (if I use CommitConfig(None, None)). My idea is to have a scheduler that sends a Continue message after some time, for example:

When(Unused) system.scheduler.schedule(10 seconds, 1 second, self, Continue) //Just once

If you agree with this, I can make a pull request.

By the way, any idea why, if I set a high maxMessagesInFlight (5000, for example), the KafkaConsumer stops receiving messages and then goes to the Unused state? It seems to be some problem with the KafkaConsumer when I try to read a lot of messages really fast.

sclasen commented 9 years ago

Hi Andre

Thanks for the report.

It would be great if you could write a PR with a test case that reproduces this behavior. (The tests are run with it:test in sbt, see the bottom of the readme for how to get kafka/zk running locally)

Also, what non-default config are you using, if any?

I think I have an intuition for why this is happening and the solution may be similar to what you describe, but instead of the scheduler, we would use the akka FSM StateTimeout mechanism inside the unused state, and when we timeout, we send a continue. (It may or may not be more complex than that)

If we use the scheduler, the Continue could happen in any state, which we don't want.
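A minimal sketch of the StateTimeout idea, assuming the classic Akka FSM API. Only `Unused` and `Continue` are names from this thread; `ConsumerState`, `Processing`, `RetryingFSM`, the `Int` attempt counter, and the 10-second timeout are hypothetical and only there for illustration. This is not the actual StreamFSM code.

```scala
import akka.actor.{Actor, FSM}
import scala.concurrent.duration._

// Hypothetical states; only Unused comes from this thread.
sealed trait ConsumerState
case object Processing extends ConsumerState
case object Unused extends ConsumerState

// Message that asks the FSM to try reading again.
case object Continue

// State data is just a retry counter, for illustration only.
class RetryingFSM extends Actor with FSM[ConsumerState, Int] {
  startWith(Unused, 0)

  // The timeout is bound to the Unused state, so StateTimeout can only
  // arrive while the FSM is in Unused. Any other incoming message
  // cancels the pending timeout.
  when(Unused, stateTimeout = 10.seconds) {
    case Event(StateTimeout, _) =>
      self ! Continue // retry after sitting idle for the whole timeout
      stay()
    case Event(Continue, attempts) =>
      goto(Processing) using (attempts + 1)
  }

  when(Processing) {
    case Event(Continue, _) => stay()
  }

  initialize()
}
```

Because the timeout is declared on `when(Unused, ...)`, it cannot fire in another state, which is the difference from a `system.scheduler` message that is delivered regardless of the current state.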

The issue you see is only a problem when you are not committing messages, because you get to try again after a commit when you are.

Regarding your second question, it could be that GC or other pauses cause the underlying Kafka consumer to time out. A test case that reproduces this would let us reason better about it.

Thanks!

kuhnen commented 9 years ago

Hi Sclasen,

I will try to reproduce the Unused state behavior and use the FSM StateTimeout, then I will make a pull request.

About the second question: I was overriding some actor names, and for some strange reason Akka did not throw an exception. That problem is fixed now.

Thanks

sclasen commented 9 years ago

#25 is in progress to resolve this.