 

What are the Spark transformations that cause a shuffle?

I am having trouble finding, in the Spark documentation, which operations cause a shuffle and which do not. Of the following list, which ones cause a shuffle and which ones do not?

map and filter do not. However, I am not sure about the others.

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
asked Oct 09 '14 by poiuytrez

People also ask

What is shuffling in Spark?

Shuffling is the process of exchanging data between partitions. As a result, data rows can move between worker nodes when their source partition and the target partition reside on different machines. Spark doesn't move data between nodes randomly.
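A quick way to see this in the shell is to inspect partition contents with glom() before and after a key-based transformation (a minimal sketch; the exact partition layout depends on the partitioner):

    scala> val rdd = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)), 2)
    scala> rdd.glom().collect()               // rows spread across the 2 input partitions
    scala> rdd.groupByKey().glom().collect()  // rows with the same key are now co-located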

Does distinct cause a shuffle?

Here, distinct creates a shuffle. It is important to be able to find this out empirically rather than from the docs, because for some functions whether a shuffle is required depends on the situation.

Does coalesce cause a shuffle?

By default, coalesce does not involve a shuffle; it only merges existing partitions.
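You can check this in the shell. Note that coalesce(numPartitions, shuffle = true), which is what repartition calls under the hood, does shuffle (a sketch; the RDD names in the output vary by Spark version):

    scala> val rdd = sc.parallelize(1 to 100, 8)
    scala> rdd.coalesce(2).toDebugString                  // no ShuffledRDD: narrow dependency
    scala> rdd.coalesce(2, shuffle = true).toDebugString  // contains a ShuffledRDD
    scala> rdd.repartition(2).toDebugString               // same as coalesce(2, shuffle = true)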

How can you avoid shuffles in Spark?

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor.
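A minimal sketch of that map-side join pattern with the RDD API, assuming the small dataset comfortably fits in memory (the variable names here are illustrative):

    import org.apache.spark.rdd.RDD

    val small: RDD[(Int, String)] = sc.parallelize(Seq((1, "a"), (2, "b")))
    val large: RDD[(Int, Double)] = sc.parallelize(Seq((1, 1.0), (2, 2.0), (3, 3.0)))

    // Collect the small side to the driver and broadcast it to every executor
    val smallMap = sc.broadcast(small.collectAsMap())

    // Join map-side: `large` is never shuffled, each partition does local lookups
    val joined = large.mapPartitions { iter =>
      iter.flatMap { case (k, v) => smallMap.value.get(k).map(s => (k, (v, s))) }
    }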


1 Answer

It is actually extremely easy to find this out, without the documentation. For any of these functions, just create an RDD and call toDebugString. Here is one example; you can do the rest on your own.

    scala> val a = sc.parallelize(Array(1, 2, 3)).distinct
    scala> a.toDebugString
    MappedRDD[5] at distinct at <console>:12 (1 partitions)
      MapPartitionsRDD[4] at distinct at <console>:12 (1 partitions)
        ShuffledRDD[3] at distinct at <console>:12 (1 partitions)
          MapPartitionsRDD[2] at distinct at <console>:12 (1 partitions)
            MappedRDD[1] at distinct at <console>:12 (1 partitions)
              ParallelCollectionRDD[0] at parallelize at <console>:12 (1 partitions)

So, as you can see from the ShuffledRDD in the lineage, distinct creates a shuffle. It is also particularly important to find this out this way rather than from the docs, because for some functions whether a shuffle is required depends on the situation. For example, join usually requires a shuffle, but if you join two RDDs that branch from the same RDD, Spark can sometimes elide the shuffle.
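For contrast, a narrow transformation like map shows no ShuffledRDD, and two RDDs that already share a partitioner can be joined without a new shuffle (a sketch; output details vary by Spark version):

    scala> sc.parallelize(Array(1, 2, 3)).map(_ + 1).toDebugString  // no ShuffledRDD anywhere
    scala> import org.apache.spark.HashPartitioner
    scala> val p = new HashPartitioner(4)
    scala> val x = sc.parallelize(Seq((1, "x"))).partitionBy(p)
    scala> val y = sc.parallelize(Seq((1, "y"))).partitionBy(p)
    scala> x.join(y).toDebugString  // the partitionBy calls shuffle, but the join adds no new ShuffledRDD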

answered Sep 22 '22 by aaronman