I have created a GraphFrame in Spark, and the graph currently looks as follows:
Basically, there will be many such subgraphs, each disconnected from the others. Given a particular node ID, I want to find all the other nodes within its subgraph. For instance, if node ID 1 is given, the traversal should return 2, 10, 20, 3, 30.
I have created a motif, but it doesn't give the right result:
testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()
Unfortunately, the connected components function considers the whole graph. Is it possible to get all the nodes within a disconnected subgraph, given a particular node ID, using GraphFrames/GraphX?
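A whole-graph labelling can still answer a per-node question: once every vertex carries a component label, the subgraph of a given ID is just the set of vertices sharing that ID's label. The plain-Scala sketch below illustrates this with a union-find over a made-up edge list. It is not GraphFrames or GraphX code, and UnionFindCC and componentOf are hypothetical names. In GraphFrames the analogous step would be filtering the connected-components output on its component column, assuming that output layout.

```scala
import scala.collection.mutable

// Hypothetical helper, not GraphFrames/GraphX code: label every vertex with a
// union-find representative, then keep only the vertices that share the label
// of `start`.
object UnionFindCC {

  // Representative of x, with path compression; unseen vertices become singletons.
  private def find(parent: mutable.Map[Long, Long], x: Long): Long = {
    val p = parent.getOrElseUpdate(x, x)
    if (p == x) x
    else {
      val root = find(parent, p)
      parent(x) = root
      root
    }
  }

  // Edge direction is ignored: only connectivity matters here.
  def componentOf(edges: Seq[(Long, Long)], start: Long): Set[Long] = {
    val parent = mutable.Map.empty[Long, Long]
    // Whole-graph pass: merge the two endpoints of every edge.
    for ((a, b) <- edges) {
      val ra = find(parent, a)
      val rb = find(parent, b)
      if (ra != rb) parent(ra) = rb
    }
    val label = find(parent, start)   // adds `start` as a singleton if it has no edges
    val vertices = parent.keys.toList // snapshot before further path compression
    // Per-node question: filter the labelling down to one component.
    vertices.filter(v => find(parent, v) == label).toSet
  }
}
```

With a made-up edge list chosen so that node 1's component matches the output the question expects, componentOf(edges, 1L) yields the six vertices 1, 2, 3, 10, 20 and 30.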
GraphX makes it easier to run analytics on graph data with its built-in operators and algorithms. It also lets us cache and uncache graph data to avoid recomputation when the same graph is used multiple times.
GraphX unifies ETL, exploratory analysis, and iterative graph computation within a single system. You can view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using the Pregel API. A graph is built from its vertex and edge collections, for example:
val graph = Graph(vertices, edges)
The Pregel operator stops iterating and returns the final graph when no messages remain. Note that, unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and message construction is done in parallel using a user-defined messaging function.
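To make the termination rule concrete, here is a toy, single-machine imitation of the superstep loop in plain Scala. It is not the GraphX pregel operator. The object name ToyPregel and the min-label example are illustrative assumptions. Each vertex keeps the smallest vertex ID it has heard of, which is how GraphX's built-in connected components labels each component by its lowest-numbered vertex. Messages only travel along edges, and the loop ends when a superstep sends nothing.

```scala
// A toy, single-machine imitation of Pregel-style supersteps (not the GraphX API).
// Each vertex keeps the smallest vertex ID it has heard of; messages travel only
// along edges, and the loop stops as soon as a superstep produces no messages.
object ToyPregel {

  def minLabelComponents(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val all = vertices ++ edges.flatMap { case (a, b) => Seq(a, b) }
    // Undirected adjacency: each edge is usable in both directions.
    val neighbors: Map[Long, Seq[Long]] =
      edges.flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (v, pairs) => v -> pairs.map(_._2) }

    var label: Map[Long, Long] = all.map(v => v -> v).toMap // every vertex starts with its own ID
    var active: Set[Long] = all                              // first superstep: every vertex is active

    while (active.nonEmpty) {
      // sendMsg: an active vertex offers its label to neighbours whose label is larger.
      val messages: Seq[(Long, Long)] = for {
        v <- active.toSeq
        n <- neighbors.getOrElse(v, Seq.empty)
        if label(v) < label(n)
      } yield n -> label(v)

      // mergeMsg: keep only the smallest label sent to each vertex.
      val merged: Map[Long, Long] =
        messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).min }

      // vprog: receivers adopt the smaller label and are the only active vertices next time.
      label = label ++ merged
      active = merged.keySet
    }
    label // no messages left: this is the final labelling
  }
}
```

Reading off one node's subgraph is then a filter on its label, e.g. labels.filter(_._2 == labels(1L)).keySet.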
What is Spark GraphX? GraphX is the Spark API for graphs and graph-parallel computation. It includes a growing collection of graph algorithms and builders to simplify graph analytics tasks. GraphX extends the Spark RDD with a Resilient Distributed Property Graph.
Getting the connected component of a specific vertex can be done with a BFS traversal that starts from this vertex and collects all of its neighbors over multiple hops. This is simple to do with the Pregel API offered by GraphX: we implement the vertexProgram, sendMessage and mergeMessages functions. The algorithm is triggered when the initial message is received. The center sends a message to its neighbors, which propagate it to their neighbors, and so on until the whole connected component is covered. Every vertex that receives a message is marked as checked, so that it does not restart the propagation in later iterations.
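For comparison, here is the same traversal as a plain-Scala breadth-first search without Spark, run from a start vertex over an edge list treated as undirected. The visited set plays the role of the checked flag. The object name BfsComponent and the edge-list input are assumptions for illustration; on a real GraphX graph, the Pregel implementation in this answer distributes the traversal instead.

```scala
import scala.collection.immutable.Queue

// Breadth-first search from `start` over an edge list treated as undirected.
// `visited` plays the role of the "checked" flag: each vertex is enqueued at
// most once, so the search ends once the whole component has been covered.
object BfsComponent {

  def componentOf(edges: Seq[(Long, Long)], start: Long): Set[Long] = {
    val neighbors: Map[Long, Seq[Long]] =
      edges.flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (v, pairs) => v -> pairs.map(_._2) }

    @annotation.tailrec
    def loop(frontier: Queue[Long], visited: Set[Long]): Set[Long] =
      frontier.dequeueOption match {
        case None => visited // frontier exhausted: no more hops to take
        case Some((v, rest)) =>
          // Unvisited neighbours form the next hop.
          val fresh = neighbors.getOrElse(v, Seq.empty).filterNot(visited).distinct
          loop(rest ++ fresh, visited ++ fresh)
      }

    loop(Queue(start), Set(start))
  }
}
```

Because edges are added in both directions, a vertex is reached whether it is the source or the destination of an edge, which matches the two-way sendMsg in the GraphX code.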
Here is the implementation of this approach:
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
object ConnectedComponent extends Serializable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
    val sc = new SparkContext(conf)
    val vRDD = sc.objectFile[(VertexId, Int)]("/path/to/vertex/rdd/file/")
    val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
    val graph = Graph(vRDD, eRDD)
    // Random start vertex; to start from a given node instead, use e.g. val centerOfCC: VertexId = 1L
    val centerOfCC = graph.pickRandomVertex()
    val cc = extractCC(graph, centerOfCC)
    cc.vertices.collect.foreach(println)
    sc.stop()
  }
  def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
    /* Return the subgraph of the input graph formed by the connected component that contains 'center'.
    */
    // Wrap each vertex label with the traversal flags and the ID of the start vertex
    val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
    val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
      .subgraph(vpred = (id, attr) => attr.checked) // keep only the vertices reached by the traversal
      .mapVertices((id, vdata) => vdata.attr)       // restore the original Int labels
    connectedComponent
  }
  case class VertexData(attr: Int,          // label of the vertex
                        checked: Boolean,   // whether the vertex has been visited
                        propagate: Boolean, // whether the vertex forwards messages
                        center: VertexId)   // ID of the vertex the traversal starts from
  // Vertex program: the center marks itself on the initial message 0; an unvisited
  // vertex that receives 1 is marked as visited and starts forwarding; a visited
  // vertex that receives 1 again stops forwarding.
  def vprog(id: VertexId, vdata: VertexData, msg: Int): VertexData = {
    val attr: Int = vdata.attr
    var checked: Boolean = vdata.checked
    var propagate: Boolean = vdata.propagate
    val center: VertexId = vdata.center
    if (!checked && msg == 0 && id == center) {
      propagate = true
      checked = true
    } else if (!checked && msg == 1) {
      propagate = true
      checked = true
    } else if (checked && msg == 1) {
      propagate = false
    }
    VertexData(attr, checked, propagate, center)
  }
  // A forwarding endpoint sends 1 to the other end of the edge, in either
  // direction, so edge direction is ignored when building the component.
  def sendMsg(triplet: EdgeTriplet[VertexData, Int]): Iterator[(VertexId, Int)] = {
    var it: Iterator[(VertexId, Int)] = Iterator()
    if (triplet.dstAttr.propagate)
      it = it ++ Iterator((triplet.srcId, 1))
    if (triplet.srcAttr.propagate)
      it = it ++ Iterator((triplet.dstId, 1))
    it
  }
  // Every sent message is 1, so taking the max simply combines duplicates
  def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}
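One way to convince yourself that the flag logic above covers the whole component is to replay it on a small local edge list. The sketch below is plain Scala, not GraphX. FlagPropagationCheck and VData are hypothetical names, and the vertex label is dropped. The superstep loop is a simplified stand-in for pregel that, like its default EdgeDirection.Either, runs sendMsg on every edge with at least one endpoint that received a message in the previous superstep. The vprog branches, the two-way sendMsg and the max merge follow the answer.

```scala
// Plain-Scala replay of the answer's vprog / sendMsg / mergeMsgs logic on a local
// edge list; the while loop is a simplified stand-in for GraphX's pregel.
object FlagPropagationCheck {

  // Simplified vertex state: the answer's VertexData without attr and center.
  final case class VData(checked: Boolean, propagate: Boolean)

  // Same three branches as the answer's vprog (center passed as a parameter).
  def vprog(id: Long, d: VData, msg: Int, center: Long): VData =
    if (!d.checked && msg == 0 && id == center) VData(checked = true, propagate = true)
    else if (!d.checked && msg == 1) VData(checked = true, propagate = true)
    else if (d.checked && msg == 1) d.copy(propagate = false)
    else d

  def component(edges: Seq[(Long, Long)], center: Long): Set[Long] = {
    val vertices: Set[Long] = edges.flatMap { case (s, t) => Seq(s, t) }.toSet + center
    // Initial superstep: every vertex runs vprog on the initial message 0.
    var data: Map[Long, VData] =
      vertices.map(v => v -> vprog(v, VData(checked = false, propagate = false), 0, center)).toMap
    var active: Set[Long] = vertices
    while (active.nonEmpty) {
      // sendMsg as in the answer: a forwarding endpoint sends 1 to the other end.
      val msgs: Seq[(Long, Int)] =
        edges.filter { case (s, t) => active(s) || active(t) }.flatMap { case (s, t) =>
          (if (data(t).propagate) Seq(s -> 1) else Seq.empty) ++
            (if (data(s).propagate) Seq(t -> 1) else Seq.empty)
        }
      // mergeMsgs = max, as in the answer.
      val merged: Map[Long, Int] =
        msgs.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).max }
      data = data ++ merged.map { case (v, m) => v -> vprog(v, data(v), m, center) }
      active = merged.keySet // no messages means the loop ends
    }
    // Like the answer's subgraph step: keep only the checked vertices.
    data.collect { case (v, d) if d.checked => v }.toSet
  }
}
```

To answer the original question for a given node rather than a random one, pass its ID as the center, for example extractCC(graph, 1L).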