
how to get two-hop neighbors in spark-graphx?

I've created a directed graph, using graphx.

#src->dest
a  -> b  34
a  -> c  23
b  -> e  10
c  -> d  12
d  -> c  12
c  -> d  11

I want to get all two hop neighbors like this:

a  -> e  44
a  -> d  34

My graph is very large, so I would like to do it elegantly and efficiently. Does anyone have any advice on what will be the best way to do that over a graph instance?

asked Oct 08 '16 by leslie chu



1 Answer

You can succinctly express this using the GraphFrames library. First you have to include the required package. For Spark 2.0 with Scala 2.11 you can add

graphframes:graphframes:0.2.0-spark2.0-s_2.11

to spark.jars.packages in conf/spark-defaults.conf, or pass it as a --packages argument to spark-submit.
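For interactive use, the same coordinate can also be passed directly on the command line (shown here for spark-shell; the coordinate is the Spark 2.0 / Scala 2.11 one from above):

```shell
# Pull in GraphFrames for an interactive session
spark-shell --packages graphframes:graphframes:0.2.0-spark2.0-s_2.11
```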

Next you should convert the GraphX Graph to a GraphFrame. You can use the fromGraphX method:

import org.graphframes.GraphFrame
import org.apache.spark.graphx._

val nodes = sc.parallelize(Seq(
  (1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e")))

val edges = sc.parallelize(Seq(
   Edge(1L, 2L, 34), Edge(1L, 3L, 23), Edge(2L, 5L, 10),
   Edge(3L, 4L, 12), Edge(4L, 3L, 12), Edge(3L, 4L, 11)))

val graph = Graph(nodes, edges)

val graphFrame = GraphFrame.fromGraphX(graph)

GraphFrame provides a find method which takes a pattern in a language similar to Cypher. A two-hop path can be expressed as:

val pattern = "(x1) - [a] -> (x2); (x2) - [b] -> (x3)"

where (_) represents nodes and [_] represents edges. You can find paths matching the pattern:

val paths = graphFrame.find(pattern)

and select the fields of interest:

paths.select($"x1.attr", $"x3.attr", $"a.attr" + $"b.attr").show()
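If pulling in GraphFrames is not an option, the same two-hop computation can also be expressed as a self-join of the edge list on the intermediate vertex (in Spark terms, graph.edges keyed by dstId joined against graph.edges keyed by srcId, summing the weights). Here is a minimal sketch of that join logic on plain Scala collections, using the edge list from the question; the object and method names are illustrative:

```scala
object TwoHopSketch {
  // (src, dst, weight)
  type WeightedEdge = (String, String, Int)

  // Two-hop neighbors: join every edge s1 -> d1 with every edge
  // s2 -> d2 whose source s2 is the first edge's destination d1,
  // and add the weights. In Spark this becomes an RDD join on the
  // intermediate vertex id instead of a nested loop.
  def twoHop(edges: Seq[WeightedEdge]): Seq[WeightedEdge] =
    for {
      (s1, d1, w1) <- edges
      (s2, d2, w2) <- edges
      if d1 == s2
    } yield (s1, d2, w1 + w2)

  def main(args: Array[String]): Unit = {
    val edges = Seq(
      ("a", "b", 34), ("a", "c", 23), ("b", "e", 10),
      ("c", "d", 12), ("d", "c", 12), ("c", "d", 11))
    // Includes ("a", "e", 44) via b and ("a", "d", 34) via c
    twoHop(edges).foreach(println)
  }
}
```

Note that on a large graph the RDD version of this join is typically cheaper than motif finding, but it also returns every two-hop pair, including round trips such as c -> d -> c, which you may want to filter out.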
answered Sep 22 '22 by zero323