sclasen / akka-kafka

Example in the readme not working #29

Closed kiran-kumar closed 8 years ago

kiran-kumar commented 9 years ago

I downloaded the code and tried the example from the README, but I had no luck running the sample program. Every time I start the program, it starts up but doesn't consume any data from Kafka. Here are the logs I get when debug is enabled:

[INFO] [01/22/2015 17:59:36.473] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] at=start
[INFO] [01/22/2015 17:59:37.743] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] at=created-streams
[INFO] [01/22/2015 17:59:37.755] [test-akka.actor.default-dispatcher-4] [ActorSystem(test)] at=consumer-started
[DEBUG] [01/22/2015 17:59:37.758] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] state=Receiving msg=Commit uncommitted=0
[INFO] [01/22/2015 17:59:37.758] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] at=transition from=Receiving to=Committing uncommitted=0
[ERROR] [01/22/2015 17:59:37.761] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] Unhandled event and stopping the consumer
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 state=Processing msg=Continue outstanding=0
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 at=transition from=Processing to=Unused
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 state=Unused msg=Drain outstanding=0
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 at=Drained
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 at=transition from=Unused to=Empty
[INFO] [01/22/2015 17:59:37.817] [test-akka.actor.default-dispatcher-4] [ActorSystem(test)] at=consumer-stopped
[INFO] [01/22/2015 17:59:37.824] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a/stream0] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://test/user/$a/stream0#677859073] to Actor[akka://test/user/$a/stream0#677859073] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/22/2015 17:59:37.826] [test-akka.actor.default-dispatcher-4] [akka://test/user/$a] Message [com.sclasen.akka.kafka.ConnectorFSM$Drained] from Actor[akka://test/user/$a/stream0#677859073] to Actor[akka://test/user/$a#-1603528940] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Later I realized the issue is caused by calling

consumer.stop() //returns a Future[Unit] that completes when the connector is stopped.

It sends an event to ConnectorFSM, which ends up in the whenUnhandled section of ConnectorFSM in Actors.scala:

    whenUnhandled {
      case Event(ConnectorFSM.Stop, _) =>
        log.error("Unhandled event and stopping the consumer")
        connector.shutdown()
        sender() ! ConnectorFSM.Stop
        context.children.foreach(_ ! StreamFSM.Stop)
        stop()
    }

I commented out consumer.stop() and it started reading data from Kafka. I still have to figure out how to stop the program once it has finished reading all the data from Kafka. I tested it from Eclipse and from the command line, and both had the same problem. Am I doing something wrong?

Appreciate your help.

sclasen commented 9 years ago

Sorry if that is unclear. The README was just showing a full example of using the client; you don't want to call stop until you are done consuming, if ever.

Feel free to send a PR with any clarification you think works.

Thanks!
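
For readers who land on this thread: the advice above (start the consumer, keep it running, and call stop only once you are done) can be sketched as follows. This is a minimal illustration and not the akka-kafka API. The `Stoppable` trait, `RecordingConsumer`, and `DeferredStop.runUntil` are hypothetical names invented for this example; the only detail taken from the thread is that `stop()` returns a `Future[Unit]`. That `start()` also returns a `Future[Unit]` is an assumption.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the consumer discussed in this thread.
// It is NOT the akka-kafka API; it only mirrors the fact that stop()
// returns a Future[Unit]. start() returning a Future is an assumption.
trait Stoppable {
  def start(): Future[Unit]
  def stop(): Future[Unit]
}

// Toy implementation that records lifecycle calls in the order they happen.
final class RecordingConsumer extends Stoppable {
  private var log = Vector.empty[String]

  def events: Vector[String] = synchronized(log)

  def start(): Future[Unit] = {
    synchronized { log = log :+ "start" }
    Future.successful(())
  }

  def stop(): Future[Unit] = {
    synchronized { log = log :+ "stop" }
    Future.successful(())
  }
}

object DeferredStop {
  // Start the consumer, block until the caller signals `done`, then stop.
  // Calling stop() right after start(), as in the README snippet, would
  // shut the consumer down before it has consumed anything.
  def runUntil(consumer: Stoppable,
               done: Future[Unit],
               timeout: FiniteDuration = 10.seconds): Unit = {
    Await.result(consumer.start(), timeout)
    Await.result(done, Duration.Inf) // keep consuming for as long as needed
    Await.result(consumer.stop(), timeout)
  }
}
```

In a real program, `done` would be a future completed by whatever marks the end of consumption in your application, for example a `Promise[Unit]` that your own code completes when it decides to shut down.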

kiran-kumar commented 9 years ago

I have sent a pull request that comments out the line causing the trouble and adds an explanatory comment. Please take a look and let me know if it works.