My RDD contains three fields: [Group, User, Bytes]. I need to sum the Bytes consumed by each user and get the top N users by total bytes within each group.
For example, with this input data:
G1 U1 10
G1 U1 20
G1 U2 25
G1 U3 20
G2 U1 30
G2 U2 15
G2 U2 25
G2 U3 45
A top-2 query should return:
G1 U1 30
G1 U2 25
G2 U3 45
G2 U2 40
So far my code is as follows:
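rdd: RDD[((String, String), Double)]      // key: (Group, User), value: Bytes
rdd.reduceByKey((x, y) => x + y)          // total Bytes per (Group, User)
  .map {
    x => (x._1._1, (x._1._2, x._2))       // re-key by Group: (Group, (User, total))
  }.sortBy(x => x._2._2, false)           // sort by total, descending, across all groups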
I have yet to figure out how to group by the Group value and then take only the top N results. Can anyone help, or suggest a better way to meet this requirement?
From your question, it seems you want the SQL-style rank within each group.
Here is my solution. It might not be the most efficient, but it works.
val rddsum = rdd.reduceByKey((x,y) => (x+y)).map(x => (x._1._1,x._1._2,x._2))
This gives the same summed totals as before:
(G1, U1, 30)
(G1, U2, 25)
(G1, U3, 20)
(G2, U1, 30)
(G2, U2, 40)
(G2, U3, 45)
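To sanity-check these totals without a cluster, here is a minimal sketch that mimics the reduceByKey step on a plain Scala Seq instead of an RDD (an assumption made only so it runs without Spark; `SumStep` is a hypothetical name). Grouping by (Group, User) and summing Bytes should reproduce the six tuples listed above.

```scala
// Local stand-in for the reduceByKey step: plain Scala collections, no Spark needed.
object SumStep {
  // (Group, User, Bytes) rows from the question's sample input
  val raw: Seq[(String, String, Double)] = Seq(
    ("G1", "U1", 10.0), ("G1", "U1", 20.0), ("G1", "U2", 25.0), ("G1", "U3", 20.0),
    ("G2", "U1", 30.0), ("G2", "U2", 15.0), ("G2", "U2", 25.0), ("G2", "U3", 45.0)
  )

  // Key by (Group, User) and sum Bytes per key, like reduceByKey(_ + _) on an RDD,
  // then flatten to (Group, User, total) and sort for stable printing.
  val totals: Seq[(String, String, Double)] =
    raw
      .groupBy { case (g, u, _) => (g, u) }
      .map { case ((g, u), rows) => (g, u, rows.map(_._3).sum) }
      .toSeq
      .sortBy { case (g, u, _) => (g, u) }

  def main(args: Array[String]): Unit = totals.foreach(println)
}
```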
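Now group by the first column and use mapValues to rank the users within each group:
val grpd = rddsum.groupBy { x => x._1 }
// sort each group by total Bytes in descending order, then pair each tuple with its rank (1, 2, ...)
val sortAndRankedItems = grpd.mapValues { it => it.toList.sortBy { x => -x._3 }.zip(Stream from 1) }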
Now sortAndRankedItems is an RDD[(String, List[((String, String, Double), Int)])]. Only the second element of each pair (the ranked list) is of interest, so flatMap it out, filter to the top N (here N = 2), and then keep only the first element of each pair, i.e. the original tuple, to arrive at the answer.
val result = sortAndRankedItems.flatMap{case(m,n) => n}.filter{x => x._2 <= 2}.map{case(x,y) => x}
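For a quick local check of the group / rank / filter logic, here is a sketch on plain Scala collections rather than an RDD (`TopNPerGroup` and `topN` are hypothetical names). It sorts each group by total Bytes in descending order, pairs each row with a 0-based rank via zipWithIndex, and keeps ranks below n, which is the same selection as the `x._2 <= 2` filter with 1-based ranks.

```scala
// Local stand-in for the groupBy / rank / filter steps (Scala collections, not an RDD).
object TopNPerGroup {
  // Summed (Group, User, Bytes) tuples, as produced by the reduceByKey step
  val totals: Seq[(String, String, Double)] = Seq(
    ("G1", "U1", 30.0), ("G1", "U2", 25.0), ("G1", "U3", 20.0),
    ("G2", "U1", 30.0), ("G2", "U2", 40.0), ("G2", "U3", 45.0)
  )

  // Top n rows per group by Bytes, largest first; groups are emitted in key order.
  def topN(rows: Seq[(String, String, Double)], n: Int): Seq[(String, String, Double)] =
    rows
      .groupBy(_._1)                   // Group -> all of its rows
      .toSeq
      .sortBy(_._1)                    // deterministic group order
      .flatMap { case (_, groupRows) =>
        groupRows
          .sortBy(r => -r._3)          // largest total Bytes first
          .zipWithIndex                // (row, 0-based rank)
          .filter { case (_, rank) => rank < n }
          .map { case (row, _) => row } // drop the rank, keep the tuple
      }

  def main(args: Array[String]): Unit =
    topN(totals, 2).foreach(println)
}
```

Because each group is already sorted, `.take(n)` on the sorted list would select the same rows; the explicit rank is only needed if you also want to output it.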
Hope it helps!!
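Since groupBy collects every user of a group into one in-memory list, it can get expensive for very large groups. One common alternative (not part of the answer above) is to keep only the current top N per group while aggregating, for example with `aggregateByKey` on a pair RDD. The sketch below shows just the two merge functions on plain Scala lists; `BoundedTopN`, `seqOp`, `combOp` and `n` are hypothetical names, and the Spark wiring in the comment assumes an RDD[(String, (String, Double))] keyed by Group whose values are already summed per user, like the one produced by the question's reduceByKey + map.

```scala
// Bounded top-n accumulator that could back aggregateByKey (sketch, plain Scala only).
object BoundedTopN {
  type Entry = (String, Double) // (User, total Bytes)

  val n: Int = 2 // hypothetical N, matching the question's top-2 example

  // Keep only the n largest entries, largest first.
  private def trim(xs: List[Entry]): List[Entry] = xs.sortBy(e => -e._2).take(n)

  // seqOp: fold one more (User, Bytes) value into a group's partial top-n list.
  def seqOp(acc: List[Entry], v: Entry): List[Entry] = trim(v :: acc)

  // combOp: merge two partial top-n lists for the same group (e.g. from different partitions).
  def combOp(a: List[Entry], b: List[Entry]): List[Entry] = trim(a ++ b)

  // Spark wiring, not run here; assumes byGroup: RDD[(String, (String, Double))]
  //   byGroup.aggregateByKey(List.empty[Entry])(seqOp, combOp).flatMapValues(identity)

  def main(args: Array[String]): Unit = {
    // Simulate group G2's summed values split across two partitions.
    val part1 = List[Entry](("U1", 30.0), ("U2", 40.0)).foldLeft(List.empty[Entry])(seqOp)
    val part2 = List[Entry](("U3", 45.0)).foldLeft(List.empty[Entry])(seqOp)
    println(combOp(part1, part2))
  }
}
```

Each partial list never holds more than n entries, so memory per group stays bounded regardless of how many users a group has.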