sclasen / akka-kafka

NPE in BatchConnectorFSM #56

Open christophercampbell opened 7 years ago

christophercampbell commented 7 years ago

We get an occasional NullPointerException in the BatchConnectorFSM, after which all consuming halts. There is no obvious way to set up a supervisor to restart it; is there a recommended approach?


```
[ERROR] [2017-01-11 04:51:32,383] [OneForOneStrategy] [loader-akka.actor.default-dispatcher-90] [] null
java.lang.NullPointerException: null
        at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:138)
        at org.I0Itec.zkclient.ZkClient$13.call(ZkClient.java:1151)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
        at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:1147)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:1142)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:1110)
        at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:380)
        at com.sclasen.akka.kafka.BatchConnectorFSM$$anonfun$4.applyOrElse(BatchActors.scala:142)
        at com.sclasen.akka.kafka.BatchConnectorFSM$$anonfun$4.applyOrElse(BatchActors.scala:137)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at akka.actor.FSM$class.processEvent(FSM.scala:663)
        at com.sclasen.akka.kafka.BatchConnectorFSM.processEvent(BatchActors.scala:62)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:651)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
        at com.sclasen.akka.kafka.BatchConnectorFSM.aroundReceive(BatchActors.scala:62)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```