eubnara / study


Spark Pregel example code #221

Open eubnara opened 4 years ago

eubnara commented 4 years ago

https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api

https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/

https://www.slideshare.net/riyadparvez/pregel-35504069

How would the following features be implemented in Spark?

eubnara commented 4 years ago

https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api


import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
eubnara commented 4 years ago

https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId}
import scala.util.hashing.MurmurHash3
// The code below demonstrates use of the GraphX Pregel API - Scala 2.11+

// functions to build the top down hierarchy

//setup & call the pregel api
def calcTopLevelHierarcy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any,(Int,Any,String,Int,Int))] = {

    // create the vertex RDD
    // primary key, root, path
    val verticesRDD = vertexDF
        .rdd
        .map{x => (x.get(0), x.get(1), x.get(2))}
        .map{x => (
            MurmurHash3.stringHash(x._1.toString).toLong, (
                x._1.asInstanceOf[Any],
                x._2.asInstanceOf[Any],
                x._3.asInstanceOf[String]
            )
        )}

    // create the edge RDD
    // top down relationship
    val EdgesRDD = edgeDF.rdd.map{x => (x.get(0), x.get(1))}
        .map{x => Edge(
            MurmurHash3.stringHash(x._1.toString).toLong,
            MurmurHash3.stringHash(x._2.toString).toLong,
            "topdown"
        )}

    // create graph
    val graph = Graph(verticesRDD, EdgesRDD).cache()

    val pathSeperator = """/"""

    // initialize id, level, root, path, iscyclic, isleaf
    val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)

    // add more dummy attributes to the vertices - id, level, root, path, isCyclic, existing value of current vertex to build path, isleaf, pk
    val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1) )

    val hrchyRDD = initialGraph.pregel(
        initialMsg,
        Int.MaxValue,
        EdgeDirection.Out
    )(
        setMsg,
        sendMsg,
        mergeMsg
    )

    // build the path from the list
    val hrchyOutRDD = hrchyRDD.vertices.map{
        case(id, v) => (
            v._8,
            (
                v._2,
                v._3,pathSeperator + v._4.reverse.mkString(pathSeperator),
                v._5,
                v._7
            )
        )}

    hrchyOutRDD

}

//mutate the value of the vertices
def setMsg(
    vertexId: VertexId,
    value: (
        Long,
        Int,
        Any,
        List[String],
        Int,
        String,
        Int,
        Any
    ),
    message: (
        Long, // id
        Int, // level
        Any, // root
        List[String], // path
        Int, // isCyclic
        Int // isleaf
    )
): (
    Long, // id
    Int, // level
    Any, // root
    List[String], // path
    Int, // isCyclic
    String, // current vertex's own value, used to extend the path
    Int, // isleaf
    Any // pk
) = {
    if (message._2 < 1) { // superstep 0 - initialize
        (
            value._1,
            value._2 + 1,
            value._3,
            value._4,
            value._5,
            value._6,
            value._7,
            value._8
        )
    } else if ( message._5 == 1) { // set isCyclic   
        (
            value._1,
            value._2,
            value._3,
            value._4,
            message._5,
            value._6,
            value._7,
            value._8
        )
    } else if ( message._6 == 0 ) { // set isleaf
        (
            value._1,
            value._2,
            value._3,
            value._4,
            value._5,
            value._6,
            message._6,
            value._8
        )
    } else { // set new values
        (
            message._1,
            value._2 + 1,
            message._3,
            value._6 :: message._4,
            value._5,
            value._6,
            value._7,
            value._8
        )
    }
}

// send the value to vertices
def sendMsg(
    triplet: EdgeTriplet[(
        Long,
        Int,
        Any,
        List[String],
        Int,
        String,
        Int,
        Any
    ), _]): Iterator[(
        VertexId,
        (
            Long,
            Int,
            Any,
            List[String],
            Int,
            Int
        )
    )] = {
    val sourceVertex = triplet.srcAttr
    val destinationVertex = triplet.dstAttr
    // check for a cycle
    if (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1) {
        if (destinationVertex._5 == 0) { // set iscyclic
            Iterator((
                triplet.dstId,(
                    sourceVertex._1,
                    sourceVertex._2,
                    sourceVertex._3,
                    sourceVertex._4,
                    1,
                    sourceVertex._7
                )
            ))
        } else {
            Iterator.empty
        } 
    } else {
        if (sourceVertex._7 == 1) { // source has an out-edge but is still flagged as leaf: tell it to clear isleaf
            Iterator((
                triplet.srcId, (
                    sourceVertex._1,
                    sourceVertex._2,
                    sourceVertex._3,
                    sourceVertex._4,
                    0,
                    0
                )
            ))
        } else { // set new values
            Iterator((
                triplet.dstId, (
                    sourceVertex._1,
                    sourceVertex._2,
                    sourceVertex._3,
                    sourceVertex._4,
                    0,
                    1
                )
            ))
        }
    }
}

// receive the values from all connected vertices
def mergeMsg(msg1: (Long,Int,Any,List[String],Int,Int), msg2: (Long,Int, Any,List[String],Int,Int)): (Long,Int,Any,List[String],Int,Int) = {
    // dummy logic not applicable to the data in this usecase
    msg2
}

// Test with some sample data 

val empData = Array(
  ("EMP001", "Bob", "Baker", "CEO", null.asInstanceOf[String])
  , ("EMP002", "Jim", "Lake", "CIO", "EMP001")
  , ("EMP003", "Tim", "Gorab", "MGR", "EMP002")
  , ("EMP004", "Rick", "Summer", "MGR", "EMP002")
  , ("EMP005", "Sam", "Cap", "Lead", "EMP004")
  , ("EMP006", "Ron", "Hubb", "Sr.Dev", "EMP005")
  , ("EMP007", "Cathy", "Watson", "Dev", "EMP006")
  , ("EMP008", "Samantha", "Lion", "Dev", "EMP007")
  , ("EMP009", "Jimmy", "Copper", "Dev", "EMP007")
  , ("EMP010", "Shon", "Taylor", "Intern", "EMP009")
)

// create dataframe with some partitions
val empDF = sc.parallelize(empData, 3).toDF("emp_id","first_name","last_name","title","mgr_id").cache()

// primary key , root, path - dataframe to graphx for vertices
val empVertexDF = empDF.selectExpr("emp_id","concat(first_name,' ',last_name)","concat(last_name,' ',first_name)")

// parent to child - dataframe to graphx for edges
val empEdgeDF = empDF.selectExpr("mgr_id","emp_id").filter("mgr_id is not null")

// call the function
val empHirearchyExtDF = calcTopLevelHierarcy(empVertexDF,empEdgeDF)
  .map{ case(pk,(level,root,path,iscyclic,isleaf)) => (pk.asInstanceOf[String],level,root.asInstanceOf[String],path,iscyclic,isleaf)}
  .toDF("emp_id_pk","level","root","path","iscyclic","isleaf").cache()

// extend original table with new columns
val empHirearchyDF = empHirearchyExtDF.join(empDF , empDF.col("emp_id") === empHirearchyExtDF.col("emp_id_pk")).selectExpr("emp_id","first_name","last_name","title","mgr_id","level","root","path","iscyclic","isleaf")

// print
empHirearchyDF.show()
eubnara commented 4 years ago

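EdgeDirection confused me a little.

First, an Edge is directed from srcId to dstId.

Pregel is an iterative algorithm, so the key is to keep track of which vertices received a message in the previous round. If you simply follow the natural srcId -> dstId direction of each Edge, EdgeDirection.Out is probably the right choice in most cases.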
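
To make the rule concrete, here is a minimal plain-Scala sketch (no Spark required) of how the active direction selects the edges on which sendMsg runs, following the description of activeDirection in the GraphX documentation. The names `Direction`, `DirOut`, `DirIn`, `DirEither`, `DirBoth`, `Edge` and `activeEdges` are hypothetical stand-ins for illustration, not GraphX classes.

```scala
// Plain Scala stand-in for the activeDirection rule (not GraphX code).
// All names here are hypothetical and exist only for illustration.
object ActiveEdgesSketch {
  sealed trait Direction
  case object DirOut extends Direction    // mirrors EdgeDirection.Out
  case object DirIn extends Direction     // mirrors EdgeDirection.In
  case object DirEither extends Direction // mirrors EdgeDirection.Either
  case object DirBoth extends Direction   // mirrors EdgeDirection.Both

  final case class Edge(srcId: Long, dstId: Long)

  // Edges on which sendMsg would run, given the set of vertices that
  // received a message in the previous superstep.
  def activeEdges(edges: Seq[Edge], received: Set[Long], dir: Direction): Seq[Edge] =
    edges.filter { e =>
      val src = received(e.srcId)
      val dst = received(e.dstId)
      dir match {
        case DirOut    => src        // out-edges of vertices that got a message
        case DirIn     => dst        // in-edges of vertices that got a message
        case DirEither => src || dst // at least one endpoint got a message
        case DirBoth   => src && dst // both endpoints got a message
      }
    }

  // Chain 1 -> 2 -> 3; suppose only vertex 2 received a message.
  val edges: Seq[Edge] = Seq(Edge(1L, 2L), Edge(2L, 3L))
  val received: Set[Long] = Set(2L)
}
```

With only vertex 2 active, the Out variant selects just the edge 2 -> 3, the In variant selects just 1 -> 2, the Either variant selects both, and the Both variant selects none. In superstep 0 every vertex receives the initial message, so every edge is active regardless of direction.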

In the example at https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api, no direction is passed to the pregel call, so the default EdgeDirection.Either is presumably used, and it works without any trouble. That example solves the single-source shortest path problem.

Given the nature of that problem, EdgeDirection.Either fits here. (It also seems somewhat similar to the Bellman-Ford algorithm?)
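
As a rough check of the shortest-path case, the sketch below simulates the Pregel supersteps in plain Scala (no Spark) with the same vertex program, send and merge logic as the SSSP example, and runs it once with Either-style and once with Out-style edge activation. `PregelSsspSketch`, `sssp`, `activeEither` and the toy graph are assumptions made up for this illustration; the real GraphX implementation is distributed and differs in detail.

```scala
// Single-machine simulation of the Pregel SSSP loop (not GraphX code).
// All names and the toy graph are hypothetical, for illustration only.
object PregelSsspSketch {
  final case class Edge(srcId: Long, dstId: Long, attr: Double)

  // activeEither = true mimics EdgeDirection.Either,
  // activeEither = false mimics EdgeDirection.Out.
  def sssp(vertexIds: Set[Long], edges: Seq[Edge], sourceId: Long,
           activeEither: Boolean): Map[Long, Double] = {
    // Superstep 0: every vertex receives the initial message (+infinity),
    // so min(dist, +inf) leaves the distances unchanged and all vertices are active.
    var dist: Map[Long, Double] =
      vertexIds.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap
    var active: Set[Long] = vertexIds

    while (active.nonEmpty) {
      // Send Message: only on active edges, and only if the path improves.
      val sent = for {
        e <- edges
        if active(e.srcId) || (activeEither && active(e.dstId))
        if dist(e.srcId) + e.attr < dist(e.dstId)
      } yield e.dstId -> (dist(e.srcId) + e.attr)

      // Merge Message: keep the smallest candidate distance per destination.
      val merged: Map[Long, Double] =
        sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).min }

      // Vertex Program: min of the current distance and the merged message.
      dist = dist ++ merged.map { case (id, m) => id -> math.min(dist(id), m) }

      // Only vertices that received a message are active in the next round.
      active = merged.keySet
    }
    dist
  }

  // Toy graph: 1 -> 2 (1.0), 2 -> 3 (2.0), 1 -> 3 (5.0), 4 -> 1 (1.0); source is 1.
  val vertexIds: Set[Long] = Set(1L, 2L, 3L, 4L)
  val edges: Seq[Edge] =
    Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0), Edge(4L, 1L, 1.0))

  val withOut: Map[Long, Double] = sssp(vertexIds, edges, sourceId = 1L, activeEither = false)
  val withEither: Map[Long, Double] = sssp(vertexIds, edges, sourceId = 1L, activeEither = true)
}
```

On this toy graph both settings end with the same distances (vertex 3 is reached via 1 -> 2 -> 3 at cost 3.0, and vertex 4 stays at infinity because it has no incoming path). This suggests that for this particular send function, which only ever sends from src to dst, Either mainly causes some extra edges to be checked rather than changing the result; that is an observation about this sketch, not a statement about GraphX performance.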

I think the right choice depends on the problem being solved.

For the problem in https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/, EdgeDirection.Out is the right choice.

I can't readily think of a problem where EdgeDirection.Both or EdgeDirection.In would be a good fit.