When I use reduceByKey or aggregateByKey, I'm confronted with partition problems.
e.g. reduceByKey(_+_).map(code)
In particular, if the input data is skewed, the partitioning problem becomes even worse when using the above methods.
So, as a solution to this, I use the repartition method.
For example, http://dev.sortable.com/spark-repartition/ is similar.
This is good for partition distribution, but repartition is also expensive.
Is there a way to solve the partition problem wisely?
Spark reduceByKey function
In Spark, the reduceByKey function is a frequently used transformation operation that performs aggregation of data. It receives key-value pairs (K, V) as an input, aggregates the values based on the key and generates a dataset of (K, V) pairs as an output.
In a typical example, a resilient distributed dataset (RDD) is created from a local list with parallelize, and reduceByKey() is then applied to aggregate its values by key.
Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle operation. The key difference between them is that reduceByKey does a map-side combine and groupByKey does not.
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
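To make the map-side combine described above concrete, here is a minimal sketch in plain Scala, without Spark. Ordinary collections stand in for RDD partitions, and the object and method names (MapSideCombineDemo, shuffledWithCombine and so on) are made up for illustration; they are not Spark APIs. It only counts how many records would have to cross the shuffle in each style.

```scala
object MapSideCombineDemo {
  type Pair = (String, Int)

  // groupByKey-style: every record crosses the shuffle unchanged.
  def shuffledWithoutCombine(partitions: Seq[Seq[Pair]]): Seq[Pair] =
    partitions.flatten

  // reduceByKey-style: each partition first merges its own values per key,
  // so at most one record per key per partition crosses the shuffle.
  def shuffledWithCombine(partitions: Seq[Seq[Pair]], f: (Int, Int) => Int): Seq[Pair] =
    partitions.flatMap { part =>
      part.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(f)) }
    }

  // Reducer side: merge whatever arrived for each key.
  def reduceSide(records: Seq[Pair], f: (Int, Int) => Int): Map[String, Int] =
    records.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(f)) }

  def main(args: Array[String]): Unit = {
    // Two hypothetical input partitions.
    val partitions = Seq(
      Seq(("a", 1), ("a", 2), ("b", 3)),
      Seq(("a", 4), ("b", 5), ("b", 6))
    )
    val plain = shuffledWithoutCombine(partitions)
    val combined = shuffledWithCombine(partitions, _ + _)
    println(s"records shuffled without combine: ${plain.size}")
    println(s"records shuffled with combine: ${combined.size}")
    println(s"final result: ${reduceSide(combined, _ + _)}")
  }
}
```

Both paths give the same totals per key; the combined path simply moves fewer records, which is why the function has to be associative and commutative.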
You are right, repartition is really expensive to run, because of the shuffle and other minor steps. Take an example like the one you described:
rdd.map(x => (x, x * x)).repartition(8).reduceByKey(_+_)
The DAG for this job contains one map, one repartition and one reduce.
But if you do the repartitioning inside reduceByKey, you get the repartition for "free".
The main part of repartition is the shuffle, and the main part of reduceByKey is the shuffle too. You can see in the Scala API that reduceByKey has a numPartitions parameter.
So you can change your code to this:
rdd.map(x => (x, x * x)).reduceByKey(_+_, 8)
This is the same job with the repartitioning done inside reduceByKey, and it is much faster because there is one less shuffle to do.
You have to distinguish between two different problems:
Data skew
If data distribution is highly skewed (let's assume the worst case scenario with only a single unique key) then by definition the output will be skewed and changing a partitioner cannot help you.
There are some techniques, which can be used to partially address the problem, but overall partitioning is not a core issue here.
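The answer does not name these techniques. One that is commonly suggested for skewed aggregations is key salting, so take the following as my assumption about what is meant rather than the answerer's method. It is a rough sketch in plain Scala with ordinary collections instead of RDDs; SaltedAggregationDemo, stageOne, stageTwo and saltBuckets are hypothetical names. The salt is derived from the record index to keep the demo deterministic, whereas a real job would more likely use a random salt.

```scala
object SaltedAggregationDemo {
  // Stage 1: spread each key over `saltBuckets` sub-keys and pre-aggregate them.
  def stageOne(records: Seq[(String, Int)], saltBuckets: Int): Map[(String, Int), Int] =
    records.zipWithIndex
      .map { case ((k, v), i) => ((k, i % saltBuckets), v) } // index-based salt, demo only
      .groupBy(_._1)
      .map { case (saltedKey, vs) => (saltedKey, vs.map(_._2).sum) }

  // Stage 2: drop the salt and merge the partial sums per original key.
  def stageTwo(partials: Map[(String, Int), Int]): Map[String, Int] =
    partials.toSeq
      .map { case ((k, _), partial) => (k, partial) }
      .groupBy(_._1)
      .map { case (k, ps) => (k, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical skewed input: one hot key and one rare key.
    val records = Seq.fill(1000)(("hot", 1)) ++ Seq(("cold", 1), ("cold", 1))
    val partials = stageOne(records, saltBuckets = 4)
    println(s"salted groups: ${partials.size}")
    println(s"largest partial sum: ${partials.values.max}")
    println(s"final totals: ${stageTwo(partials)}")
  }
}
```

This only spreads the intermediate work. The final result still has one entry per original key, which is the point made above: if the output itself is skewed, no partitioner changes that.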
Partitioner bias
A poorly chosen partitioning function can result in a skewed data distribution even if the data is uniformly distributed. For example:
val rdd = sc.parallelize(Seq((5, None), (10, None), (15, None), (20, None)), 5)
rdd
.partitionBy(new org.apache.spark.HashPartitioner(5))
.glom.map(_.size).collect
Array[Int] = Array(4, 0, 0, 0, 0)
As you can see, although the key distribution is not skewed, skew has been induced by regularities in the data and the poor properties of hashCode.
In a case like this, choosing a different Partitioner:
rdd
.partitionBy(new org.apache.spark.RangePartitioner(5, rdd))
.glom.map(_.size).collect
Array[Int] = Array(1, 1, 1, 1, 0)
or adjusting properties of the existing one:
rdd
.partitionBy(new org.apache.spark.HashPartitioner(7))
.glom.map(_.size).collect
Array[Int] = Array(0, 1, 0, 1, 0, 1, 1)
can resolve the issue.
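The two HashPartitioner results above can be reproduced by hand. HashPartitioner assigns a key to the non-negative value of key.hashCode modulo the number of partitions, and an Int hashes to itself, so multiples of 5 all land in partition 0 when there are 5 partitions. The sketch below is plain Scala without Spark; HashPartitionDemo, partitionOf and partitionSizes are names made up for illustration.

```scala
object HashPartitionDemo {
  // Same idea as HashPartitioner: non-negative (hashCode mod numPartitions).
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of keys landing in each partition index.
  def partitionSizes(keys: Seq[Any], numPartitions: Int): Seq[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(5, 10, 15, 20)
    println(partitionSizes(keys, 5).mkString(", ")) // prints 4, 0, 0, 0, 0
    println(partitionSizes(keys, 7).mkString(", ")) // prints 0, 1, 0, 1, 0, 1, 1
  }
}
```

With 7 partitions the keys map to 5, 3, 1 and 6, so a partition count that shares no common factor with the regularity in the keys breaks the pattern.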