Closed nabeelxy closed 7 years ago
It loads the graph. I get the following exception:
17/02/06 20:08:10 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks 17/02/06 20:08:10 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 8, localhost, PROCESS_LOCAL, 2146 bytes) 17/02/06 20:08:10 INFO Executor: Running task 0.0 in stage 12.0 (TID 8) 17/02/06 20:08:10 INFO BlockManager: Found block rdd_18_0 locally 17/02/06 20:08:10 INFO Executor: Finished task 0.0 in stage 12.0 (TID 8). 2253 bytes result sent to driver 17/02/06 20:08:10 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 8) in 20 ms on localhost (1/1) 17/02/06 20:08:10 INFO DAGScheduler: ShuffleMapStage 12 (mapPartitions at VertexRDDImpl.scala:247) finished in 0.021 s 17/02/06 20:08:10 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool 17/02/06 20:08:10 INFO DAGScheduler: looking for newly runnable stages 17/02/06 20:08:10 INFO DAGScheduler: running: Set() 17/02/06 20:08:10 INFO DAGScheduler: waiting: Set(ResultStage 13) 17/02/06 20:08:10 INFO DAGScheduler: failed: Set() 17/02/06 20:08:10 INFO DAGScheduler: Missing parents for ResultStage 13: List() 17/02/06 20:08:10 INFO DAGScheduler: Submitting ResultStage 13 (MapPartitionsRDD[32] at map at EdgeRDDImpl.scala:89), which is now runnable 17/02/06 20:08:10 INFO MemoryStore: ensureFreeSpace(5896) called with curMem=522206, maxMem=555755765 17/02/06 20:08:10 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 5.8 KB, free 529.5 MB) 17/02/06 20:08:10 INFO MemoryStore: ensureFreeSpace(3044) called with curMem=528102, maxMem=555755765 17/02/06 20:08:10 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 3.0 KB, free 529.5 MB) 17/02/06 20:08:10 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on localhost:36967 (size: 3.0 KB, free: 529.9 MB) 17/02/06 20:08:10 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:861 17/02/06 20:08:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 13 (MapPartitionsRDD[32] at map at EdgeRDDImpl.scala:89) 17/02/06 20:08:10 INFO TaskSchedulerImpl: Adding task set 13.0 with 1 tasks 17/02/06 20:08:10 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 9, localhost, PROCESS_LOCAL, 2556 bytes) 17/02/06 20:08:10 INFO Executor: Running task 0.0 in stage 13.0 (TID 9) 17/02/06 20:08:10 INFO CacheManager: Partition rdd_30_0 not found, computing it 17/02/06 20:08:10 INFO BlockManager: Found block rdd_21_0 locally 17/02/06 20:08:10 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 17/02/06 20:08:10 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 17/02/06 20:08:10 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID 9) java.lang.NullPointerException at sparkle.graph.BeliefPropagation$$anonfun$2.apply(BeliefPropagation.scala:48) at sparkle.graph.BeliefPropagation$$anonfun$2.apply(BeliefPropagation.scala:47) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.graphx.impl.EdgePartition.map(EdgePartition.scala:185) at org.apache.spark.graphx.impl.GraphImpl$$anonfun$7.apply(GraphImpl.scala:156) at org.apache.spark.graphx.impl.GraphImpl$$anonfun$7.apply(GraphImpl.scala:155) at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:121) at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:118) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
I am using a very simple factor graph: 1 factor and 2 variables:
1
2 1 2 2 2 4 0 1 1 0.9 2 0.9 3 1
Hi @nabeelxy! We are using a slightly modified format with explicit factor IDs. Please refer to the example: https://github.com/HewlettPackard/sandpiper/blob/master/data/factor/graph7.fg.
@manish-marwah could you suggest on the second question?
Hi @nabeelxy, factor values are part of the input to BP. They are usually determined based on domain knowledge, or if enough data is available they can be learned from data.
Thank you @avulanov and @manish-marwah for your quick replies.
@manish-marwah, I noticed that you guys have two unused data files vetex100.txt and edge100.txt. They have fixed edge potentials and prior probabilities. Are they used in any why in your code? Also, since they are fixed, can we learn the factor functions from them? One more thing, am I correct that in your factor graph input files factor values are randomly assigned (i.e. not based on any domain knowledge of the variables)?
Thank you guys again for your replies and work.
Hi @avulanov, I think I used the same format as you pointed in graph7.fg. It may be a typo when I copied the input file. Just to be clear, my input factor graph is as follows:
1
2 1 2 2 2 4 0 1 1 0.9 2 0.9 3 1
Format of the file is as expected. In the logs, I see that vertices and edges are identified correctly: Edge(1,3,()) Edge(2,3,())
(1,null) (3,sparkle.graph.NamedFactor@5e1fc42f) (2,null)
However, I still get the null pointer exception. Could it be a bug in libDAI?
.... 17/02/07 08:31:22 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks 17/02/07 08:31:22 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 8, localhost, PROCESS_LOCAL, 2146 bytes) 17/02/07 08:31:22 INFO Executor: Running task 0.0 in stage 12.0 (TID 8) 17/02/07 08:31:22 INFO BlockManager: Found block rdd_18_0 locally 17/02/07 08:31:22 INFO Executor: Finished task 0.0 in stage 12.0 (TID 8). 2253 bytes result sent to driver 17/02/07 08:31:22 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 8) in 24 ms on localhost (1/1) 17/02/07 08:31:22 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool 17/02/07 08:31:22 INFO DAGScheduler: ShuffleMapStage 12 (mapPartitions at VertexRDDImpl.scala:247) finished in 0.024 s 17/02/07 08:31:22 INFO DAGScheduler: looking for newly runnable stages 17/02/07 08:31:22 INFO DAGScheduler: running: Set() 17/02/07 08:31:22 INFO DAGScheduler: waiting: Set(ResultStage 13) 17/02/07 08:31:22 INFO DAGScheduler: failed: Set() 17/02/07 08:31:22 INFO DAGScheduler: Missing parents for ResultStage 13: List() 17/02/07 08:31:22 INFO DAGScheduler: Submitting ResultStage 13 (MapPartitionsRDD[32] at map at EdgeRDDImpl.scala:89), which is now runnable 17/02/07 08:31:22 INFO MemoryStore: ensureFreeSpace(5896) called with curMem=522262, maxMem=555755765 17/02/07 08:31:22 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 5.8 KB, free 529.5 MB) 17/02/07 08:31:23 INFO MemoryStore: ensureFreeSpace(3055) called with curMem=528158, maxMem=555755765 17/02/07 08:31:23 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 3.0 KB, free 529.5 MB) 17/02/07 08:31:23 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on localhost:42590 (size: 3.0 KB, free: 529.9 MB) 17/02/07 08:31:23 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:861 17/02/07 08:31:23 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 13 (MapPartitionsRDD[32] at map at EdgeRDDImpl.scala:89) 17/02/07 08:31:23 INFO TaskSchedulerImpl: Adding task set 13.0 with 1 tasks 17/02/07 08:31:23 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 9, localhost, PROCESS_LOCAL, 2556 bytes) 17/02/07 08:31:23 INFO Executor: Running task 0.0 in stage 13.0 (TID 9) 17/02/07 08:31:23 INFO CacheManager: Partition rdd_30_0 not found, computing it 17/02/07 08:31:23 INFO BlockManager: Found block rdd_21_0 locally 17/02/07 08:31:23 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 17/02/07 08:31:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 17/02/07 08:31:23 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID 9) java.lang.NullPointerException at sparkle.graph.BeliefPropagation$$anonfun$2.apply(BeliefPropagation.scala:48) at sparkle.graph.BeliefPropagation$$anonfun$2.apply(BeliefPropagation.scala:47) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.graphx.impl.EdgePartition.map(EdgePartition.scala:185) at org.apache.spark.graphx.impl.GraphImpl$$anonfun$7.apply(GraphImpl.scala:156) at org.apache.spark.graphx.impl.GraphImpl$$anonfun$7.apply(GraphImpl.scala:155) at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:121) at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:118) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) .... Any pointers in this regard is highly appreciated.
Note that when I paste the input factor graph here, github converts "hash-hash-hash 3" to a "bold digit 3".
@nabeelxy could you try running the unit test? it uses exactly the same file https://github.com/HewlettPackard/sandpiper/blob/master/src/test/scala/sparkle/graph/BeliefPropagationSuite.scala
@manish-marwah, I noticed that you guys have two unused data files vetex100.txt and edge100.txt. They have fixed edge potentials and prior probabilities. Are they used in any why in your code? Also, since they are fixed, can we learn the factor functions from them? One more thing, am I correct that in your factor graph input files factor values are randomly assigned (i.e. not based on any domain knowledge of the variables)?
@nabeelxy those files are based on a real graph, that is, the degree distribution is based on a real graph. All nodes are represent the same variable, and three different priors are used. (A prior of 0,5 implies no previous knowledge about that node). The edges in the model are homogeneous (since there is only one kind of node), and hence share the same edge potentials. The potential shows a common case of homophily - similar state pairs have high values, and dissimilar ones have low values.
@nabeelxy We've added a version of the BP algorithm that uses file format you mentioned: vertex100.txt
and edge100.txt
. It is called PairwiseBP and there is an example in the README
.
Thanks for your effort on this.
I am wondering if there are any limitations on the input FG in terms of size or shape? I modified your input FG to have few factors, it fails with null pointers at the point of loading the graph.
Also, would be able to tell me how you decide the factor values in the input FG for BP?