eligosource / eventsourced

A library for building reliable, scalable and distributed event-sourced applications in Scala
Apache License 2.0

Strange problem #46

Closed ponythewhite closed 11 years ago

ponythewhite commented 11 years ago

The code below gives me very strange behavior.

import scala.concurrent._
import scala.concurrent.duration._

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.LeveldbJournalProps

object QFXSystem {
    implicit val system = ActorSystem("example")
    implicit val timeout = Timeout(5 seconds)

    import system.dispatcher

    val journalDir = new java.io.File("target/example-1")
    val journal = Journal(LeveldbJournalProps(journalDir))

    val extension = EventsourcingExtension(system, journal)
    val processor = extension.processorOf(Props(new OrderProcessor with Emitter with Eventsourced { val id = 0 }))
    val riskEngine = extension.processorOf(Props(new OrderConsumer(processor) with Receiver with Confirm with Eventsourced { val id = 1 }))
    extension.recover()

    class OrderProcessor extends Actor { this: Emitter =>
        def receive = {
            case x => println(x)
        }
    }

    class OrderConsumer(val consumer: ActorRef) extends Actor { this: Receiver =>
        def receive = {
            case x => println(x)
        }
    }
}

Exceptions follow:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at com.fxq.qfx.QFXSystem.main(QFXSystem.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:96)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:100)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.eligosource.eventsourced.core.EventsourcingExtension.processorOf(EventsourcingExtension.scala:110)
    at com.fxq.qfx.QFXSystem$.<init>(QFXSystem.scala:24)
    at com.fxq.qfx.QFXSystem$.<clinit>(QFXSystem.scala)
    ... 1 more
Uncaught error from thread [example-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[example]
java.lang.NoClassDefFoundError: Could not initialize class com.fxq.qfx.QFXSystem$
    at com.fxq.qfx.QFXSystem$$anonfun$2$$anon$2.<init>(QFXSystem.scala:24)
    at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
    at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
    at akka.actor.ActorCell.newActor(ActorCell.scala:461)
    at akka.actor.ActorCell.create(ActorCell.scala:479)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:351)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:256)
    at akka.dispatch.Mailbox.run(Mailbox.scala:211)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
[ERROR] [01/06/2013 15:02:19.742] [example-akka.actor.default-dispatcher-2] [ActorSystem(example)] Uncaught error from thread [example-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.NoClassDefFoundError: Could not initialize class com.fxq.qfx.QFXSystem$
    at com.fxq.qfx.QFXSystem$$anonfun$2$$anon$2.<init>(QFXSystem.scala:24)
    at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
    at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
    at akka.actor.ActorCell.newActor(ActorCell.scala:461)
    at akka.actor.ActorCell.create(ActorCell.scala:479)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:351)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:256)
    at akka.dispatch.Mailbox.run(Mailbox.scala:211)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

Any ideas what I'm doing wrong?

krasserm commented 11 years ago

I can reproduce that but need to dig in further. I'm not sure whether this is related to eventsourced or to Akka/Scala. As a workaround, use the non-blocking processorOf method (which is preferred anyway), i.e.

extension.processorOf(ProcessorProps(2, pid => new OrderConsumer(processor) with Receiver with Confirm with Eventsourced { val id = pid }))

where ProcessorProps is passed the processor id and a processor factory as arguments. This will make your example work. BTW, processor ids must be positive integers (not enforced yet but will be in the next milestone release).

Another workaround is to put the initialization code inside a method:

object QFXSystem {
    def init() {
        implicit val system = ActorSystem("example")
        // ...
        extension.recover()
    }
}

and call QFXSystem.init() from wherever you need. Alternatively, make QFXSystem a class. Calling new QFXSystem will run your original initialization code properly.
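The init() workaround can be illustrated without Akka or eventsourced. The following is only a minimal sketch using plain scala.concurrent: the names InitInMethod and InitInMethodDemo are hypothetical stand-ins for QFXSystem and its caller, and the Future stands in for the blocking processorOf call. It assumes the point of the workaround is that blocking happens in a method called after the object has been initialized, rather than in the object body itself.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for QFXSystem: the object body itself does no blocking work.
object InitInMethod {
  // All blocking work lives in a method, so it runs only after the object's
  // own initialization has already finished.
  def init(): Int = {
    import ExecutionContext.Implicits.global
    val base = 1                              // local value captured by the Future
    Await.result(Future(base + 1), 2.seconds) // blocks outside the object initializer
  }
}

object InitInMethodDemo {
  def main(args: Array[String]): Unit =
    println(InitInMethod.init()) // prints 2
}
```

Moving the same Await.result call into the body of the object would put the blocking wait back into the initializer, which is the shape of the original failing code.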

I'll post another message to this thread once I have found the cause of this issue. Thanks for reporting.

Cheers, Martin

ponythewhite commented 11 years ago

No problem, thx for your helpful reply. The positive id requirement is unrelated AFAIK; I already checked that. To help you with your research, I have also noticed that when I create:

val processor1 = extension.processorOf(Props(new OrderProcessor with Emitter with Eventsourced { val id = 1 }))
val processor2 = extension.processorOf(Props(new OrderProcessor with Emitter with Eventsourced { val id = 2 }))
val processor3 = extension.processorOf(Props(new OrderProcessor with Emitter with Eventsourced { val id = 3 }))

that runs OK.

But when I try to fill them into a Map[Int,ActorRef] using a simple for (id <- List(1,2,3)), it also fails. So explicitly listing the processor variables by hand works, but simple "parametrized" creation (with incrementing ids) does not.

Will check out the methods you proposed soon, thx again!

krasserm commented 11 years ago

I posted a related question to the akka-user list. It seems that these issues are not related to eventsourced.

https://groups.google.com/forum/?fromgroups=#!topic/akka-user/AzjDkaNshKg

krasserm commented 11 years ago

Confirmed as not related to eventsourced (see previous link). This also explains why the non-blocking processorOf method works.
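This thread does not spell out the cause, so the following is an interpretation rather than a confirmed diagnosis: a likely explanation is that the JVM holds a class-initialization lock while an object's body runs, so code on another thread that touches the object has to wait, and an Await inside the initializer then times out. Under that assumption, a non-blocking call works because the initializer finishes without waiting on the other thread. A minimal sketch in plain scala.concurrent, with the hypothetical names NonBlockingInit and NonBlockingInitDemo and a Future standing in for the non-blocking processorOf:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for an object that starts asynchronous work in its body.
object NonBlockingInit {
  import ExecutionContext.Implicits.global
  // The initializer only starts the work; it never waits for the result,
  // so initialization of the object can complete immediately.
  val answer: Future[Int] = Future(1 + 1)
}

object NonBlockingInitDemo {
  def main(args: Array[String]): Unit =
    // Waiting happens in the caller, after NonBlockingInit is fully initialized.
    println(Await.result(NonBlockingInit.answer, 2.seconds)) // prints 2
}
```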

ponythewhite commented 11 years ago

Thank you for your prompt help :+1:

krasserm commented 11 years ago

You're most welcome!