How to create a graph from Array[(Any, Any)] using Graph.fromEdgeTuples

I am very new to Spark, but I want to create a graph from relations that I get from a Hive table. I found a function, Graph.fromEdgeTuples, that is supposed to allow this without defining the vertices, but I can't get it to work.

I know this isn't a reproducible example, but here is my code:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("""
  select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us)
  from comtrade.annual_hs
  where length(commodity_code) = '2' and not partner_iso = 'WLD'
  group by year, trade_flow, reporter_iso, partner_iso""").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line => (line(2), line(3))) // country to country

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)

The last line generates the following error:

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)
<console>:31: error: type mismatch;
found   : Array[(Any, Any)]
required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)]
Error occurred in an application involving default arguments.
val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)

couples looks like this:

couples: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB), (DEU,PRK), (THA,DJI), (BIH,SVK), (RUS,THA), (SGP,BLR), (MEX,TGO), (TUR,ZAF), (ZWE,SYC), (UGA,GHA), (OMN,SVN), (NZL,SYR), (CHE,SLV), (CZE,LUX), (TGO,COM), (TTO,WLF), (NGA,PAN), (FJI,UKR), (BRA,ECU), (EGY,SWE), (ITA,ARG), (MUS,MLT), (MDG,DZA), (ARE,SUR), (CAN,GUY), (OMN,COG), (NAM,FIN), (ITA,HMD), (SWE,CHE), (SDN,NER), (TUN,USA), (THA,GMB), (HUN,TTO), (FRA,BEN), (NER,TCD), (CHN,JPN), (DNK,ZAF), (MLT,UKR), (ARM,OMN), (PRT,IDN), (BEN,PER), (TTO,BRA), (KAZ,SMR), (CPV,""), (ARG,ZAF), (BLR,TJK), (AZE,SVK), (ITA,STP), (MDA,IRL), (POL,SVN), (PRY,ETH), (HKG,MOZ), (QAT,GAB), (THA,MUS), (PHL,MOZ), (ITA,SGS), (ARM,KHM), (ARG,KOR), (AUT,GMB), (SYR,COM), (CZE,GBR), (DOM,USA), (CYP,LAO), (USA,LBR)

How can I convert it to the required format?

asked Aug 10 '15 at 20:08 by Stéphanie C



1 Answer

First of all, you cannot use a String as a VertexId (VertexId is just an alias for Long), so you have to map the labels to Long values. To do that, we need a mapping from label to ID. As long as the number of unique labels is relatively small, the simplest approach is to create a broadcast variable:

val idMap = sc.broadcast(couples // -> Array[(Any, Any)]
  // Make sure we use String, not the Any returned by Row.apply,
  // and emit a Seq so the results can be flattened
  .flatMap{case (x: String, y: String) => Seq(x, y)} // -> Array[String]
  // Keep only distinct labels
  .distinct // -> Array[String]
  // Number the labels: (label, index) pairs
  .zipWithIndex  // -> Array[(String, Int)]
  // Convert indices to Long so they can be used as VertexIds
  .map{case (k, v) => (k, v.toLong)}  // -> Array[(String, Long)]
  // Create the lookup map
  .toMap) // -> Map[String,Long]
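If you want to check this pipeline without a cluster, the same steps work on a plain Scala Array. The sketch below is a minimal version under stated assumptions: the object name LabelIds and the three sample pairs are hypothetical stand-ins for couples. It also builds the inverse map, which you will probably want later to turn vertex IDs back into country codes.

```scala
// Plain-Scala sketch of the label -> VertexId mapping (no Spark needed).
// LabelIds and `sample` are hypothetical; `sample` mimics an excerpt of `couples`.
object LabelIds {
  val sample: Array[(Any, Any)] =
    Array(("MWI", "MOZ"), ("WSM", "AUS"), ("MWI", "AUS"))

  // Same steps as the broadcast version: flatten, dedupe, number, widen to Long
  val idMap: Map[String, Long] = sample
    .flatMap { case (x: String, y: String) => Seq(x, y) }
    .distinct
    .zipWithIndex
    .map { case (k, v) => (k, v.toLong) }
    .toMap

  // Inverse map: turn a VertexId back into its country code
  val labelOf: Map[Long, String] = idMap.map { case (k, v) => (v, k) }

  def main(args: Array[String]): Unit = {
    println(idMap)       // IDs are assigned in order of first appearance
    println(labelOf(2L)) // the country code that received ID 2
  }
}
```

Because Array.distinct keeps the first occurrence of each label, the IDs follow the order in which countries first appear, so they are only stable as long as the input order is.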

Next, we can use idMap to map each pair of labels to a pair of VertexIds:

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
  .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))}
)
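One caveat: a type pattern such as (x: String, y: String) never matches null, so if the Hive table contains NULL country codes, both the flatMap that builds idMap and this map will fail with a scala.MatchError. A possible workaround is to use collect, which simply drops the pairs that do not match; the same change would apply to the flatMap. The sketch below uses hypothetical data (the object SafeEdges, the ids map and the null pair are made up for illustration) in place of idMap.value and the real couples.

```scala
// Sketch: build (VertexId, VertexId) pairs with `collect` so that pairs which
// are not two Strings (e.g. a Hive NULL, which arrives as null) are skipped
// instead of raising a scala.MatchError as the `map { case ... }` version does.
object SafeEdges {
  // Hypothetical input: the last pair simulates a NULL partner_iso
  val couples: Array[(Any, Any)] =
    Array(("MWI", "MOZ"), ("WSM", "AUS"), ("CPV", null))

  // Hypothetical lookup table; in the answer this role is played by idMap.value
  val ids: Map[String, Long] =
    Map("MWI" -> 0L, "MOZ" -> 1L, "WSM" -> 2L, "AUS" -> 3L, "CPV" -> 4L)

  // Type patterns never match null, so ("CPV", null) falls through and is dropped
  val edges: Array[(Long, Long)] = couples.collect {
    case (x: String, y: String) => (ids(x), ids(y))
  }

  def main(args: Array[String]): Unit =
    edges.foreach(println)
}
```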

Finally, we build the graph:

val graph = Graph.fromEdgeTuples(edges, 1)
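Graph.fromEdgeTuples gives every tuple an edge attribute of 1 and every vertex the default value passed as the second argument (here also 1). Since the query in the question also groups by trade_flow, the same reporter/partner pair can occur more than once, and by default each occurrence becomes its own edge. The optional third parameter, uniqueEdges, takes an Option[PartitionStrategy] and, when set, merges identical edges by summing their attributes, so with attribute 1 the result is a per-pair count. The plain-Scala sketch below (hypothetical object EdgeCounts and made-up IDs) only illustrates that arithmetic; it is not what GraphX runs internally.

```scala
// Illustration only (no Spark): collapse duplicate (src, dst) pairs and sum
// an edge attribute of 1, i.e. count how often each pair occurs.
object EdgeCounts {
  // Hypothetical edges; (0L, 1L) appears twice, e.g. once per trade_flow
  val edges: Seq[(Long, Long)] = Seq((0L, 1L), (2L, 3L), (0L, 1L))

  // Group identical pairs and replace each group by its size
  val weighted: Map[(Long, Long), Int] =
    edges.groupBy(identity).map { case (pair, occurrences) => (pair, occurrences.size) }

  def main(args: Array[String]): Unit =
    weighted.foreach { case ((src, dst), n) => println(s"$src -> $dst: $n") }
}
```

In GraphX itself, the call would presumably look like Graph.fromEdgeTuples(edges, 1, Some(PartitionStrategy.RandomVertexCut)); the particular partition strategy here is an arbitrary choice.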
answered Sep 20 '22 at 17:09 by zero323