sangszhou closed this issue 9 years ago
There's not enough information here for me to tell what's going on, other than it looks like you're getting mongodb timeouts. Does your configuration work on a simpler case of no clustering/no sharding?
Have a look at issue #24 and see if that helps.
I wrote a toy example without sharding or clustering, but I still got an error. FYI, my MongoDB version is 2.6.1.
Uncaught error from thread [example-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[example]
java.lang.NoSuchMethodError: reactivemongo.api.MongoDriver.close$default$1()Lscala/concurrent/duration/FiniteDuration;
at akka.contrib.persistence.mongodb.RxMongoDriver$$anonfun$1.apply$mcV$sp(RxMongoPersistenceExtension.scala:73)
at akka.contrib.persistence.mongodb.RxMongoDriver$$anonfun$1.apply(RxMongoPersistenceExtension.scala:73)
at akka.contrib.persistence.mongodb.RxMongoDriver$$anonfun$1.apply(RxMongoPersistenceExtension.scala:73)
at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:641)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:808)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:811)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:804)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:804)
at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:804)
at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:638)
at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:638)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
main.class
import akka.actor.{ActorSystem, Props}
import akka.persistence.{PersistentActor, SnapshotOffer}

case class Cmd(data: String)
case class Evt(data: String)
case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}
class PersistentActorExample extends PersistentActor {
  var state = ExampleState()
  def updateState(event: Evt): Unit =
    state = state.updated(event)
  def numEvents =
    state.size
  // Rebuild state from replayed events or from a saved snapshot.
  override def receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
    case SnapshotOffer(_, snapShot: ExampleState) => state = snapShot
  }
  override def receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap" => saveSnapshot(state)
    case "print" => println(state)
  }
  override def persistenceId: String = "sample-id-2"
}
object PersistentActorExample extends App {
  val system = ActorSystem("example")
  val persistentActor = system.actorOf(Props[PersistentActorExample], "persistentActor-4-scala")
  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! Cmd("bar")
  persistentActor ! "snap"
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"
  Thread.sleep(1000)
  system.shutdown()
}
build.sbt
name := "PlayWithActor"
version := "1.0"
scalaVersion := "2.11.6"
val akkaVersion = "2.3.9"
resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
resolvers += "Typesafe Maven Repository" at "http://repo.typesafe.com/typesafe/maven-releases/"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-contrib" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
  "org.scalatest" %% "scalatest" % "2.2.4" % "test",
  "commons-io" % "commons-io" % "2.4" % "test",
  "com.sksamuel.elastic4s" %% "elastic4s-core" % "1.6.0",
  "io.spray" %% "spray-http" % "1.3.2",
  "org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",
  // "com.github.ironfish" %% "akka-persistence-mongo-casbah" % "0.7.5" % "compile"
  "com.github.scullxbones" %% "akka-persistence-mongo-rxmongo" % "0.4.0"
)
application.conf
akka.contrib.persistence.mongodb.mongo.driver = "akka.contrib.persistence.mongodb.RxMongoPersistenceExtension"
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
//akka.contrib.persistence.mongodb.mongo.urls = ["icam-dev-mongo1:27017"]
Yep, definitely something broken here. I got it to work with a few changes:
1) Version 0.4.0 of the plugin needs ReactiveMongo 0.11+:
"org.reactivemongo" %% "reactivemongo" % "0.11.1",
2) It looks like the legacy way of specifying connections is broken; I'll need to go back in and fix that. The mongouri approach is working:
akka.contrib.persistence.mongodb.mongo.mongouri = "mongodb://localhost:27017/sample_db_name"
3) The delay before shutdown apparently needs to be longer to get results. I used 3s instead of 1s.
I'll track the fix for number 2 with this ticket, but with the 3 changes above you should be unblocked.
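For reference, changes 2 and 3 might look roughly like the sketch below when applied to the toy example. It sets the mongouri programmatically instead of through application.conf, which is only one possible way to do it. The object name PersistentActorExampleWithUri is made up for illustration. The sketch assumes that Cmd and PersistentActorExample from the report are on the classpath, that the journal and snapshot plugin settings stay as in the original application.conf, and that a MongoDB instance is reachable at the URI shown.

```scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical variant of the reporter's main object; the name is illustrative only.
object PersistentActorExampleWithUri extends App {
  // Plugin ids and the mongouri key are copied from this thread.
  // Assumption: overriding them here is equivalent to editing application.conf.
  val overrides = ConfigFactory.parseString(
    """
      |akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
      |akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
      |akka.contrib.persistence.mongodb.mongo.mongouri = "mongodb://localhost:27017/sample_db_name"
    """.stripMargin)

  // Overrides take precedence; everything else falls back to the normal config.
  val system = ActorSystem("example", overrides.withFallback(ConfigFactory.load()))
  val persistentActor = system.actorOf(Props[PersistentActorExample], "persistentActor-4-scala")

  persistentActor ! Cmd("foo")
  persistentActor ! "print"

  // Change 3: wait 3s rather than 1s before shutting down.
  Thread.sleep(3000)
  system.shutdown()
}
```

A fixed sleep is still only a workaround; it gives the asynchronous journal writes time to complete before the actor system goes away.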
Thanks for the report.
If you ended up not using the mongouri configuration, you should be able to use your previous configuration with version 0.4.1. Don't forget to upgrade your ReactiveMongo driver to a minimum of 0.11.
I am using Akka 2.3.9; the relevant log is pasted below. It seems that the coordinator is trying to recover from MongoDB data that is not available yet.
[2015-07-09 23:23:38,347] ERROR[ClusterSystem-akka.actor.default-dispatcher-15] OneForOneStrategy - Processor killed after recovery failure (persistent id = [/user/sharding/ActionWorkerCoordinator/singleton/coordinator]). To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. RecoveryFailure was caused by: akka.pattern.CircuitBreakerOpenException: Circuit Breaker is open; calls are failing fast