 

Performing lookup/translation in a Spark RDD or data frame using another RDD/df

I'm having a hard time implementing something that seems like it should be very easy:

My goal is to make translations in an RDD/dataframe using a second RDD/dataframe as a lookup table or translation dictionary. I want to make these translations in multiple columns.

The easiest way to explain the problem is by example. Let's say I have as my input the following two RDDs:

Route SourceCityID DestinationCityID
A     1            2
B     1            3
C     2            1

and

CityID CityName
1      London
2      Paris
3      Tokyo

My desired output RDD is:

Route SourceCity DestinationCity
A     London     Paris
B     London     Tokyo
C     Paris      London

How should I go about producing it?

This is an easy problem in SQL, but I don't know of an obvious solution with RDDs in Spark. The join, cogroup, etc. methods don't seem well-suited to multi-column RDDs and don't let you specify which column to join on.

Any ideas? Is SQLContext the answer?
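For reference, the SQL version alluded to is just a double join against the lookup table. A self-contained sketch using the question's table and column names, with sqlite3 standing in for Spark SQL (the syntax carries over almost verbatim):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE routes (Route TEXT, SourceCityID INT, DestinationCityID INT);
CREATE TABLE cities (CityID INT, CityName TEXT);
INSERT INTO routes VALUES ('A', 1, 2), ('B', 1, 3), ('C', 2, 1);
INSERT INTO cities VALUES (1, 'London'), (2, 'Paris'), (3, 'Tokyo');
""")

# Join the cities table twice, once per column being translated.
rows = con.execute("""
SELECT r.Route, src.CityName AS SourceCity, dst.CityName AS DestinationCity
FROM routes r
JOIN cities src ON r.SourceCityID = src.CityID
JOIN cities dst ON r.DestinationCityID = dst.CityID
ORDER BY r.Route
""").fetchall()
print(rows)  # [('A', 'London', 'Paris'), ('B', 'London', 'Tokyo'), ('C', 'Paris', 'London')]
```

The key idea, used by the answers below as well, is that each translated column needs its own pass against the lookup table.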

asked Oct 13 '15 by xenocyon

2 Answers

The RDD way:

routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])


print(routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities)
      .map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities)
      .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect())

Which prints:

[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]
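The tuple indexing in that chain is dense, so here is a plain-Python model of the same two-step keyed join (no Spark needed) that makes the key/value reshaping visible; `join_map` is a stand-in for `RDD.join` against a small lookup:

```python
routes = [("A", 1, 2), ("B", 1, 3), ("C", 2, 1)]
cities = {1: "London", 2: "Paris", 3: "Tokyo"}

def join_map(pairs, lookup):
    # RDD.join analogue: (k, v) joined with (k, w) yields (k, (v, w)).
    return [(k, (v, lookup[k])) for k, v in pairs if k in lookup]

# Step 1: key by SourceCityID, join to pick up the source city name.
step1 = join_map([(src, (route, dst)) for route, src, dst in routes], cities)
# Step 2: re-key by DestinationCityID, carrying (route, source_name) along.
step2 = join_map([(x[1][0][1], (x[1][0][0], x[1][1])) for x in step1], cities)
# Step 3: flatten to (route, source_name, dest_name).
result = sorted((x[1][0][0], x[1][0][1], x[1][1]) for x in step2)
print(result)  # [('A', 'London', 'Paris'), ('B', 'London', 'Tokyo'), ('C', 'Paris', 'London')]
```

The `sorted` call is only for deterministic output; the Spark version above returns rows in partition order.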

And the SQLContext way:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

df_routes = sqlContext.createDataFrame(\
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(\
cities, ["CityID", "CityName"])

temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
    .select("Route", "DestinationCityID", "CityName") \
    .withColumnRenamed("CityName", "SourceCity")

print(temp.join(df_cities, temp.DestinationCityID == df_cities.CityID)
      .select("Route", "SourceCity", "CityName")
      .withColumnRenamed("CityName", "DestinationCity").collect())

Which prints:

[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]
answered Nov 04 '22 by user3689574

Assuming we have two RDDs with the routes and the cities:

val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1)))
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo")))

There are several ways to implement the city lookup. Assume the lookup table is small compared to the routes, which could contain many rows. In that case, start by collecting the cities into a map that the driver sends out with each task:

val citiesByID = citiesByIDRDD.collectAsMap

routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))

To avoid sending the lookup table with every task, and instead ship it only once to each worker, you can extend the existing code to broadcast the lookup map:

val bCitiesByID = sc.broadcast(citiesByID)

routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
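Stripped of Spark, the broadcast version is just a per-record dictionary lookup, which is why it scales without a shuffle. A plain-Python sketch of the core step (the `.get` fallback is my addition, guarding against city IDs absent from the lookup, which would otherwise throw):

```python
routes = [("A", 1, 2), ("B", 1, 3), ("C", 2, 1)]
city_by_id = {1: "London", 2: "Paris", 3: "Tokyo"}

# Each record is translated independently; sc.broadcast merely ships
# city_by_id to each worker once so this lookup can run inside map().
translated = [(route, city_by_id.get(src), city_by_id.get(dst))
              for route, src, dst in routes]
print(translated)  # [('A', 'London', 'Paris'), ('B', 'London', 'Tokyo'), ('C', 'Paris', 'London')]
```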

I don't see the need for DataFrames here, but if you want to use them, you could:

import sqlContext.implicits._

case class Route(id: String, from: Int, to: Int)
case class City(id: Int, name: String)

val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo"))
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1))

val citiesDf = cities.toDF()
citiesDf.registerTempTable("cities")
val routesDf = routes.toDF()
routesDf.registerTempTable("routes")

routesDf.show
+---+----+---+
| id|from| to|
+---+----+---+
|  A|   1|  2|
|  B|   1|  3|
|  C|   2|  1|
+---+----+---+

citiesDf.show
+---+------+
| id|  name|
+---+------+
|  1|London|
|  2| Paris|
|  3| Tokyo|
+---+------+

You mentioned that this is an easy problem in SQL, so I assume you can take it from here. Executing SQL goes like this:

sqlContext.sql("SELECT COUNT(*) FROM routes")
answered Nov 04 '22 by Mariano Kamp