sclasen / akka-kafka

185 stars 62 forks source link

Unable to cleanly stop AkkaConsumer #41

Closed ulejon closed 8 years ago

ulejon commented 8 years ago

I have an actor in which I create an AkkaConsumer.

class ConsumerActor(topic: String) with Actor with ActorLogging {
    val akkaConsumer: AkkaConsumer[String, String] = createAkkaConsumer()

    private def createAkkaConsumer() = {
        val consumerProps = AkkaConsumerProps.forContext(
            context = context,
            zkConnect = "localhost:2181",
            topic = topic,
            group = s"Group-$topic",
            streams = 1,
            keyDecoder = new StringDecoder(),
            msgDecoder = new StringDecoder(),
            receiver = self,
            commitConfig = new CommitConfig()
        )

        new AkkaConsumer(consumerProps)
    }

    override def receive: Receive = {
        case Start => akkaConsumer.start() 
        case x: Any =>
            log.info(s"Kafka message: $x")
            sender() ! StreamFSM.Processed
    }

    @throws[Exception](classOf[Exception])
    override def postStop(): Unit = {
        akkaConsumer.stop().onComplete {
            case Success(_) => log.info("Finished stopping akka consumer")
            case Failure(exception) => log.error(exception, "Error stopping akka consumer")
        }
    }
}

When I'm stopping the actor system (to which the ConsumerActor is a child) I always get the error "Error stopping akka consumer" in my logs. The stacktrace looks like this:

akka.pattern.AskTimeoutException: Recipient[Actor[akka://mySystem/user/consumer-starter/consumer-482628338]] had already been terminated. at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132) ~[akka-actor_2.11-2.3.11.jar:na] at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144) ~[akka-actor_2.11-2.3.11.jar:na] at com.sclasen.akka.kafka.AkkaConsumer.stop(AkkaConsumer.scala:61) ~[akka-kafka_2.11-0.1.0.jar:0.1.0]

I guess this is due to the fact that I'm closing the akkaConsumer in the postStop function, which means the actor has already been closed. Is there any other way of doing this?

sclasen commented 8 years ago

if you are shutting down the actor system, I think you dont necessarily need to call stop on the akka consumer.