Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?

I have an RDD that is too large to consistently perform a distinct statement without spurious errors (e.g. SparkException stage failed 4 times, ExecutorLostFailure, HDFS Filesystem closed, Max number of executor failures reached, Stage cancelled because SparkContext was shut down, etc.)

I am trying to count distinct IDs in a particular column, for example:

print(myRDD.map(a => a._2._1._2).distinct.count())

Is there an easy, consistent, less shuffle-intensive way to perform the command above, possibly using mapPartitions, reduceByKey, flatMap, or other commands that cause fewer shuffles than distinct?

See also What are the Spark transformations that causes a Shuffle?

asked Jun 26 '15 by Glenn Strycker




2 Answers

It might be better to figure out whether there is another underlying issue, but the following will do what you want. It is a rather roundabout approach, but it sounds like it will fit your bill:

// Pair each ID with itself so aggregateByKey can group by it,
// collecting the values into a Set per key; counting the keys gives the distinct count.
myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2)
  .keys
  .count

Even this seems to work, although the functions are neither associative nor commutative. It works because of how Spark's internals happen to behave, but I might be missing a case, so while it is simpler, I'm not sure I trust it:

// Simpler variant: keep an arbitrary value per key; only the keys matter for the count.
myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x, y) => y, (x, y) => x)
  .keys.count
answered Sep 28 '22 by Justin Pihony


As I see it, there are two possible solutions for this matter:

  1. With a reduceByKey
  2. With a mapPartitions

Let's see both of them with an example.

I have a dataset of 100,000 movie ratings with the format (idUser, (idMovie, rating)). Let's say we would like to know how many different users have rated a movie.
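For context, here is a minimal sketch of how such an RDD might be built from the MovieLens u.data file. The original construction code is not shown in this answer, so the parsing details and the local[2] master are assumptions (the file path and the two partitions come from the debug output below):

import org.apache.spark.{SparkConf, SparkContext}

// Assumed setup: reconstructing rddSplitted as (idUser, (idMovie, rating)) pairs.
// u.data is tab-separated: userId, movieId, rating, timestamp.
val conf = new SparkConf().setAppName("MovieSimilaritiesRicImproved").setMaster("local[2]")
val sc = new SparkContext(conf)
val rddSplitted = sc.textFile("C:/spark/ricardoExercises/ml-100k/u.data")
  .map { line =>
    val fields = line.split("\t")
    (fields(0).toInt, (fields(1).toInt, fields(2).toDouble))
  }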

Let's first take a look using distinct:

val numUsers = rddSplitted.keys.distinct()
println(s"numUsers is ${numUsers.count()}")
println("*******toDebugString of rddSplitted.keys.distinct*******")
println(numUsers.toDebugString)

We will get the following results:

numUsers is 943

*******toDebugString of rddSplitted.keys.distinct*******
(2) MapPartitionsRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 |  ShuffledRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 +-(2) MapPartitionsRDD[4] at distinct at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[3] at keys at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

With the toDebugString function, we can better analyze what is happening with our RDDs.

Now let's use reduceByKey, for instance counting how many times each user has voted while at the same time obtaining the number of different users:

val numUsers2 = rddSplitted.map(x => (x._1, 1)).reduceByKey(_ + _)
println(s"numUsers is ${numUsers2.count()}")
println("*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******")
println(numUsers2.toDebugString)

We will get now these results:

numUsers is 943

*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******
(2) ShuffledRDD[4] at reduceByKey at MovieSimilaritiesRicImproved.scala:104 []
 +-(2) MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:104 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

Analyzing the RDDs produced, we can see that reduceByKey performs the same task more efficiently than the distinct above.

Finally, let's use mapPartitions. The main goal is to first deduplicate the users within each partition of our dataset, and then obtain the final set of distinct users.

val a1 = rddSplitted.map(x => (x._1))
println(s"Number of elements in a1: ${a1.count}")
val a2 = a1.mapPartitions(x => x.toList.distinct.toIterator)  // deduplicate within each partition (no shuffle)
println(s"Number of elements in a2: ${a2.count}")
val a3 = a2.distinct()
println("There are "+ a3.count()+" different users")
println("*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******")
println(a3.toDebugString)

We will get the following:

Number of elements in a1: 100000
Number of elements in a2: 1709
There are 943 different users

*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******
(2) MapPartitionsRDD[7] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 |  ShuffledRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 +-(2) MapPartitionsRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:124 []
    |  MapPartitionsRDD[4] at mapPartitions at MovieSimilaritiesRicImproved.scala:122 []
    |  MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:120 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

We can now see that mapPartitions first computes the distinct users within each partition of the dataset, shrinking the number of instances from 100,000 to 1,709 without performing any shuffle. Then, with this much smaller amount of data, a distinct over the whole RDD can be carried out without worrying about the shuffle, and the result is obtained much faster.

I would recommend using this last proposal with mapPartitions rather than reduceByKey, as it handles a smaller amount of data. Another solution could be to use both functions: first mapPartitions as described above, and then, instead of distinct, reduceByKey applied in the same way as also described above; a sketch of that combination follows.
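A minimal sketch of that combined approach, assuming the same rddSplitted of (idUser, (idMovie, rating)) pairs (the variable names here are illustrative, not taken from the original code):

// Combined approach (sketch): deduplicate within each partition first, then use
// reduceByKey instead of distinct to collapse the remaining duplicates across partitions.
val perPartitionUsers = rddSplitted
  .map(x => x._1)                                         // keep only the user IDs
  .mapPartitions(ids => ids.toList.distinct.toIterator)   // per-partition deduplication, no shuffle
val numDistinctUsers = perPartitionUsers
  .map(id => (id, 1))
  .reduceByKey(_ + _)                                     // single shuffle over the already-reduced data
  .count()
println(s"There are $numDistinctUsers different users")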

answered Sep 28 '22 by Ricardo León