
Why are aggregate and fold two different APIs in Spark?

When using the Scala standard library, I can do something like this:

scala> val scalaList = List(1,2,3)
scalaList: List[Int] = List(1, 2, 3)

scala> scalaList.foldLeft(0)((acc,n)=>acc+n)
res0: Int = 6

Making one Int out of many Ints.

And I can do something like this:

scala> scalaList.foldLeft("")((acc,n)=>acc+n.toString)
res1: String = 123

Making one String out of many Ints.

So foldLeft can be either homogeneous or heterogeneous, whichever we want, within a single API.

While in Spark, if I want one Int out of many Ints, I can do this:

scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12

scala> rdd.fold(0)((acc,n)=>acc+n)
res1: Int = 6

The fold API is similar to foldLeft, but it is only homogeneous: an RDD[Int] can only produce an Int with fold.

There is an aggregate API in Spark too:

scala> rdd.aggregate("")((acc,n)=>acc+n.toString, (s1,s2)=>s1+s2)
res11: String = 132

It is heterogeneous: an RDD[Int] can now produce a String.

So, why are fold and aggregate implemented as two different APIs in Spark?

Why are they not designed like foldLeft, which can be both homogeneous and heterogeneous?

(I am very new to Spark, please excuse me if this is a silly question.)

Cui Pengfei 崔鹏飞 asked Oct 29 '14 15:10


2 Answers

fold can be implemented more efficiently because it doesn't depend on a fixed order of evaluation. Each cluster node can fold its own chunk in parallel, followed by one small overall fold at the end. With foldLeft, by contrast, each element has to be folded in order, so nothing can be done in parallel.
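
To make the "fold each chunk, then one small fold at the end" idea concrete, here is a minimal sketch in plain Scala, with no Spark involved. The helper `partitionedFold` and the object name are hypothetical, and splitting the list with `grouped` only stands in for how a cluster might partition the data; this is not Spark's actual implementation.

```scala
// Plain-Scala sketch of a partitioned fold; no Spark required.
// `partitionedFold` is a hypothetical helper, not a Spark API.
object PartitionedFoldSketch {
  // Fold each chunk independently, then fold the per-chunk results.
  def partitionedFold[T](partitions: Seq[Seq[T]])(zero: T)(op: (T, T) => T): T = {
    // Each of these folds is independent, so a cluster could run them in parallel.
    val perPartition = partitions.map(_.foldLeft(zero)(op))
    // One small final fold over the per-partition results.
    perPartition.foldLeft(zero)(op)
  }

  val data: Seq[Int] = Seq(1, 2, 3, 4, 5, 6)

  // Three chunks of two elements stand in for cluster partitions.
  val partitions: Seq[Seq[Int]] = data.grouped(2).toList

  val total: Int = partitionedFold(partitions)(0)(_ + _)
}
```

Because the zero value is applied once per chunk and again in the final merge, this only matches a sequential fold when the zero is neutral for the operation (0 for addition here) and the operation does not care how the elements are grouped.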

(It's also nice to have a simpler API for the common case. The standard library has reduce as well as foldLeft for this reason.)
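
As a small illustration of that standard-library parallel, the sketch below puts reduce next to foldLeft on the list from the question. The object and value names are made up for the example.

```scala
// Standard-library only; names are illustrative.
object ReduceVsFoldLeft {
  val xs: List[Int] = List(1, 2, 3)

  // reduce: no zero value, and the result stays within the element type.
  val viaReduce: Int = xs.reduce(_ + _)

  // foldLeft: explicit zero value, same result for the homogeneous case.
  val viaFoldLeft: Int = xs.foldLeft(0)(_ + _)

  // foldLeft can also change the result type, which reduce cannot do here.
  val asString: String = xs.foldLeft("")((acc, n) => acc + n.toString)
}
```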

lmm answered Oct 19 '22 01:10


In Spark specifically, the computation is distributed and done in parallel, so foldLeft can't be implemented as it is in the standard library. Instead, aggregate requires two functions: one that performs an operation similar to fold on each element of type T, producing a value of type U, and another that combines the U values from each partition into the final value:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
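
The two-function shape can be imitated with plain Scala collections. The sketch below is only an approximation under stated assumptions: `aggregateSketch` is a hypothetical helper, partitions are passed in explicitly as nested sequences, and the order of the outer sequence stands in for the order in which partition results happen to be combined. It is not how Spark implements aggregate.

```scala
// Plain-Scala imitation of the seqOp/combOp split; no Spark required.
// `aggregateSketch` is a hypothetical helper, not a Spark API.
object AggregateSketch {
  def aggregateSketch[T, U](partitions: Seq[Seq[T]])(zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // seqOp folds the T elements of each partition into a U.
    val partials = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // combOp merges the per-partition U values into the final result.
    partials.foldLeft(zeroValue)(combOp)
  }

  // One element per partition, combined in the original order.
  val inOrder: String =
    aggregateSketch(Seq(Seq(1), Seq(2), Seq(3)))("")(
      (acc, n) => acc + n.toString,
      (s1, s2) => s1 + s2)

  // Same data, but the partition results are combined in a different order.
  val reordered: String =
    aggregateSketch(Seq(Seq(1), Seq(3), Seq(2)))("")(
      (acc, n) => acc + n.toString,
      (s1, s2) => s1 + s2)
}
```

Here `inOrder` is "123" and `reordered` is "132". String concatenation is not commutative, so the result depends on the order in which partial results are combined, which is one plausible explanation for the "132" in the question's aggregate output.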

Dan Osipov answered Oct 18 '22 23:10