
Spark - Multiple filters on RDD in one pass

I have an RDD of Map[String, String]. Is there a way to filter it multiple times without going through the RDD more than once?

For example, I want to do something like this:

val stateNY = mapRDD.filter(person => person("state").equals("NY"))
val stateOR = mapRDD.filter(person => person("state").equals("OR"))
val stateMA = mapRDD.filter(person => person("state").equals("MA"))
val stateWA = mapRDD.filter(person => person("state").equals("WA"))

and this:

val wage10to20 = mapRDD.filter(person => person("wage").toDouble > 10 && person("wage").toDouble <= 20)
val wage20to30 = mapRDD.filter(person => person("wage").toDouble > 20 && person("wage").toDouble <= 30)
val wage30to40 = mapRDD.filter(person => person("wage").toDouble > 30 && person("wage").toDouble <= 40)
val wage40to50 = mapRDD.filter(person => person("wage").toDouble > 40 && person("wage").toDouble <= 50)

Here mapRDD is of type RDD[Map[String, String]], and I would like to produce all of these filtered RDDs in one pass.

Corey Wu asked Jul 06 '15 21:07



2 Answers

I assume you want a separate RDD for each value, i.e. not simply a single filter such as person => Set("NY", "OR", "MA", "WA").contains(person("state")).

What you are trying to achieve is typically possible using pair RDDs.

In your first example, you could use:

val keyByState = mapRDD.keyBy(_("state"))

And then do operations such as groupByKey, reduceByKey, etc.
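As a minimal sketch of that idea, the snippet below keys the records by state and then aggregates per key with reduceByKey, so every state is handled in a single job. The object name KeyByStateSketch, the helper totalWageByState, and the sample records are hypothetical, and summing wages is only one illustrative per-key operation; it also assumes every record has "state" and "wage" entries and that Spark runs in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object KeyByStateSketch {

  // Hypothetical helper: total wage per state, computed in one job.
  def totalWageByState(mapRDD: RDD[Map[String, String]]): Map[String, Double] =
    mapRDD
      .keyBy(_("state"))             // (state, record) pairs
      .mapValues(_("wage").toDouble) // keep only the numeric wage
      .reduceByKey(_ + _)            // combine per state, partially on the map side
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KeyByStateSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Made-up sample data standing in for the question's mapRDD.
    val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
      Map("state" -> "NY", "wage" -> "15"),
      Map("state" -> "OR", "wage" -> "25"),
      Map("state" -> "NY", "wage" -> "35")
    ))

    totalWageByState(mapRDD).foreach { case (state, total) =>
      println(s"$state: $total")
    }

    sc.stop()
  }
}
```

This yields one keyed result rather than one RDD per state, which is the trade-off this approach makes.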

Or in your second example, key by the wage rounded down to the nearest 10.
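A rough sketch of the wage bucketing follows. One caveat: rounding down gives half-open ranges of the form [10, 20), whereas the filters in the question use (10, 20], so this sketch rounds up and subtracts one step to match the question's boundaries. The names WageBucketSketch, wageBucket, and countByWageBucket are hypothetical, and counting per bucket is just an example of a per-key operation.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WageBucketSketch {

  // Lower bound of the (lo, lo + 10] range containing the wage,
  // e.g. 15.0 -> 10, 20.0 -> 10, 25.0 -> 20.
  def wageBucket(wage: Double): Int =
    (math.ceil(wage / 10).toInt - 1) * 10

  // Hypothetical helper: number of people per wage bucket, in one job.
  def countByWageBucket(mapRDD: RDD[Map[String, String]]): Map[Int, Long] =
    mapRDD
      .keyBy(person => wageBucket(person("wage").toDouble))
      .countByKey() // action: returns the per-key counts to the driver
      .toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WageBucketSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Made-up sample data standing in for the question's mapRDD.
    val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
      Map("state" -> "NY", "wage" -> "15"),
      Map("state" -> "OR", "wage" -> "20"),
      Map("state" -> "MA", "wage" -> "25")
    ))

    countByWageBucket(mapRDD).toSeq.sortBy(_._1).foreach { case (lo, n) =>
      println(s"($lo, ${lo + 10}]: $n")
    }

    sc.stop()
  }
}
```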

dpeacock answered Oct 21 '22 11:10



If you ultimately need them in separate RDDs, you will need the separate filters, and therefore multiple scans, at some point. You should cache the RDD you're traversing (mapRDD in your first example) so that the underlying data is not read and recomputed for every filter.
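The caching advice could look roughly like this. It is a sketch under assumptions: the object name CachedFiltersSketch, the helper splitByState, and the sample records are hypothetical, and Spark runs in local mode. Note that cache() is lazy, so the data is materialized by the first action, and each filter still scans the cached partitions once.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CachedFiltersSketch {

  // Hypothetical helper: one filtered RDD per requested state.
  // The caller is expected to have cached mapRDD beforehand.
  def splitByState(
      mapRDD: RDD[Map[String, String]],
      states: Seq[String]): Map[String, RDD[Map[String, String]]] =
    states.map { state =>
      state -> mapRDD.filter(person => person("state") == state)
    }.toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CachedFiltersSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Made-up sample data standing in for the question's mapRDD.
    val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
      Map("state" -> "NY", "wage" -> "15"),
      Map("state" -> "OR", "wage" -> "25"),
      Map("state" -> "NY", "wage" -> "35")
    ))

    // Mark the RDD for caching; it is stored when the first action runs.
    mapRDD.cache()

    val byState = splitByState(mapRDD, Seq("NY", "OR", "MA", "WA"))

    // Each count is a separate job, but after the first one the
    // filters read the cached partitions instead of the original source.
    byState.foreach { case (state, rdd) =>
      println(s"$state: ${rdd.count()}")
    }

    mapRDD.unpersist()
    sc.stop()
  }
}
```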

There's an advantage to doing the filters as you wrote them rather than the grouping suggested in the other answer: the filters can run on the map side, whereas filtering after grouping requires shuffling all the data around (including data for states you don't need).

Arnon Rotem-Gal-Oz answered Oct 21 '22 11:10
