aditya-grover / node2vec

http://snap.stanford.edu/node2vec/
MIT License

Setting Directed Flag Causes NullPointerException #29

Open qantik opened 6 years ago

qantik commented 6 years ago

Hi,

The Spark implementation crashes whenever the directed flag is set to true. I ran it with both the karate.edgelist example and a dummy two-edge graph. The exception is always raised at the same location inside initTransitionProb, after the graph has been loaded.

java.lang.NullPointerException
at Node2vec$$anonfun$initTransitionProb$2.apply(Node2vec.scala:69)
at Node2vec$$anonfun$initTransitionProb$2.apply(Node2vec.scala:68)
at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$5.apply(GraphImpl.scala:129)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$5.apply(GraphImpl.scala:129)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Cheers.

datazhen commented 6 years ago

Reason

I hit the same error when running val (j, q) = GraphOps.setupAlias(nodeAttr.neighbors) in GraphOps.initTransitionProb. It happens because the nodeAttr object is null.

Detail

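  1. Some dst nodes should be included in the node2attr object but are not. The following code only creates an entry for each src, so a node that only ever appears as dst (no outgoing edges in a directed graph) gets no entry: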
    val node2attr = triplets.map { case (src, dst, weight) =>
      (src, Array((dst, weight))) 
    }.reduceByKey(_++_).map { case (srcId, neighbors: Array[(Long, Double)]) =>
      var neighbors_ : Array[(Long, Double)] = neighbors.groupBy(_._1).map { case (group, traversable) =>  
        traversable.head
      }.toArray
      if (neighbors_.length > bcMaxDegree.value) {
        neighbors_ = neighbors.sortWith{ case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
      }
  2. Then, when the graph object is created with val graph = Graph(indexedNodes, indexedEdges), the missing dst nodes mentioned above are created by default. The default format is [vertexId, null] instead of [vertexId, NodeAttr], which causes the error.
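
To make the two points above concrete, here is a minimal sketch in plain Scala collections (no Spark), assuming a toy two-edge directed graph. The object name MissingSinkDemo and the values in it are made up for illustration; only the "key by src" shape is taken from the code above.

```scala
object MissingSinkDemo {
  // Toy directed edge list: (src, dst, weight).
  // Node 3 only ever appears as a destination.
  val triplets: Seq[(Long, Long, Double)] = Seq((1L, 2L, 1.0), (2L, 3L, 1.0))

  // Same shape as node2attr: neighbours are keyed by source id only.
  val node2attr: Map[Long, Array[(Long, Double)]] =
    triplets.groupBy(_._1).map { case (src, es) =>
      src -> es.map(e => (e._2, e._3)).toArray
    }

  // Every id mentioned by any edge, i.e. every vertex the graph will contain.
  val allIds: Set[Long] = triplets.flatMap(t => Seq(t._1, t._2)).toSet

  // Ids that end up in the graph without an attribute entry.
  val missing: Set[Long] = allIds -- node2attr.keySet

  def main(args: Array[String]): Unit =
    println(s"vertices without attributes: $missing")
}
```

In this toy graph, node 3 is the only vertex with no entry, which matches the kind of vertex that ends up with a null attribute.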

Solutions

To solve the problem, a few methods need to be modified. The details are shown below:

  1. GraphOps.initTransitionProb
    val graph = Graph(indexedNodes, indexedEdges).mapVertices[NodeAttr] { case (vertexId, nodeAttr) =>
      if (nodeAttr != null) {  // add: skip vertices that were created with a null attribute
        val (j, q) = GraphOps.setupAlias(nodeAttr.neighbors)
        val nextNodeIndex = GraphOps.drawAlias(j, q)
        nodeAttr.path = Array(vertexId, nodeAttr.neighbors(nextNodeIndex)._1)
        nodeAttr
      } else {
        NodeAttr() // add: create a new, empty object
      }
    }
  2. Node2Vec.randomWalk
    
    // add:.filter(x=>x._2.path.nonEmpty).
    val examples = g.vertices.filter(x=>x._2.path.nonEmpty).cache
    ...

    // add the condition: attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty
    iter.map { case (edge, (attr, pathBuffer)) =>
      try {
        if (pathBuffer != null && pathBuffer.nonEmpty && attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
          val nextNodeIndex = GraphOps.drawAlias(attr.J, attr.q)
          val nextNodeId = attr.dstNeighbors(nextNodeIndex)
          s"$pathBuffer\t$nextNodeId"
        } else {
          pathBuffer // add
        }
      } catch {
        case e: Exception => throw new RuntimeException(e.getMessage)
      }
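
The idea behind both changes can be sketched without Spark. This is only an illustration under assumptions: the NodeAttr here is a hypothetical stand-in with the two fields used above, initVertex and walkable are made-up helper names, and the first neighbour is picked instead of doing alias sampling, so it is not the project's actual code.

```scala
object NullSafeInitDemo {
  // Hypothetical stand-in for the project's NodeAttr (assumed fields only).
  final case class NodeAttr(
    neighbors: Array[(Long, Double)] = Array.empty,
    path: Array[Long] = Array.empty
  )

  // Option(attr) turns a null attribute into None, so vertices without
  // attributes (or without neighbours) fall back to an empty NodeAttr.
  def initVertex(vertexId: Long, attr: NodeAttr): NodeAttr =
    Option(attr).filter(_.neighbors.nonEmpty) match {
      // Simplification: take the first neighbour instead of alias sampling.
      case Some(a) => a.copy(path = Array(vertexId, a.neighbors.head._1))
      case None    => NodeAttr()
    }

  // Mirrors the added filter: only vertices with a non-empty path start a walk.
  def walkable(vertices: Seq[(Long, NodeAttr)]): Seq[(Long, NodeAttr)] =
    vertices
      .map { case (id, a) => (id, initVertex(id, a)) }
      .filter(_._2.path.nonEmpty)

  def main(args: Array[String]): Unit = {
    val vs = Seq((1L, NodeAttr(Array((2L, 1.0)))), (3L, null))
    println(walkable(vs).map(_._1))
  }
}
```

With this shape, a vertex that has no outgoing edges is simply left out of the walk start set instead of raising a NullPointerException.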



Hope this can help you! Good luck!