uzh / signal-collect

A framework for scalable graph computing.
www.signalcollect.com
Apache License 2.0

Multiple S/C instances, problem on shutdown. #154

Closed: pstutz closed this issue 9 years ago.

pstutz commented 10 years ago

@cshapeshifter reports an issue when running multiple S/C instances in the same JVM. By default they all share one actor system, which means that the first instance to shut down also shuts down the actor system for all the others. This is almost certainly not the desired behaviour.
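To make the failure mode concrete, here is a minimal sketch in plain Scala that uses neither Akka nor the Signal/Collect API. It assumes one possible remedy, reference counting: each graph acquires the shared runtime when it is created and releases it on shutdown, and only the last release actually terminates the runtime. `SharedRuntime`, `RefCountedRuntime`, `acquire` and `release` are hypothetical names chosen for illustration; this is not necessarily how the issue was actually fixed.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a shared runtime such as an actor system.
// Not part of the Signal/Collect API; names are illustrative only.
final class SharedRuntime(val name: String) {
  @volatile private var terminated = false
  def isTerminated: Boolean = terminated
  def terminate(): Unit = terminated = true
}

// Hypothetical wrapper that counts the users of a SharedRuntime and
// terminates it only when the last user releases it.
final class RefCountedRuntime(runtime: SharedRuntime) {
  private val users = new AtomicInteger(0)

  // Called when a graph starts using the shared runtime.
  def acquire(): SharedRuntime = {
    users.incrementAndGet()
    runtime
  }

  // Called from a graph's shutdown; only the release that brings the
  // count to zero terminates the runtime.
  def release(): Unit =
    if (users.decrementAndGet() == 0) runtime.terminate()
}

object RefCountDemo extends App {
  val shared = new RefCountedRuntime(new SharedRuntime("SignalCollect"))
  val runtime = shared.acquire() // graph1
  shared.acquire()               // graph2
  shared.acquire()               // graph3
  shared.release()               // graph1 shuts down
  println(runtime.isTerminated)  // false: two users remain
  shared.release()               // graph2 shuts down
  shared.release()               // graph3 shuts down
  println(runtime.isTerminated)  // true
}
```

A real implementation would also need to decide what happens when a graph tries to acquire the runtime after it has already been terminated; the sketch does not handle that case.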

cshapeshifter commented 9 years ago

This is still an open issue for me. According to the GraphBuilder docs, it should be possible to run multiple instances of S/C by specifying a unique actor name prefix for each computation. However, either this does not work or I'm doing something wrong. Here's a complete test case that reproduces the issue:

import com.signalcollect._

object TestCase extends App {
  def createComputation(p: String): Graph[_,_] = {
    val graph = GraphBuilder.withActorNamePrefix(p).build
    graph.addVertex(new PageRankVertex(1))
    graph.addVertex(new PageRankVertex(2))
    graph.addEdge(1, new PageRankEdge(2))
    graph.addEdge(2, new PageRankEdge(1))
    graph
  }
  val graph1 = createComputation("prefix1")
  val graph2 = createComputation("prefix2")
  val graph3 = createComputation("prefix3")
  graph1.execute
  graph1.foreachVertex(println(_))
  graph2.execute
  graph2.foreachVertex(println(_))
  graph3.execute
  graph3.foreachVertex(println(_))
  graph1.shutdown
  graph2.shutdown
  graph3.shutdown
}

class PageRankVertex(id: Int, baseRank: Double = 0.15)
    extends DataGraphVertex(id, baseRank) {
  type Signal = Double
  def dampingFactor = 1 - baseRank
  def collect = baseRank + dampingFactor * signals.sum
}

class PageRankEdge(targetId: Int)
    extends DefaultEdge(targetId) {
  type Source = PageRankVertex
  def signal = source.state * weight / source.sumOfOutWeights
}

When running this test case, graph1.shutdown shuts down the shared actor system, so the subsequent calls to graph2.shutdown and graph3.shutdown fail with the following errors:

Recipient[Actor[akka://SignalCollect/user/prefix2DefaultNodeActor#646511669]] had already been terminated.
Recipient[Actor[akka://SignalCollect/user/prefix3DefaultNodeActor#723756331]] had already been terminated.

Full output:

PageRankVertex(id=1, state=0.9999999999999997)
PageRankVertex(id=2, state=0.9999999999999997)
PageRankVertex(id=1, state=0.9999999999999997)
PageRankVertex(id=2, state=0.9999999999999997)
PageRankVertex(id=1, state=0.9999999999999997)
PageRankVertex(id=2, state=0.9999999999999997)
akka.pattern.AskTimeoutException: Recipient[Actor[akka://SignalCollect/user/prefix2DefaultNodeActor#646511669]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
        at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
        at com.signalcollect.messaging.AkkaProxy.invoke(AkkaProxy.scala:110)
        at com.sun.proxy.$Proxy4.shutdown(Unknown Source)
        at com.signalcollect.DefaultGraph$$anonfun$shutdown$1.apply(DefaultGraph.scala:764)
        at com.signalcollect.DefaultGraph$$anonfun$shutdown$1.apply(DefaultGraph.scala:764)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in proxy method `shutdown(`: akka.pattern.AskTimeoutException: Recipient[Actor[akka://SignalCollect/user/prefix2DefaultNodeActor#646511669]] had already been terminated. from Actor[akka://SignalCollect/user/prefix2DefaultNodeActor#646511669] ()
akka.pattern.AskTimeoutException: Recipient[Actor[akka://SignalCollect/user/prefix3DefaultNodeActor#723756331]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
        at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
        at com.signalcollect.messaging.AkkaProxy.invoke(AkkaProxy.scala:110)
        at com.sun.proxy.$Proxy4.shutdown(Unknown Source)
        at com.signalcollect.DefaultGraph$$anonfun$shutdown$1.apply(DefaultGraph.scala:764)
        at com.signalcollect.DefaultGraph$$anonfun$shutdown$1.apply(DefaultGraph.scala:764)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in proxy method `shutdown(`: akka.pattern.AskTimeoutException: Recipient[Actor[akka://SignalCollect/user/prefix3DefaultNodeActor#723756331]] had already been terminated. from Actor[akka://SignalCollect/user/prefix3DefaultNodeActor#723756331] ()
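The actor paths in the log above hint at why the prefix alone does not help: the prefix only changes the actor's name, while every node actor still lives under the same actor system, `SignalCollect`. The small sketch below makes that explicit. Its helpers `topLevelPath` and `systemOf` are hypothetical and only manipulate path strings in the format seen in the log; they are not part of Akka or Signal/Collect.

```scala
// Hypothetical helpers, not part of Akka or Signal/Collect: they build and
// inspect top-level actor paths in the format that appears in the log.
object ActorPaths {
  def topLevelPath(system: String, prefix: String, actorName: String): String =
    s"akka://$system/user/$prefix$actorName"

  // Extracts the actor system name from an "akka://<system>/..." path.
  def systemOf(path: String): String =
    path.stripPrefix("akka://").takeWhile(_ != '/')
}

object PrefixDemo extends App {
  import ActorPaths._

  // One node actor per prefix, as in the test case.
  val paths = Seq("prefix1", "prefix2", "prefix3")
    .map(p => topLevelPath("SignalCollect", p, "DefaultNodeActor"))
  paths.foreach(println)

  // Different names, but all three live in the same actor system.
  println(paths.map(systemOf).distinct) // List(SignalCollect)
}
```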
pstutz commented 9 years ago

Fixed. I presume you're okay with the test being released under the Apache license; please ping me if not.