Is there an effective partitioning method when using reduceByKey in Spark?

When I use reduceByKey or aggregateByKey, I run into partitioning problems.

For example: reduceByKey(_+_).map(code)

In particular, if the input data is skewed, the partitioning problem gets even worse with these methods.

So, as a workaround, I use the repartition method.

For example, http://dev.sortable.com/spark-repartition/ describes a similar approach.

This gives a good partition distribution, but repartition is itself expensive.
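For reference, this is roughly what I do now (a simplified spark-shell sketch; the skewed toy data and the partition count 8 are just placeholders for my real job):

// ~90% of the records share key 0, so the input is skewed
val rdd = sc.parallelize(1 to 1000000).map(x => (x % 10 / 9, 1L))

// current workaround: repartition first, then reduce
val result = rdd
  .repartition(8)        // extra shuffle just to spread the data evenly
  .reduceByKey(_ + _)    // second shuffle for the aggregation itself
  .map { case (k, v) => s"$k:$v" }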

Is there a smarter way to solve the partitioning problem?

asked Mar 26 '17 by S.Kang


People also ask

How does reduceByKey work in Spark?

In Spark, the reduceByKey function is a frequently used transformation that aggregates data. It takes key-value pairs (K, V) as input, aggregates the values by key, and produces a dataset of (K, V) pairs as output.
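For example, a minimal spark-shell sketch (not taken from the question):

// sums the values for each key
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
pairs.reduceByKey(_ + _).collect()   // Array((a,4), (b,6)) (order may vary)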

Can we use reduceByKey in Spark Dataframe?

A resilient distributed dataset (RDD) is created from a list by parallelizing it with Spark; the reduceByKey() function is then applied to aggregate it, and the output is displayed.

What is the difference between reduceByKey and groupByKey?

Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle operation. The key difference is that reduceByKey does a map-side combine and groupByKey does not.
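A small sketch of the two variants over the same data (both give the same sums, but groupByKey ships every raw value across the network before summing):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// map-side combine: partial sums are computed before the shuffle
pairs.reduceByKey(_ + _).collect()             // Array((a,3), (b,3))

// no map-side combine: all raw values are shuffled, then summed
pairs.groupByKey().mapValues(_.sum).collect()  // Array((a,3), (b,3))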

How does reduceByKey work in Pyspark?

Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.


2 Answers

You are right:

Repartition is really expensive to run, because of the shuffle and the other steps it involves. Building an example like the one in your question:

rdd.map(x => (x, x * x)).repartition(8).reduceByKey(_+_)

See the DAG here:

[Screenshot: DAG with map, repartition, and reduceByKey stages]

This creates a DAG with one map, one repartition, and one reduce.
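You can also see the extra shuffle without opening the UI, for example by printing the RDD lineage with toDebugString (a quick spark-shell check, not part of the original answer; the toy rdd below stands in for your real data):

val rdd = sc.parallelize(1 to 100)

// the lineage should list two ShuffledRDDs: one for repartition, one for reduceByKey
println(rdd.map(x => (x, x * x)).repartition(8).reduceByKey(_ + _).toDebugString)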

But if you do the repartitioning inside the reduceByKey, you get the repartition for "free".

The main cost of repartition is the shuffle, and the main cost of reduceByKey is the shuffle too. That is why, in the Scala API, reduceByKey has a numPartitions parameter.

So you can change your code to this:

rdd.map(x => (x, x * x)).reduceByKey(_+_, 8)

[Screenshot: DAG with only the map and reduceByKey stages]

And you can see that the same code, with the repartitioning folded into reduceByKey, is much faster, because you have one less shuffle to do.
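The same lineage check on the combined version shows only one shuffle, and reduceByKey also accepts an explicit Partitioner if you need more control than just a partition count (again a sketch, reusing the toy rdd from above):

// only one ShuffledRDD in the lineage: reduceByKey handles the partitioning itself
println(rdd.map(x => (x, x * x)).reduceByKey(_ + _, 8).toDebugString)

// equivalent, with the partitioner spelled out explicitly
rdd.map(x => (x, x * x)).reduceByKey(new org.apache.spark.HashPartitioner(8), _ + _)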


answered Nov 15 '22 by Thiago Baldim


You have to distinguish between two different problems:

Data skew

If the data distribution is highly skewed (let's assume the worst-case scenario with only a single unique key), then by definition the output will be skewed and changing the partitioner cannot help you.

There are some techniques that can be used to partially address the problem (one common approach is sketched below), but overall partitioning is not the core issue here.
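For completeness, one technique that is often suggested for skewed aggregations is a two-phase ("salted") reduce; this is only a sketch, not part of the original answer, and skewedRdd and the bucket count are placeholders:

import scala.util.Random

// phase 1: add a random salt to the key so a hot key is spread over several reducers
val saltBuckets = 10
val partial = skewedRdd                                      // assumed RDD[(String, Long)]
  .map { case (k, v) => ((k, Random.nextInt(saltBuckets)), v) }
  .reduceByKey(_ + _)

// phase 2: drop the salt and reduce the (at most saltBuckets) partial sums per key
val totals = partial
  .map { case ((k, _), v) => (k, v) }
  .reduceByKey(_ + _)

This only works because the reduce function is associative and commutative, which reduceByKey requires anyway.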

Partitioner bias

A poorly chosen partitioning function can result in a skewed data distribution even if the keys themselves are uniformly distributed. For example:

val rdd = sc.parallelize(Seq((5, None), (10, None), (15, None), (20, None)), 5)
rdd
  .partitionBy(new org.apache.spark.HashPartitioner(5))
  .glom.map(_.size).collect
Array[Int] = Array(4, 0, 0, 0, 0)

As you can see, even though the key distribution is not skewed, skew has been induced by regularities in the data and the poor properties of hashCode.
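You can see the mechanism directly: an Int's hashCode is the value itself, and every key in the example is a multiple of 5, so HashPartitioner(5) sends them all to partition 0 (a quick illustration, not part of the original answer):

// HashPartitioner assigns a key to partition hashCode % numPartitions (made non-negative)
Seq(5, 10, 15, 20).map(k => k.hashCode % 5)   // List(0, 0, 0, 0)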

In a case like this, choosing a different Partitioner:

rdd
  .partitionBy(new org.apache.spark.RangePartitioner(5, rdd))
  .glom.map(_.size).collect
Array[Int] = Array(1, 1, 1, 1, 0)

or adjusting properties of the existing one:

rdd
  .partitionBy(new org.apache.spark.HashPartitioner(7))
  .glom.map(_.size).collect
Array[Int] = Array(0, 1, 0, 1, 0, 1, 1)

can resolve the issue.

answered Nov 15 '22 by zero323