I'm having a hard time implementing something that seems like it should be very easy:
My goal is to translate values in an RDD/DataFrame using a second RDD/DataFrame as a lookup table or translation dictionary, and I want to apply these translations to multiple columns.
The easiest way to explain the problem is by example. Let's say I have as my input the following two RDDs:
Route SourceCityID DestinationCityID
A 1 2
B 1 3
C 2 1
and
CityID CityName
1 London
2 Paris
3 Tokyo
My desired output RDD is:
Route SourceCity DestinationCity
A London Paris
B London Tokyo
C Paris London
How should I go about producing it?
This is an easy problem in SQL, but I don't know of an obvious solution with RDDs in Spark. The join, cogroup, etc. methods don't seem well-suited to multi-column RDDs and don't allow specifying which column to join on.
Any ideas? Is SQLContext the answer?
The RDD way:
routes = sc.parallelize([("A", 1, 2), ("B", 1, 3), ("C", 2, 1)])
cities = sc.parallelize([(1, "London"), (2, "Paris"), (3, "Tokyo")])
# Key the routes by SourceCityID and join to pick up the source city name,
# then re-key by DestinationCityID and join again for the destination name.
print(routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities)
      .map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities)
      .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect())
Which prints:
[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]
And the SQLContext way:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df_routes = sqlContext.createDataFrame(
    routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(
    cities, ["CityID", "CityName"])
# First join resolves the source city, second join resolves the destination city.
temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
    .select("Route", "DestinationCityID", "CityName") \
    .withColumnRenamed("CityName", "SourceCity")
print(temp.join(df_cities, temp.DestinationCityID == df_cities.CityID)
      .select("Route", "SourceCity", "CityName")
      .withColumnRenamed("CityName", "DestinationCity").collect())
Which prints:
[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]
Assuming we have two RDDs with the routes and the cities:
val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1)))
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo")))
There are several ways to implement the cities lookup. Assuming that the cities lookup contains few items compared to the routes, which would contain many, let's start by collecting the cities as a map that the driver sends out with each task.
val citiesByID = citiesByIDRDD.collectAsMap
routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
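If some city IDs could be missing from the lookup, a defensive variant of the same idea (my addition, not part of the original answer; the "Unknown" placeholder is just an assumption) falls back via getOrElse:
routes.map{r => (r._1, citiesByID.getOrElse(r._2, "Unknown"), citiesByID.getOrElse(r._3, "Unknown"))}.collect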
To avoid sending the lookup table with every task, and instead ship it only once to each worker, you can extend the existing code to broadcast the lookup map.
val bCitiesByID = sc.broadcast(citiesByID)
routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
I don't see the need for DataFrames here, but if you want to use them, you could:
import sqlContext.implicits._
case class Route(id: String, from: Int, to: Int)
case class City(id: Int, name: String)
val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo"))
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1))
val citiesDf = cities.toDF()
citiesDf.registerTempTable("cities")
val routesDf = routes.toDF()
routesDf.registerTempTable("routes")
routesDf.show
+---+----+---+
| id|from| to|
+---+----+---+
| A| 1| 2|
| B| 1| 3|
| C| 2| 1|
+---+----+---+
citiesDf.show
+---+------+
| id| name|
+---+------+
| 1|London|
| 2| Paris|
| 3| Tokyo|
+---+------+
You mentioned that it is an easy problem in SQL, so I assume you can take it from here. Executing SQL goes like this:
sqlContext.sql("SELECT COUNT(*) FROM routes")
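To spell out that last step, here is a sketch of the double join against the registered temp tables (not part of the original answer; backticks escape the from/to column names since they clash with SQL keywords, and the result name translated is arbitrary):
val translated = sqlContext.sql("""
  SELECT r.id AS Route, src.name AS SourceCity, dst.name AS DestinationCity
  FROM routes r
  JOIN cities src ON r.`from` = src.id
  JOIN cities dst ON r.`to` = dst.id
""")
translated.show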