I've created a directed graph using GraphX.
# src -> dst weight
a -> b 34
a -> c 23
b -> e 10
c -> d 12
d -> c 12
c -> d 11
I want to get all two-hop neighbors, with the edge weights summed along each path, like this:
a -> e 44
a -> d 34
My graph is very large, so I would like to do this elegantly and efficiently. Does anyone have advice on the best way to do it on a Graph instance?
Speed is one of GraphX's main strengths: it offers performance comparable to the fastest specialized graph-processing systems while retaining Spark's flexibility, fault tolerance and ease of use.
You can also use the triplets method to join the vertices and edges based on VertexId. Although a Graph natively stores its data as separate vertex and edge RDDs, triplets is a convenience method that joins them for you, exposing each edge together with the attributes of its source and destination vertices.
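As one way to answer the question in plain GraphX, here is a minimal sketch that uses triplets so each edge already carries its endpoint names (srcAttr, dstAttr), then joins edges on the shared middle vertex. The object name TwoHopTriplets and the local SparkSession setup are assumptions for illustration, not part of the original answer.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper: two-hop paths via a join of triplets on the middle vertex.
object TwoHopTriplets {
  // For every path src -> mid -> dst, emit (srcName, dstName, w1 + w2).
  def twoHop(graph: Graph[String, Int]): RDD[(String, String, Int)] = {
    // First hop keyed by its destination, i.e. the middle vertex
    val firstHop = graph.triplets.map(t => (t.dstId, (t.srcAttr, t.attr)))
    // Second hop keyed by its source, i.e. the same middle vertex
    val secondHop = graph.triplets.map(t => (t.srcId, (t.dstAttr, t.attr)))
    firstHop.join(secondHop).map { case (_, ((from, w1), (to, w2))) => (from, to, w1 + w2) }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for experimentation; on a cluster use your own
    val spark = SparkSession.builder().master("local[*]").appName("two-hop").getOrCreate()
    val sc = spark.sparkContext
    val nodes = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e")))
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 34), Edge(1L, 3L, 23), Edge(2L, 5L, 10),
      Edge(3L, 4L, 12), Edge(4L, 3L, 12), Edge(3L, 4L, 11)))
    twoHop(Graph(nodes, edges)).collect().foreach(println)
    spark.stop()
  }
}
```

On a very large graph, keep in mind that the join emits in-degree × out-degree rows for every middle vertex, so highly connected vertices can dominate the cost.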
GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge.
GraphX unifies ETL, exploratory analysis, and iterative graph computation within a single system. You can view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using the Pregel API.
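Viewed as a collection, the two-hop computation is simply a self-join of the edge list on the middle vertex. The following sketch runs that join on plain Scala collections with the question's edges, so the expected sums can be checked without Spark; TwoHopLocal and WeightedEdge are hypothetical names chosen for this illustration.

```scala
// Hypothetical local illustration of the two-hop self-join (no Spark needed).
object TwoHopLocal {
  final case class WeightedEdge(src: String, dst: String, weight: Int)

  // The destination of the first hop must equal the source of the second hop.
  def twoHop(edges: Seq[WeightedEdge]): Seq[(String, String, Int)] = {
    val bySrc: Map[String, Seq[WeightedEdge]] = edges.groupBy(_.src)
    for {
      first  <- edges
      second <- bySrc.getOrElse(first.dst, Seq.empty)
    } yield (first.src, second.dst, first.weight + second.weight)
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(
      WeightedEdge("a", "b", 34), WeightedEdge("a", "c", 23), WeightedEdge("b", "e", 10),
      WeightedEdge("c", "d", 12), WeightedEdge("d", "c", 12), WeightedEdge("c", "d", 11))
    // Print only the paths that start at "a"
    twoHop(edges).filter(_._1 == "a").foreach(println)
  }
}
```

For vertex a this yields a -> e with weight 44 and two a -> d paths, 35 (via the c -> d 12 edge) and 34 (via the c -> d 11 edge); the question's a -> d 34 corresponds to the cheaper one.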
You can express this succinctly using the GraphFrames library. First, you have to include the required package. For Spark 2.0 and Scala 2.11, you can add graphframes:graphframes:0.2.0-spark2.0-s_2.11 to spark.jars.packages in conf/spark-defaults.conf, or pass it as the --packages argument to spark-submit.
Next, convert the Graph to a GraphFrame. You can use the fromGraphX method:
import org.graphframes.GraphFrame
import org.apache.spark.graphx._
// sc is the SparkContext provided by spark-shell
val nodes = sc.parallelize(Seq(
  (1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e")))
// Edges from the question: a->b 34, a->c 23, b->e 10, c->d 12, d->c 12, c->d 11
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 34), Edge(1L, 3L, 23), Edge(2L, 5L, 10),
  Edge(3L, 4L, 12), Edge(4L, 3L, 12), Edge(3L, 4L, 11)))
val graph = Graph(nodes, edges)
val graphFrame = GraphFrame.fromGraphX(graph)
GraphFrame provides a find method that takes a pattern in a language similar to Cypher. Two-hop paths can be expressed as:
val pattern = "(x1) - [a] -> (x2); (x2) - [b] -> (x3)"
where (_) represents nodes and [_] represents edges. You can then get the paths matching the pattern:
val paths = graphFrame.find(pattern)
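and select the fields you need:
// start vertex, end vertex and the summed weight of both edges
// ($-columns need spark.implicits._, which spark-shell imports automatically)
paths.select($"x1.attr", $"x3.attr", $"a.attr" + $"b.attr").show()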
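The query above returns every two-hop path, including cycles such as c -> d -> c and both a -> c -> d paths. Assuming the goal is one row per (start, end) pair carrying the cheapest total weight, with paths that return to their start vertex dropped, a sketch could look like this. The object and method names are hypothetical; it relies on fromGraphX naming the vertex id column "id" and the attributes "attr".

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, min}
import org.graphframes.GraphFrame

// Hypothetical helper: cheapest two-hop weight per (start, end) pair.
object CheapestTwoHop {
  def cheapestTwoHop(graphFrame: GraphFrame): DataFrame =
    graphFrame
      .find("(x1)-[a]->(x2); (x2)-[b]->(x3)")
      // Assumption: drop paths that come back to the start vertex
      .filter(col("x1.id") =!= col("x3.id"))
      .groupBy(col("x1.attr").alias("src"), col("x3.attr").alias("dst"))
      // Assumption: keep the minimum summed weight when several paths exist
      .agg(min(col("a.attr") + col("b.attr")).alias("weight"))
}
```

With the edges from the question, this should return exactly two rows, a -> e 44 and a -> d 34 (row order may vary), which matches the expected output in the question.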