How to transform RDD[(Key, Value)] into Map[Key, RDD[Value]]

I have searched for a solution for a long time but haven't found a working approach.

Using Spark RDDs in Scala, how can I transform an RDD[(Key, Value)] into a Map[Key, RDD[Value]], given that I can't use collect or other methods that may load the data into memory?

In fact, my final goal is to loop over the Map[Key, RDD[Value]] by key and call saveAsNewAPIHadoopFile for each RDD[Value].

For example, if I have:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]

I'd like:

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]

I wonder whether it would be too costly to do this by calling filter on the RDD[(Key, Value)] once for each key A, B, C. I don't know whether calling filter as many times as there are distinct keys would be efficient (probably not, but maybe with cache?).

Thank you

Seb asked Jan 23 '15 13:01





1 Answer

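You can use code like this (Python):

# sc is assumed to be an existing SparkContext.
rdd = sc.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]).cache()
# Only the distinct keys are collected to the driver, not the values.
keys = rdd.keys().distinct().collect()
for key in keys:
    # Keep the pairs for this key, then drop the key to get an RDD of values.
    out = rdd.filter(lambda kv: kv[0] == key).values()
    out.saveAsNewAPIHadoopFile(...)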

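An RDD cannot be nested inside another RDD, so you have no option other than to collect the keys and turn the values for each key into a separate RDD. In my example the loop runs over the cached RDD, which is fine and should be fast. Each filter still scans the whole cached RDD once, so the cost grows with the number of distinct keys.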
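If you really want something shaped like Map[Key, RDD[Value]] before saving, the same idea can be wrapped in a function that returns a dict. This is only a minimal sketch: the names split_by_key and values_for are made up for illustration, and it assumes the number of distinct keys is small enough to collect to the driver. Note that each filter gets its key through a helper function. Since the RDDs are lazy, a lambda written directly in the loop would only see the last key by the time the RDDs are evaluated.

```python
def split_by_key(pair_rdd):
    """Return a dict mapping each distinct key to a lazy RDD of its values.

    `pair_rdd` is expected to be an RDD of (key, value) tuples.
    """
    # Cache first: the source is scanned once for the keys and once per key.
    pair_rdd = pair_rdd.cache()
    # Only the distinct keys are brought to the driver, never the values.
    keys = pair_rdd.keys().distinct().collect()

    def values_for(key):
        # The helper gives every filter its own `key` binding, so the
        # predicate still refers to the right key when it finally runs.
        return pair_rdd.filter(lambda kv: kv[0] == key).values()

    return {key: values_for(key) for key in keys}
```

You would then loop with something like "for key, values in split_by_key(rdd).items()" and call values.saveAsNewAPIHadoopFile(...) on each entry, with the arguments depending on your output format.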

0x0FFF answered Nov 01 '22 16:11