I have a silly question involving `fold` and `reduce` in PySpark. I understand the difference between these two methods, but if both require that the applied function be a commutative monoid, I cannot figure out an example in which `fold` cannot be substituted by `reduce`.
Besides, in the PySpark implementation of `fold` it uses `acc = op(obj, acc)`. Why is this operation order used instead of `acc = op(acc, obj)`? (This second order sounds closer to a `foldLeft` to me.)
Cheers
Tomas
In order for Spark to become a leader in computational speed, it needed to incorporate operational parallelism. Parallelism is ultimately the reason `foldLeft` is not found on the RDD class: `foldLeft` processes elements strictly from left to right, one at a time, which cannot be split across partitions.
`fold` calls `fold` on the iterator of each partition and then merges the results; `reduce` calls `reduceLeft` on the iterator of each partition and then merges the results. The difference is that `fold` doesn't need to worry about empty partitions or collections, because it can fall back on the zero value.
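To make the partition-level semantics concrete, here is a minimal local sketch in plain Python (not Spark's actual implementation; the `simulated_fold` / `simulated_reduce` helpers and the list-of-lists partition layout are illustrative assumptions):

from functools import reduce

# Simulate fold: fold each partition starting from the zero value,
# then fold the per-partition results, again starting from zero.
def simulated_fold(partitions, zero, op):
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)

# Simulate reduce: reduceLeft each non-empty partition, then merge;
# with no non-empty partition there is no first element to start from.
def simulated_reduce(partitions, op):
    per_partition = [reduce(op, part) for part in partitions if part]
    if not per_partition:
        raise ValueError("empty collection")
    return reduce(op, per_partition)

print(simulated_fold([[1, 2], [3, 4], []], 0, lambda a, b: a + b))   # 10
print(simulated_reduce([[1, 2], [3, 4], []], lambda a, b: a + b))    # 10

Note that the zero value is applied once per partition and once more in the merge step, which is why it must be a neutral element for the operation.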
`reduce` is an action that aggregates the elements of the dataset using a function `func` (which takes two arguments and returns one); it can also be used on a single RDD (see the Spark documentation for more info).
Actions are RDD operations that return a value back to the Spark driver program, kicking off a job to execute on the cluster. A transformation's output is the input of actions. `reduce`, `collect`, `takeSample`, `take`, `first`, `saveAsTextFile`, `saveAsSequenceFile`, `countByKey`, and `foreach` are common actions in Apache Spark.
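For example, a minimal PySpark session (assuming a local SparkContext):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1, 2, 3, 4, 5])

# reduce is an action: it kicks off a job and returns a plain value
# to the driver instead of producing another RDD.
print(rdd.reduce(lambda a, b: a + b))  # 15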
Empty RDD
`fold` cannot be substituted by `reduce` when the RDD is empty:
val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...
rdd.fold(0)(_ + _)
// Int = 0
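The same contrast can be reproduced in PySpark (a sketch; here `reduce` fails with a Python-side ValueError rather than the JVM exception above):

rdd = sc.parallelize([])

# rdd.reduce(lambda a, b: a + b)
# ValueError: Can not reduce() empty RDD

rdd.fold(0, lambda a, b: a + b)
# 0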
You can of course combine `reduce` with a check on `isEmpty`, but it is rather ugly.
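For completeness, that workaround would look something like this (illustrative sketch):

# Works, but the zero value has to be bolted on from the outside:
total = 0 if rdd.isEmpty() else rdd.reduce(lambda a, b: a + b)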
Mutable buffer
Another use case for `fold` is aggregation with a mutable buffer. Consider the following RDD:
import breeze.linalg.DenseVector
val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)
Let's say we want the sum of all elements. A naive solution is to simply reduce with `+`:
rdd.reduce(_ + _)
Unfortunately, it creates a new vector for each element. Since object creation and subsequent garbage collection are expensive, it may be better to use a mutable object. That is not possible with `reduce` (immutability of an RDD doesn't imply immutability of its elements), but it can be achieved with `fold` as follows:
rdd.fold(DenseVector(0))((acc, x) => acc += x)
The zero element is used here as a mutable buffer, initialized once per partition, leaving the actual data untouched.
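A rough PySpark analogue of the same trick, assuming NumPy arrays as the mutable buffer and a Spark version where the accumulator is passed as the first argument (see the argument-order discussion below):

import numpy as np

rdd = sc.parallelize([np.array([1]) for _ in range(100)], 8)

def add_inplace(acc, x):
    # Accumulate into the buffer in place instead of allocating a new
    # array per element; each task works on its own copy of the zero value.
    acc += x
    return acc

rdd.fold(np.zeros(1), add_inplace)
# array([100.])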
As for why `acc = op(obj, acc)` is used instead of `acc = op(acc, obj)`: see SPARK-6416 and SPARK-7683.
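For reference, the per-partition loop the question quotes looks roughly like this (paraphrased from PySpark's rdd.py; the exact argument order is what those tickets discuss, and it has changed across versions):

# Inside RDD.fold, applied to each partition's iterator:
def func(iterator):
    acc = zeroValue
    for obj in iterator:
        acc = op(obj, acc)  # element first, accumulator second
    yield acc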