How to group by multiple keys in Spark?

I have a bunch of tuples which are in the form of composite keys and values. For example:

tfile.collect() = [(('id1','pd1','t1'), 5.0),
                   (('id2','pd2','t2'), 6.0),
                   (('id1','pd1','t2'), 7.5),
                   (('id1','pd1','t3'), 8.1)]

I want to perform SQL-like operations on this collection, where I can aggregate the information based on id[1..n] or pd[1..n]. I want to implement this using the vanilla PySpark APIs and not SQLContext. In my current implementation I am reading from a bunch of files and merging the RDDs.

def readfile():
    fr = range(6, 23)
    # Build one RDD per input file, pre-aggregate it by key, then union them all.
    # Binding f as a default argument avoids Python's late-binding closure pitfall.
    tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                        .map(lambda view, f=f: set_feature(view, f))
                        .reduceByKey(lambda a, b: a + b)
                      for f in fr])
    return tfile

I intend to create an aggregated array as the value. For example:

agg_tfile = [(('id1','pd1'), [5.0, 7.5, 8.1])]

where 5.0, 7.5, 8.1 represent [t1, t2, t3]. I am currently achieving this with vanilla Python code using dictionaries. It works fine for smaller data sets, but I worry it may not scale to larger data sets. Is there an efficient way to achieve this using the PySpark APIs?
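For illustration, a simplified sketch of that dictionary-based aggregation (the exact code differs; this is only the idea):

from collections import defaultdict

def aggregate_locally(records):
    # records: [(('id1', 'pd1', 't1'), 5.0), ...] collected to the driver
    agg = defaultdict(list)
    for (id_, pd_, t_), value in records:
        agg[(id_, pd_)].append(value)
    return list(agg.items())

agg_tfile = aggregate_locally(tfile.collect())
# [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]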

asked Mar 31 '15 by Rahul

People also ask

How do I use group by key in Spark?

The groupByKey function in Apache Spark is a frequently used transformation operation that shuffles the data. It receives key-value pairs, (K, V), as its input, groups the values based on the key, and generates a dataset of (K, Iterable) pairs as its output.
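For example (a small sketch, assuming a SparkContext is available as sc):

# groupByKey turns (K, V) pairs into (K, Iterable[V]) pairs.
pairs = sc.parallelize([('id1', 5.0), ('id2', 6.0), ('id1', 7.5)])
grouped = pairs.groupByKey()
print(grouped.mapValues(list).collect())
# e.g. [('id2', [6.0]), ('id1', [5.0, 7.5])] -- key order may vary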

How do you do multiple groupBy in PySpark?

Grouping on multiple columns in PySpark can be performed by passing two or more columns to the groupBy() method. This returns a pyspark.sql.GroupedData object, which provides agg(), sum(), count(), min(), max(), avg(), etc. to perform aggregations.
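For example (a sketch; assumes a SparkSession can be created as below):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('id1', 'pd1', 't1', 5.0), ('id1', 'pd1', 't2', 7.5), ('id2', 'pd2', 't2', 6.0)],
    ['id', 'pd', 't', 'value'])

# Pass two columns to groupBy(), then aggregate with agg().
df.groupBy('id', 'pd') \
  .agg(F.count('value').alias('n'), F.avg('value').alias('avg_value')) \
  .show()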

How does groupBy work in Spark?

The groupBy method is defined in the Dataset class. groupBy returns a RelationalGroupedDataset object, where the agg() method is defined. Spark makes great use of object-oriented programming! The RelationalGroupedDataset class also defines a sum() method that can be used to get the same result with less code.
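The PySpark DataFrame API mirrors this: groupBy() returns a GroupedData object (the Python counterpart of RelationalGroupedDataset) with both agg() and a sum() shorthand. A quick sketch:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('id1', 'pd1', 5.0), ('id1', 'pd1', 7.5), ('id2', 'pd2', 6.0)],
                           ['id', 'pd', 'value'])

grouped = df.groupBy('id', 'pd')      # GroupedData
grouped.agg(F.sum('value')).show()    # explicit aggregation expression
grouped.sum('value').show()           # shorthand, same result with less code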

How do you group by RDD?

Solution: We can apply the groupByKey / reduceByKey transformations on a (key, value) pair RDD. groupByKey groups the values for each key in the original RDD, creating a new pair in which the original key corresponds to the collected group of values.
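For example (sketch, again assuming an existing SparkContext sc):

pairs = sc.parallelize([('id1', 5.0), ('id1', 7.5), ('id2', 6.0)])

# groupByKey: collect every value for a key (shuffles all values).
print(pairs.groupByKey().mapValues(list).collect())

# reduceByKey: merge values per key with a function (combines map-side first).
print(pairs.reduceByKey(lambda a, b: a + b).collect())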


2 Answers

My guess is that you want to transpose the data according to multiple fields.

A simple way is to concatenate the target fields that you will group by and make the result the key of a pair RDD. For example:

lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])

# Concatenate the grouping fields into a single string key, then fold the values per key.
rdd = (lines.map(lambda x: x.split(','))
            .map(lambda x: (x[0] + ', ' + x[1], x[3]))
            .reduceByKey(lambda a, b: a + ', ' + b))
print(rdd.collect())

Then you will get the transposed result.

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
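A small variant of the same idea (not part of the original answer) keeps the composite key as a tuple and collects the values as floats in a list, which matches the agg_tfile shape in the question; it reuses the lines RDD defined above:

rdd2 = (lines.map(lambda x: x.split(','))
             .map(lambda x: ((x[0], x[1]), [float(x[3])]))
             .reduceByKey(lambda a, b: a + b))
print(rdd2.collect())
# [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]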
answered Sep 21 '22 by dapangmao

In my map function I group the data as ((id1, t1), [(p1, 5.0), (p2, 6.0), ...]) and so on, then reduceByKey to merge the lists. Finally, map_group creates an array for [p1, p2, ...] and fills in the values at their respective positions.

import numpy as np

def map_group(pgroup):
    # pgroup is a (key, value_list) pair; build a fixed-size feature array
    # and place each value at the slot derived from its feature number.
    x = np.zeros(19)
    x[0] = 1
    value_list = pgroup[1]
    for val in value_list:
        fno = val[0].split('.')[0]
        x[int(fno) - 5] = val[1]
    return x

tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])])) \
            .reduceByKey(lambda p, q: p + q) \
            .map(lambda d: (d[0], map_group(d)))

This does feel like a computationally expensive solution, but it works for now.

answered Sep 18 '22 by Rahul