 

Spark two level aggregation

Tags:

apache-spark

My RDD contains three values: [Group, User, Bytes]. I need to sum the Bytes consumed by each user within a group and then get the top N users per group by total bytes.

For example, with this input data:

G1 U1 10
G1 U1 20
G1 U2 25
G1 U3 20
G2 U1 30
G2 U2 15
G2 U2 25
G2 U3 45

A query for the top 2 should return:

G1 U1 30
G1 U2 25
G2 U3 45
G2 U2 40

So far my code is as follows:

rdd: RDD[((String, String), Double)]
rdd.reduceByKey((x,y) => (x+y))
    .map {
       x => ((x._1._1), (x._1._2, x._2))
    }.sortBy(x => x._2._2, false)

I have yet to figure out how to group by the Group value and then take only the top N results per group. Can anyone help with this, or suggest a better way of solving my requirement?

Firdousi Farozan asked Dec 21 '25 08:12



1 Answer

From your question, it seems you are trying to get something like SQL's rank within each group.

Here is my solution. It might not be the most efficient, but it works.

val rddsum = rdd.reduceByKey((x,y) => (x+y)).map(x => (x._1._1,x._1._2,x._2))

This gives the total bytes per (group, user):

(G1, U1, 30)
(G1, U2, 25)
(G1, U3, 20)
(G2, U1, 30)
(G2, U2, 40)
(G2, U3, 45)

Now group by the first column and use mapValues to attach a rank:

val grpd = rddsum.groupBy{x => x._1}

// sort descending by total bytes so that rank 1 is the largest consumer
val sortAndRankedItems = grpd.mapValues{ it => it.toList.sortBy{x => -x._3}.zip(Stream from 1) }

sortAndRankedItems is now of type RDD[(String, List[((String, String, Double), Int)])]. Only the list in each pair is of interest, so flatten the lists with flatMap, keep the entries whose rank is at most N (2 in this case), and then keep only the first element of each entry, i.e. the (group, user, bytes) tuple, to arrive at the answer.

val result = sortAndRankedItems.flatMap{case(m,n) => n}.filter{x => x._2 <= 2}.map{case(x,y) => x}
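
If the groups are large, pulling every user of a group into one list with groupBy can get expensive. As a possible alternative, here is a minimal sketch that uses aggregateByKey to carry only the N best candidates per group through the shuffle. The object name TopNPerGroup, the helper mergeTopN, and the local[*] master are my own assumptions for illustration, not something from the question:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TopNPerGroup {
  // Hypothetical helper: combine two candidate lists and keep the n largest totals.
  def mergeTopN(a: List[(String, Double)],
                b: List[(String, Double)],
                n: Int): List[(String, Double)] =
    (a ++ b).sortBy(-_._2).take(n)

  def main(args: Array[String]): Unit = {
    // Assumption: a local run, only to make the sketch self-contained.
    val conf = new SparkConf().setMaster("local[*]").setAppName("TopNPerGroup")
    val sc = new SparkContext(conf)
    val n = 2

    // Sample data from the question, keyed by (group, user).
    val rdd = sc.parallelize(Seq(
      (("G1", "U1"), 10.0), (("G1", "U1"), 20.0),
      (("G1", "U2"), 25.0), (("G1", "U3"), 20.0),
      (("G2", "U1"), 30.0), (("G2", "U2"), 15.0),
      (("G2", "U2"), 25.0), (("G2", "U3"), 45.0)
    ))

    val topN = rdd
      .reduceByKey(_ + _)                                  // total bytes per (group, user)
      .map { case ((group, user), bytes) => (group, (user, bytes)) }
      .aggregateByKey(List.empty[(String, Double)])(
        (acc, v) => mergeTopN(acc, List(v), n),            // fold one user into a partition-local top n
        (a, b) => mergeTopN(a, b, n)                       // merge top-n lists from different partitions
      )
      .flatMap { case (group, users) =>
        users.map { case (user, bytes) => (group, user, bytes) }
      }

    // Sort on the driver only to print in a stable order.
    topN.collect().sortBy(t => (t._1, -t._3)).foreach(println)
    sc.stop()
  }
}
```

On the sample data this should print U1 and U2 for G1, and U3 and U2 for G2, which matches the expected output in the question. If two users tie at the cutoff, which one survives is arbitrary in this sketch.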

Hope it helps!!

Akash answered Dec 24 '25 02:12



