 

Why is the fold action necessary in Spark?

I have a silly question involving fold and reduce in PySpark. I understand the difference between these two methods, but, if both require that the applied function be a commutative monoid, I cannot figure out an example in which fold cannot be substituted by reduce.

Besides, the PySpark implementation of fold uses acc = op(obj, acc). Why is this operation order used instead of acc = op(acc, obj)? (The second order sounds closer to a leftFold to me.)

Cheers

Tomas

asked Dec 30 '15 by Tomas F. Pena

People also ask

Why are foldLeft and foldRight not supported in Spark?

For Spark to become a leader in computational speed, it needed to incorporate operational parallelism. foldLeft depends on a strict left-to-right evaluation order, which cannot be preserved across partitions processed in parallel; parallelism is ultimately the reason foldLeft is not found on the RDD class.
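
To make the order-dependence concrete (a hedged sketch, not from the original page): foldLeft's accumulator may have a different type than the elements, and its operation assumes strict left-to-right evaluation, so per-partition results cannot be merged with the same operation. Spark's order-free counterpart is aggregate, which asks for a separate combiner:

val words = sc.parallelize(Seq("ab", "c", "def"), 2)
// seqOp folds elements within a partition; combOp merges per-partition results
val totalLength = words.aggregate(0)(
  (acc, s) => acc + s.length, // seqOp: Int accumulator, String element
  _ + _                       // combOp: merge partition totals
)
// totalLength: Int = 6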

What is the difference between reduce and fold in Spark?

fold calls fold on the iterator of each partition and then merges the results; reduce calls reduceLeft on the iterator of each partition and then merges the results. The difference is that fold doesn't need to worry about empty partitions or collections, because it can fall back on the zero value.
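
A rough sketch of the two evaluation strategies, modelling an RDD as a Seq of partitions (foldLike and reduceLike are illustrative names, not Spark's actual source):

def foldLike[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T =
  // every partition and the final merge are seeded with zero,
  // so empty partitions are harmless
  partitions.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)

def reduceLike[T](partitions: Seq[Seq[T]])(op: (T, T) => T): T = {
  // no seed: empty partitions must be skipped, and an empty RDD has no result
  val partial = partitions.filter(_.nonEmpty).map(_.reduceLeft(op))
  if (partial.isEmpty) throw new UnsupportedOperationException("empty collection")
  partial.reduceLeft(op)
}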

Why is reduce an action in Spark?

reduce is an action that aggregates the elements of a dataset using a function func (which takes two arguments and returns one); it can be applied to a single RDD.
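
For example (a minimal illustration in the Scala API, matching the answer below, assuming a running SparkContext sc):

val total = sc.parallelize(1 to 5).reduce(_ + _)
// total: Int = 15, computed by a job on the cluster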

What is meant by actions in Spark?

Actions are RDD operations that return a value to the Spark driver program, which kicks off a job to execute on the cluster. A transformation's output is the input of an action. reduce, collect, takeSample, take, first, saveAsTextFile, saveAsSequenceFile, countByKey, and foreach are common actions in Apache Spark.
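
A minimal sketch of the transformation/action split, again assuming a running SparkContext sc:

val doubled = sc.parallelize(1 to 3).map(_ * 2) // transformation: lazy, no job yet
doubled.collect()                               // action: triggers a job
// Array[Int] = Array(2, 4, 6)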


1 Answer

Empty RDD

reduce cannot be substituted for fold when the RDD is empty:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0

You can of course combine reduce with a condition on isEmpty, but it is rather ugly, as sketched below.
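
For illustration, the workaround might look like this (a sketch, not part of the original answer):

val sum = if (rdd.isEmpty) 0 else rdd.reduce(_ + _)
// sum: Int = 0 for the empty RDD above; fold(0)(_ + _) says the same in one call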

Mutable buffer

Another use case for fold is aggregation with a mutable buffer. Consider the following RDD:

import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

Let's say we want the sum of all elements. A naive solution is to simply reduce with +:

rdd.reduce(_ + _)

Unfortunately, it creates a new vector for each element. Since object creation and subsequent garbage collection are expensive, it can be better to use a mutable object. That is not possible with reduce (immutability of the RDD doesn't imply immutability of its elements), but it can be achieved with fold as follows:

rdd.fold(DenseVector(0))((acc, x) => acc += x)

The zero element is used here as a mutable buffer that is initialized once per partition, leaving the actual data untouched.
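
As a side note (an illustrative sketch, not part of the original answer): because the zero value is applied once per partition and once more when merging partition results, it must be neutral for the operation:

val nums = sc.parallelize(1 to 4, 2) // actual sum is 10
nums.fold(0)(_ + _) // Int = 10
nums.fold(1)(_ + _) // Int = 13: 10, plus 1 for each of the 2 partitions, plus 1 in the final merge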

As for why acc = op(obj, acc) is used instead of acc = op(acc, obj):

See SPARK-6416 and SPARK-7683.

answered Nov 15 '22 by zero323