Some context can be found here; the idea is that I have built a graph from tuples collected by a query on a Hive table. The tuples correspond to trade relations between countries. Built this way, the graph's vertices are not labelled. I want to study the distribution of degrees and get the names of the most connected countries. I tried two options:
In both cases I get the following error: Task not serializable.
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line => (line(2), line(3))) // country -> country
couples looks like this: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB),...
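A more strongly typed extraction is possible with Row's getters; a minimal sketch, assuming the year is stored as an integer and the ISO codes as strings in the Hive table (couplesTyped is a hypothetical name):

// Typed extraction: avoids the (Any, Any) tuples and the runtime
// pattern matches on String used below.
val couplesTyped: Array[(String, String)] = data
  .filter(_.getInt(0) == 2010)
  .map(row => (row.getString(2), row.getString(3)))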
val idMap = sc.broadcast(couples
  .flatMap{case (x: String, y: String) => Seq(x, y)}
  .distinct
  .zipWithIndex
  .map{case (k, v) => (k, v.toLong)}
  .toMap)
val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
  .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})
val graph = Graph.fromEdgeTuples(edges, 1)
Built this way, the vertices look like (68,1), for example.
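A quick check, just inspecting a few vertices, confirms the (id, defaultAttr) layout:

graph.vertices.take(3).foreach(println) // prints pairs such as (68,1)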
val degrees: VertexRDD[Int] = graph.degrees.cache()
// Most connected vertices
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
  val namesAndDegrees = degrees.innerJoin(graph.vertices) {
    (id, degree, k) => (id.toInt, degree)
  }
  val ord = Ordering.by[(Int, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(10)(ord)
}
topNamesAndDegrees(degrees, graph).foreach(println)
We get: (79,1016), (64,912), (55,889), ... (vertex ids, not country names).
First option: build a reversed id-to-name mapping and use it to replace the ids after computing the degrees:
val idMapbis = sc.parallelize(couples
  .flatMap{case (x: String, y: String) => Seq(x, y)}
  .distinct
  .zipWithIndex
  .map{case (k, v) => (v, k)}
  .toMap)
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
  val namesAndDegrees = degrees.innerJoin(graph.vertices) {
    (id, degree, name) => (idMapbis.value(id.toInt), degree)
  }
  val ord = Ordering.by[(String, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(10)(ord)
}
topNamesAndDegrees(degrees, graph).foreach(println)
Again the task is not serializable, yet the lookup through idMapbis works on its own, since there is no error with idMapbis.value(graph.vertices.take(1)(0)._1.toInt).
Second option: relabel the vertices of the graph directly:
graph.vertices.map{case (k, v) => (k, idMapbis.value(k.toInt))}
The task is not serializable again. (For context, here is how topNamesAndDegrees is modified to obtain the names of the most connected vertices in this option:)
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[String, Int]): Array[(String, Int)] = {
  val namesAndDegrees = degrees.innerJoin(graph.vertices) {
    (id, degree, name) => (name, degree)
  }
  val ord = Ordering.by[(String, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(10)(ord)
}
topNamesAndDegrees(degrees, graph).foreach(println)
I am interested in understanding how to fix at least one of these options, or both if someone sees how.
The problem with your attempts is that idMapbis is an RDD. Since we already know your data fits into memory, you can simply use a broadcast variable as before:
val idMapRev = sc.broadcast(idMap.value.map{case (k, v) => (v, k)}.toMap)
graph.mapVertices{case (id, _) => idMapRev.value(id)}
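For example (a minimal sketch; labelled is just a hypothetical name for the relabelled graph), the top-10 logic from the question then yields country names directly, because idMapRev is a broadcast variable and not an RDD:

// Relabel the vertices, then rank them by degree.
val labelled: Graph[String, Int] = graph.mapVertices { case (id, _) => idMapRev.value(id) }
labelled.degrees
  .innerJoin(labelled.vertices) { (_, degree, name) => (name, degree) }
  .map(_._2)
  .top(10)(Ordering.by[(String, Int), Int](_._2))
  .foreach(println)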
Alternatively you could use the correct labels from the beginning:
val countries: RDD[(VertexId, String)] = sc
  .parallelize(idMap.value.map(_.swap).toSeq)

val relationships: RDD[Edge[Int]] = sc.parallelize(couples
  .map{case (x: String, y: String) => Edge(idMap.value(x), idMap.value(y), 1)})
val graph = Graph(countries, relationships)
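With String vertex attributes, the last version of topNamesAndDegrees from the question works as intended (its graph parameter typed as Graph[String, Int]), since the vertex attribute is already the country name:

topNamesAndDegrees(graph.degrees, graph).foreach(println)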
The second approach has one important advantage: if the graph is large, you can relatively easily replace broadcast variables with joins.
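A minimal sketch of that join-based variant, assuming the countries RDD defined above; it keeps the id-to-name mapping distributed instead of broadcasting it:

// Join degrees with names by vertex id instead of broadcasting the map.
graph.degrees
  .join(countries) // (VertexId, (degree, name))
  .map { case (_, (degree, name)) => (name, degree) }
  .top(10)(Ordering.by[(String, Int), Int](_._2))
  .foreach(println)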