I have a bunch of tuples which are in the form of composite keys and values. For example,
tfile.collect() = [(('id1', 'pd1', 't1'), 5.0),
                   (('id2', 'pd2', 't2'), 6.0),
                   (('id1', 'pd1', 't2'), 7.5),
                   (('id1', 'pd1', 't3'), 8.1)]
I want to perform SQL-like operations on this collection, where I can aggregate the information based on id[1..n] or pd[1..n]. I want to implement this using the vanilla PySpark APIs and not SQLContext. In my current implementation I am reading from a bunch of files and merging the RDDs.
def readfile():
    fr = range(6, 23)
    tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                        .map(lambda view: set_feature(view, f))
                        .reduceByKey(lambda a, b: a + b)
                      for f in fr])
    return tfile
I intend to create an aggregated array as a value. For example,
agg_tfile = [(('id1', 'pd1'), [5.0, 7.5, 8.1])]
where 5.0, 7.5, 8.1 represent [t1, t2, t3]. I am currently achieving the same with vanilla Python code using dictionaries. It works fine for smaller data sets, but I worry that this may not scale for larger data sets. Is there an efficient way of achieving the same using the PySpark APIs?
The groupByKey function in Apache Spark is a frequently used transformation operation that shuffles the data. It receives key-value pairs (K, V) as its input, groups the values based on the key, and generates a dataset of (K, Iterable) pairs as its output.
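As a minimal sketch on the sample data above (keying on the (id, pd) prefix is my assumption about the desired grouping, and the order of values inside a group is not guaranteed):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

tfile = sc.parallelize([(('id1', 'pd1', 't1'), 5.0),
                        (('id2', 'pd2', 't2'), 6.0),
                        (('id1', 'pd1', 't2'), 7.5),
                        (('id1', 'pd1', 't3'), 8.1)])

# Re-key on (id, pd), then group: each key maps to an iterable of its values.
grouped = tfile.map(lambda kv: ((kv[0][0], kv[0][1]), kv[1])).groupByKey()
print(grouped.mapValues(list).collect())
# e.g. [(('id2', 'pd2'), [6.0]), (('id1', 'pd1'), [5.0, 7.5, 8.1])]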
Grouping on multiple columns in PySpark can be performed by passing two or more columns to the groupBy() method; this returns a pyspark.sql.GroupedData object, which provides agg(), sum(), count(), min(), max(), avg(), etc. to perform aggregations.
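A short sketch of that DataFrame route for reference (the question above asks for the plain RDD API rather than the SQL layer; the column names id, pd, t, val are my assumptions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('id1', 'pd1', 't1', 5.0), ('id2', 'pd2', 't2', 6.0),
     ('id1', 'pd1', 't2', 7.5), ('id1', 'pd1', 't3', 8.1)],
    ['id', 'pd', 't', 'val'])

# groupBy on two columns returns a GroupedData object; agg() runs the aggregations.
df.groupBy('id', 'pd').agg(F.collect_list('val').alias('vals'),
                           F.sum('val').alias('total')).show()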
The groupBy method is defined in the Dataset class. groupBy returns a RelationalGroupedDataset object, on which the agg() method is defined. Spark makes great use of object-oriented programming! The RelationalGroupedDataset class also defines a sum() method that can be used to get the same result with less code.
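The same shortcut exists on PySpark's GroupedData; assuming the df from the sketch above, this is equivalent to .agg(F.sum('val')) but with less code:

df.groupBy('id', 'pd').sum('val').show()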
Solution: We can apply the groupByKey / reduceByKey transformations on a (key, val) pair RDD. groupByKey will group the values for each key in the original RDD, creating a new pair where the original key corresponds to this collected group of values.
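Applied to the sample tfile RDD from the sketch above, a minimal reduceByKey version that produces the agg_tfile shape the question asks for (wrapping each value in a single-element list so that list concatenation builds the aggregated array per (id, pd) key):

agg_tfile = (tfile
             .map(lambda kv: ((kv[0][0], kv[0][1]), [kv[1]]))
             .reduceByKey(lambda a, b: a + b))
print(agg_tfile.collect())
# e.g. [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]  (value order may vary)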
My guess is that you want to transpose the data according to multiple fields.
A simple way is to concatenate the target fields that you will group by, and make it a key in a paired RDD. For example:
lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0',
                        'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
rdd = (lines.map(lambda x: x.split(','))
            .map(lambda x: (x[0] + ', ' + x[1], x[3]))   # key = "id, pd", value = raw value string
            .reduceByKey(lambda a, b: a + ', ' + b))     # concatenate the values per key
print(rdd.collect())
Then you will get the transposed result.
[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
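If you want the values as an actual list of floats (as in the agg_tfile example) rather than a concatenated string, a small variation of the same idea should work; this sketch is mine, not part of the original answer, and it keeps the key as a tuple so nothing has to be parsed back out of it later:

rdd = (lines.map(lambda x: x.split(','))
            .map(lambda x: ((x[0], x[1]), [float(x[3])]))  # tuple key, single-element list value
            .reduceByKey(lambda a, b: a + b))              # list concatenation per key
print(rdd.collect())
# e.g. [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]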
I grouped ((id1, t1), [(p1, 5.0), (p2, 6.0), ...]) and so on in my map function. Later, I reduce with reduceByKey and then apply map_group, which creates an array for [p1, p2, ...] and fills in the values at their respective positions.
import numpy as np

def map_group(pgroup):
    # pgroup is a (key, value_list) pair produced by reduceByKey below
    x = np.zeros(19)
    x[0] = 1
    value_list = pgroup[1]
    for val in value_list:
        fno = val[0].split('.')[0]   # feature number encoded in the field name
        x[int(fno) - 5] = val[1]     # place the value at its position in the array
    return x
tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])])) \
            .reduceByKey(lambda p, q: p + q) \
            .map(lambda d: (d[0], map_group(d)))
This does feel like an expensive solution in terms of computation, but it works for now.
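One possibly cheaper variation (my sketch, not part of the original answer) is to fold the values straight into the numpy array with aggregateByKey, so no intermediate per-key Python lists are built before the shuffle; it assumes the same '<fno>.<...>' naming convention for the pd field as map_group, and that each feature position is set at most once with a non-zero value:

import numpy as np

def seq_op(x, pdval):
    # Fold a single (pd, value) pair into the per-key feature vector.
    fno = pdval[0].split('.')[0]
    x[int(fno) - 5] = pdval[1]
    return x

def comb_op(a, b):
    # Merge two partial vectors: any position filled in b overrides the default in a.
    a[1:] = np.where(b[1:] != 0, b[1:], a[1:])
    return a

zero = np.zeros(19)
zero[0] = 1  # same constant first element as in map_group

tgbr = (tfile
        .map(lambda d: ((d[0][0], d[0][2]), (d[0][1], d[1])))  # key = (id, t), value = (pd, val)
        .aggregateByKey(zero, seq_op, comb_op))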