Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Finding connected components of a particular node instead of the whole graph (GraphFrame/GraphX)

I have created a GraphFrame in Spark and the graph currently looks as following:

Basically, there will be lot of such subgraphs where each of these subgraphs will be disconnected to each other. Given a particular node ID I want to find all the other nodes within the subgraph. For instance, if the node ID 1 is given then the graph will traverse and return 2,10,20,3,30.

I have created a motif but it doesn't give the right result.

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()

Unfortunately the connected component function consider the whole graph. Is it possible to get all the nodes within a disconnected subgraph given a particular node ID using GraphFrame/GraphX?

like image 589
sjishan Avatar asked May 26 '16 14:05

sjishan


People also ask

What are the benefits of using GraphX algorithm over a dataset?

GraphX makes it easier to run analytics on graph data with the built-in operators and algorithms. It also allows us to cache and uncache the graph data to avoid recomputation when we need to call a graph multiple times.

What is GraphX in big data?

GraphX unifies ETL, exploratory analysis, and iterative graph computation within a single system. You can view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using the Pregel API. graph = Graph(vertices, edges)

How the command Pregel works in GraphX?

The Pregel operator terminates iteration and returns the final graph when there are no messages remaining. Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices and the message construction is done in parallel using a user defined messaging function.

What is spark GraphX and explain graph analytics algorithms?

What is Spark GraphX? GraphX is the Spark API for graphs and graph-parallel computation. It includes a growing collection of graph algorithms and builders to simplify graph analytics tasks. GraphX extends the Spark RDD with a Resilient Distributed Property Graph.


1 Answers

Getting the connected component related to a specific vertex can be done using a BFS traversal that starts from this vertex and collects all its neighbors on several hops. This can be simply done through the Pregel API offered by GraphX, where we should implement a vertexProgram, sendMessage and mergeMessages functions. The algorithm is triggered on the reception of an initial message. The center sends a message to its neighbors that will propagate it to their neighbors and so on till covering the connected component. Every vertex that receives a msg is checked so that it won't be activated in the following iterations.

Here is the implementation of this approach:

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object ConnectedComponent extends  Serializable {

    def main(args = Array[String]) = {
        
        val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
        val sc = new SparkContext(conf)
        val vRDD = sc.objectFile[(VertexId,Int)]("/path/to/vertex/rdd/file/")
        val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
        val graph = Graph(vRDD, eRDD)
        val centerOfCC = graph.pickRandomVertex()
        var cc = extractCC(graph, center)
        cc.vertices.collect.foreach(println)

        sc.stop()
    }

    def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
        /* Return a subgraph of the input graph containing 'center'  with the connected component
         */
        val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
        val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
                                .subgraph(vpred = (id, attr) => attr.checked == true)
                                .mapVertices((id, vdata) => vdata.attr)
        connectedComponent
    }


    case class VertexData( var attr : Int, // label of the vertex
                    var checked : Boolean, // check visited vertices 
                    var propagate : Boolean, // allow forwarding msgs or not
                    var center: VertexId) // ID of the connectedComponent center
    def vprog(id:VertexId, vdata: VertexData, msg: Int): VertexData = {

        val attr : Int = vdata.attr 
        var checked : Boolean = vdata.checked
        var propagate : Boolean = vdata.propagate
        val center : VertexId = vdata.center

        if (checked==false && msg == 0 && id==center) {
          propagate = true
          checked = true
        }
        else if(checked==false && msg == 1) {
          propagate = true
          checked = true
        }
        else if(checked == true && msg == 1){
          propagate = false
        }
        new VertexData(attr, checked, propagate, center)
    }

    def sendMsg(triplet: EdgeTriplet[VertexData, Int]):Iterator[(VertexId, Int)] = {
        var it : Iterator[(VertexId, Int)] = Iterator()
        if(triplet.dstAttr.propagate==true)
          it = it ++ Iterator((triplet.srcId, 1))
        if(triplet.srcAttr.propagate==true)
          it = it ++ Iterator((triplet.dstId, 1))
        it
    }

    def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}
like image 171
PhiloJunkie Avatar answered Sep 23 '22 17:09

PhiloJunkie