I'm a total beginner with Spark / Hadoop / graph computation, so please excuse my beginner question.
I've created a graph using GraphX. Now, for every vertex, I want to get all its second-degree neighbors. So if my graph is:
v1 --> v2
v1 --> v4
v1 --> v6
I want to get something like:
v2 --> v4
v2 --> v6
v4 --> v2
v4 --> v6
v6 --> v2
v6 --> v4
My graph is very large, so I would like to do this as elegantly and efficiently as possible.
I have a feeling this should not be too hard, but as a total newbie to this huge framework, I find myself all over the docs / source trying to figure this one out.
Does anyone have advice on what will be the best way to do that over a graph instance?
Thank you!
From your example, I take your problem to be that you want to construct a graph that has an edge a -> b if and only if the original graph has a vertex v with edges v -> a and v -> b. (This may or may not be a standard definition of "second degree neighbor" in a directed graph, but it is interesting anyway.)
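To make that definition concrete before bringing Spark into it, here is a small sketch on plain Scala collections. The function name `siblingEdges` is just something I made up for illustration, and this is only meant to pin down the expected output on a tiny graph, not to scale.

```scala
// Plain Scala collections, no Spark needed.
// Returns a pair (a, b) whenever some vertex v has edges v -> a and v -> b, with a != b.
// `siblingEdges` is a hypothetical helper name, not part of GraphX.
def siblingEdges(edges: Seq[(Long, Long)]): Set[(Long, Long)] = {
  // Group destinations by source vertex: v -> {all successors of v}
  val successors: Map[Long, Set[Long]] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2).toSet }

  // Every ordered pair of distinct successors of the same vertex becomes an edge.
  (for {
    dests <- successors.values.toSeq
    a <- dests.toSeq
    b <- dests.toSeq
    if a != b
  } yield (a, b)).toSet
}
```

Calling `siblingEdges(Seq((1L, 2L), (1L, 4L), (1L, 6L)))` on the graph from the question should give the six edges listed there.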
Here's a solution in Scala. It produces a graph with all the original vertices but only the required edges. I'm putting empty strings as data on all the vertices and edges.
Assuming a SparkContext sc as you'd normally get in the Spark shell, and GraphX available, set up the example graph:
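    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Vertices 1, 2, 4 and 6, each carrying an empty string as its attribute
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Array((1L, ""), (2L, ""), (4L, ""), (6L, "")))
    // Edges v1 -> v2, v1 -> v4 and v1 -> v6
    val edges: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(1L, 2L, ""), Edge(1L, 4L, ""), Edge(1L, 6L, "")))
    val inputGraph = Graph(vertices, edges)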
Create an alternate set of vertices, each annotated with the set of its successors, so, in your example, v1 would be annotated with {v2, v4, v6}.
    val verticesWithSuccessors: VertexRDD[Array[VertexId]] =
      inputGraph.ops.collectNeighborIds(EdgeDirection.Out)
Create a new graph using these vertices and the original edges.
    val successorSetGraph = Graph(verticesWithSuccessors, edges)
Now we need to push these sets along each edge, creating yet another set of vertices, this time all annotated with their neighbors. The sets arriving at a destination vertex have to be combined, and using a Scala Set removes duplicates. We also need to remove each vertex from the set of its own neighbors, hence the extra mapValues tacked on the end.
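    val ngVertices: VertexRDD[Set[VertexId]] =
      successorSetGraph.mapReduceTriplets[Set[VertexId]](
        // Send the source's successor set to each destination
        triplet => Iterator((triplet.dstId, triplet.srcAttr.toSet)),
        // Merge the sets that arrive at the same vertex
        (s1, s2) => s1 ++ s2
      ).mapValues[Set[VertexId]](
        // A vertex is not its own neighbor
        (id: VertexId, neighbors: Set[VertexId]) => neighbors - id
      )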
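A note for newer Spark versions: as far as I recall, mapReduceTriplets was deprecated in Spark 1.2 in favour of aggregateMessages and is no longer part of the public API from 2.0 on, so please check against the version you are running. A rough equivalent of this step might look like the sketch below. The name `siblingSets` is made up for illustration, and the sketch assumes a graph whose vertex attribute is the successor array from the previous step.

```scala
import org.apache.spark.graphx._

// Hypothetical helper: same idea as the mapReduceTriplets step, written with aggregateMessages.
// Assumes each vertex attribute is the array of that vertex's successors.
def siblingSets(g: Graph[Array[VertexId], String]): VertexRDD[Set[VertexId]] =
  g.aggregateMessages[Set[VertexId]](
    // Send the source's successor set along each edge to the destination
    ctx => ctx.sendToDst(ctx.srcAttr.toSet),
    // Merge the sets that arrive at the same vertex
    (s1, s2) => s1 ++ s2,
    // Only the source attribute is read, so the other fields need not be shipped
    TripletFields.Src
  ).mapValues((id: VertexId, neighbors: Set[VertexId]) => neighbors - id)
```

With this, `siblingSets(successorSetGraph)` would stand in for the expression assigned to ngVertices. As with the original, vertices that receive no messages (v1 in the example) do not appear in the result.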
Now we're almost ready to create the final graph, but we need an edge for each neighbor relationship:
    // Emit one edge per (vertex, neighbor) pair
    val ngEdges = ngVertices.flatMap[Edge[String]] {
      case (source: VertexId, allDests: Set[VertexId]) =>
        allDests.map((dest: VertexId) => Edge(source, dest, ""))
    }
Now we can put it all together:
    val neighborGraph = Graph(vertices, ngEdges)
I'm sure an expert can do better, especially in terms of performance, but most of the ideas this relies on can be found in the GraphX Programming Guide.