Spark: group multiple RDD items by key

I have RDD items like:

(3922774869,10,1)
(3922774869,11,1)
(3922774869,12,2)
(3922774869,13,2)
(1779744180,10,1)
(1779744180,11,1)
(3922774869,14,3)
(3922774869,15,2)
(1779744180,16,1)
(3922774869,12,1)
(3922774869,13,1)
(1779744180,14,1)
(1779744180,15,1)
(1779744180,16,1)
(3922774869,14,2)
(3922774869,15,1)
(1779744180,16,1)
(1779744180,17,1)
(3922774869,16,4)
...

which represent (id, age, count). I want to group these lines so that each line of the result represents the age distribution of one id, with each (id, age) pair appearing only once, like this:

(1779744180, (10,1), (11,1), (12,2), (13,2) ...)
(3922774869, (10,1), (11,1), (12,3), (13,4) ...)

which is (id, (age, count), (age, count) ...)

Could someone give me a clue?

asked Apr 06 '16 by armnotstrong


2 Answers

You can first reduce by both fields, then use groupByKey:

rdd
  .map { case (id, age, count) => ((id, age), count) }   // key by (id, age)
  .reduceByKey(_ + _)                                     // sum counts per (id, age)
  .map { case ((id, age), count) => (id, (age, count)) }  // re-key by id
  .groupByKey()                                           // collect (age, count) pairs per id

This returns an RDD[(Long, Iterable[(Int, Int)])]; for the input above it would contain these two records:

(1779744180,CompactBuffer((16,3), (15,1), (14,1), (11,1), (10,1), (17,1)))
(3922774869,CompactBuffer((11,1), (12,3), (16,4), (13,3), (15,3), (10,1), (14,5)))
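If you also want each id's (age, count) pairs ordered by age, as in the desired output, here is a minimal follow-up sketch (assuming rdd holds the (id, age, count) triples from the question; the sorting and printing steps are additions for illustration):

rdd
  .map { case (id, age, count) => ((id, age), count) }
  .reduceByKey(_ + _)
  .map { case ((id, age), count) => (id, (age, count)) }
  .groupByKey()
  .mapValues(_.toSeq.sortBy(_._1))  // sort each id's (age, count) pairs by age
  .collect()
  .foreach(println)                 // one line per id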
answered by Tzach Zohar


As Tzach Zohar already suggested, you can first reshape your RDD into a key/value RDD. If you have a very large dataset, I would advise against groupByKey in order to reduce shuffling, even though it looks like the easiest option. For example, building on the solution already posted:

import scala.collection.mutable

// Sum counts per (id, age), then re-key by id so the values are (age, count) pairs.
val rddById = rdd
  .map { case (id, age, count) => ((id, age), count) }
  .reduceByKey(_ + _)
  .map { case ((id, age), count) => (id, (age, count)) }

val initialSet = mutable.HashSet.empty[(Int, Int)]
val addToSet = (s: mutable.HashSet[(Int, Int)], v: (Int, Int)) => s += v
val mergePartitionSets = (p1: mutable.HashSet[(Int, Int)], p2: mutable.HashSet[(Int, Int)]) => p1 ++= p2
val uniqueByKey = rddById.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

This will result in:

uniqueByKey: org.apache.spark.rdd.RDD[(AnyVal, scala.collection.mutable.HashSet[(Int, Int)])]

You can then print the values as:

scala> uniqueByKey.foreach(println)
(1779744180,Set((15,1), (16,3)))
(1779744180,Set((14,1), (11,1), (10,1), (17,1)))
(3922774869,Set((12,3), (11,1), (10,1), (14,5), (16,4), (15,3), (13,3)))

Shuffling can be a major bottleneck. Holding many large HashSets (depending on your dataset) could also be a problem. However, you are more likely to have plenty of RAM available (e.g., 64 GB per machine) than to escape network latency and all the other problems shuffling brings along, so reducing the shuffle results in faster reads/writes across the distributed machines.

To read more about aggregateByKey, have a look at this blog post.
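For completeness, here is a minimal self-contained sketch of the same aggregateByKey approach; the local[*] master, the application name, and the small sample of the question's data are assumptions for illustration only:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object AgeDistribution {
  def main(args: Array[String]): Unit = {
    // Local SparkContext for illustration only.
    val sc = new SparkContext(new SparkConf().setAppName("age-distribution").setMaster("local[*]"))

    // A small sample of the (id, age, count) triples from the question.
    val rdd = sc.parallelize(Seq(
      (3922774869L, 10, 1), (3922774869L, 11, 1), (3922774869L, 12, 2),
      (1779744180L, 10, 1), (1779744180L, 11, 1), (3922774869L, 12, 1)
    ))

    // Sum counts per (id, age), then re-key by id and aggregate the (age, count) pairs into a set.
    val rddById = rdd
      .map { case (id, age, count) => ((id, age), count) }
      .reduceByKey(_ + _)
      .map { case ((id, age), count) => (id, (age, count)) }

    val uniqueByKey = rddById.aggregateByKey(mutable.HashSet.empty[(Int, Int)])(_ += _, _ ++= _)

    // Print each id with its (age, count) pairs sorted by age.
    uniqueByKey.collect().foreach { case (id, ages) =>
      println((id, ages.toSeq.sortBy(_._1)))
    }

    sc.stop()
  }
}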

answered by Markon