In Spark's documentation, it says that the RDD method reduce requires an associative AND commutative binary function. However, the method reduceByKey ONLY requires an associative binary function.
I did some tests, and apparently that is the behavior I get. Why this difference? Why does reduceByKey ensure the binary function is always applied in a certain order (to accommodate the lack of commutativity) when reduce does not?
For example, if I load some (small) text with 4 partitions (minimum):
val r = sc.textFile("file4kB", 4)
then:
r.reduce(_ + _)
returns a string where parts are not always in the same order, whereas:
r.map(x => (1,x)).reduceByKey(_ + _).first
always returns the same string (where everything is in the same order as in the original file).
(I checked with r.glom and the file content is indeed spread over 4 partitions; there is no empty partition.)
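For reference, the partition layout can be inspected with something like this (a minimal sketch reusing the r defined above; the printed counts are only illustrative):

// glom() turns each partition into an Array, so counting the array lengths
// shows how many lines landed in each of the 4 partitions.
r.glom().map(_.length).collect()   // e.g. Array(3, 4, 2, 5) - no empty partition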
Basically, reduce must pull the entire dataset down to a single location because it is reducing to one final value. reduceByKey, on the other hand, produces one value for each key. And since this operation can first be run locally on each partition, the result can remain an RDD and have further transformations applied to it.
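As a minimal sketch of that distinction (reusing r from the question; note that reduce is an action returning a plain value to the driver, while reduceByKey is a transformation returning a new RDD):

// reduce is an action: it merges ALL elements into one value on the driver.
val total: String = r.reduce(_ + _)

// reduceByKey is a transformation: one value per key, and the result stays
// distributed, so further transformations can be chained onto it.
val perKey = r.map(x => (1, x)).reduceByKey(_ + _)
perKey.mapValues(_.length).collect()   // collect() is the action that finally runs the job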
groupByKey can cause out-of-disk problems, as all the data is sent over the network and collected on the reducer workers. You can see the example below. With reduceByKey, by contrast, data is combined at each partition, so only one output per key per partition is sent over the network.
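As a hedged illustration of that difference (a word-count-style sketch; the input data here is made up):

// Hypothetical word-count input, spread over 3 partitions.
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)
val pairs = words.map(w => (w, 1))

// groupByKey: every single (word, 1) pair is shuffled over the network,
// and all values for a key are collected on one reducer before summing.
val countsGrouped = pairs.groupByKey().mapValues(_.sum)

// reduceByKey: counts are pre-aggregated inside each partition (map-side
// combine), so at most one record per key per partition crosses the network.
val countsReduced = pairs.reduceByKey(_ + _)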
In Spark, the reduceByKey function is a frequently used transformation that performs aggregation of data. It receives key-value pairs (K, V) as input, aggregates the values by key, and generates a dataset of (K, V) pairs as output.
Some initial ETL operations may be required to get your data into (key, value) form, but with pair RDDs you can perform any desired aggregation over a set of values. Spark supports several powerful reduction transformations and actions; among the most important reduction transformations is reduceByKey().
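For instance, a small ETL sketch along those lines (the input format and field names are assumptions, not from the original post):

// Assumed input format: "city,temperature" lines.
val lines = sc.parallelize(Seq("Paris,21.5", "Oslo,12.0", "Paris,19.0"))

// ETL step: parse each line into a (key, value) pair.
val cityTemps = lines.map { line =>
  val Array(city, temp) = line.split(",")
  (city, temp.toDouble)
}

// Aggregate per key - here, the maximum temperature seen for each city.
val maxPerCity = cityTemps.reduceByKey((a, b) => math.max(a, b))
maxPerCity.collect()   // e.g. Array((Paris,21.5), (Oslo,12.0))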
As far as I am concerned, this is an error in the documentation, and the results you see are simply incidental. Practice, other resources, and a simple analysis of the code show that the function passed to reduceByKey should be not only associative but commutative as well.
practice - while it looks like the order is preserved in local mode, this is no longer true when you run Spark on a cluster, including standalone mode.
other resources - to quote Data Exploration Using Spark from AmpCamp 3:
There is a convenient method called reduceByKey in Spark for exactly this pattern. Note that the second argument to reduceByKey determines the number of reducers to use. By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side.
code - reduceByKey is implemented using combineByKeyWithClassTag and creates a ShuffledRDD. Since Spark doesn't guarantee the order after shuffling, the only way to restore it would be to attach some metadata to the partially reduced records. As far as I can tell, nothing like this takes place.
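One way to check this yourself is a sketch like the following (string concatenation is associative but not commutative; the instability typically only shows up on a real cluster):

// All values share one key and are spread over several partitions.
val data = sc.parallelize(Seq("a", "b", "c", "d", "e", "f"), 3).map(s => (1, s))

// In local mode the result usually looks stable ("abcdef"), but Spark gives
// no ordering guarantee after the shuffle, so on a cluster the order in which
// the partial per-partition results are concatenated can vary between runs.
data.reduceByKey(_ + _).first()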
On a side note, reduce as it is implemented in PySpark will work just fine with a function which is only commutative. That is of course just an implementation detail and not a part of the contract.