Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get SSSP actual path by apache spark graphX?

I have ran the single source shortest path (SSSP) example on spark site as follows:

graphx-SSSP pregel example

Code(scala):

object Pregel_SSSP {
def main(args: Array[String]) {
val sc = new SparkContext("local", "Allen Pregel Test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
graph.edges.foreach(println)
val sourceId: VertexId = 0 // The ultimate source

// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(

  // Vertex Program
  (id, dist, newDist) => math.min(dist, newDist),

  // Send Message
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  //Merge Message
  (a, b) => math.min(a, b))
println(sssp.vertices.collect.mkString("\n"))
 } 
}

sourceId: 0
Get the result:
(0,0.0)
(4,2.0)
(2,1.0)
(3,1.0)
(1,2.0)

But I need actual path like as follows:
=>
0 -> 0,0
0 -> 2,1
0 -> 3,1
0 -> 2 -> 4,2
0 -> 3 -> 1,2

How to get SSSP actual path by spark graphX?
anybody give me some hint?
Thanks for your help!

like image 723
AllenChen Avatar asked May 16 '14 16:05

AllenChen


People also ask

How the command Pregel works in GraphX?

The Pregel operator terminates iteration and returns the final graph when there are no messages remaining. Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices and the message construction is done in parallel using a user defined messaging function.

What is Apache GraphX?

GraphX unifies ETL, exploratory analysis, and iterative graph computation within a single system. You can view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using the Pregel API.

What is unique feature of GraphX?

GraphX offers flexibility and works seamlessly with both graphs and collections. Hence, you can view the same data as graphs or collections. It unifies ETL, exploratory analysis, and iterative graph framework computation within a single system. It incorporates Spark data processing pipelines with graph processing.

What are the analytic algorithms provided in Apache spark GraphX?

We can choose from a growing library of graph algorithms that Spark GraphX has to offer. Some of the popular algorithms are page rank, connected components, label propagation, SVD++, strongly connected components and triangle count.


1 Answers

You have to modify algorithm in order to store not only shortest path length but also actual path. So instead of storing Double as property of vertex you should store tuple: (Double, List[VertexId]) Maybe this code can be useful for you.

object Pregel_SSSP {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Allen Pregel Test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // A graph with edge attributes containing distances
    val graph: Graph[Int, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapEdges(e => e.attr.toDouble)
    graph.edges.foreach(println)
    val sourceId: VertexId = 0 // The ultimate source

    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph : Graph[(Double, List[VertexId]), Double] = graph.mapVertices((id, _) => if (id == sourceId) (0.0, List[VertexId](sourceId)) else (Double.PositiveInfinity, List[VertexId]()))

    val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), Int.MaxValue, EdgeDirection.Out)(

      // Vertex Program
      (id, dist, newDist) => if (dist._1 < newDist._1) dist else newDist, 

      // Send Message
      triplet => {
        if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr ) {
          Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr , triplet.srcAttr._2 :+ triplet.dstId)))
        } else {
          Iterator.empty
        }
      },
      //Merge Message
      (a, b) => if (a._1 < b._1) a else b)
    println(sssp.vertices.collect.mkString("\n"))
  }
}
like image 119
Hlib Avatar answered Sep 19 '22 17:09

Hlib