
how to interpret RDD.treeAggregate

I ran into this line in the Apache Spark source code:

val (gradientSum, lossSum, miniBatchSize) = data
    .sample(false, miniBatchFraction, 42 + i)
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }
    )

I have several problems reading this:

  • First, I can't find anything on the web that explains exactly how treeAggregate works or what its parameters mean.
  • Second, .treeAggregate seems to have two ()() following the method name. What could that mean? Is that some special Scala syntax that I don't understand?
  • Finally, I see that both seqOp and combOp return a 3-element tuple that matches the expected left-hand-side variables, but which one actually gets returned?

This statement must be really advanced. I can't begin to decipher this.

asked Apr 25 '15 03:04 by bhomass




1 Answer

treeAggregate is a specialized implementation of aggregate that iteratively applies the combine function to subsets of partitions. This avoids returning every partial result to the driver for a single-pass reduce, which is what the classic aggregate does.

For all practical purposes, treeAggregate follows the same principle as aggregate, explained in this answer: "Explain the aggregate functionality in Python", except that it takes an extra parameter that sets the depth of the partial aggregation tree.

Let me try to explain what's going on here specifically:

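For aggregate, we need a zero value, a combiner function (seqOp) and a reduce function (combOp). The two sets of parentheses are ordinary Scala syntax for a method with multiple parameter lists (currying): the first list takes the zero value, the second takes the two functions. seqOp folds each element into a per-partition accumulator, combOp merges those accumulators, and the final merged accumulator is what the call returns.

We can then dissect the above call like this. Hopefully that helps with understanding: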
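As a rough illustration of the two parameter lists and of the roles of the two functions, here is a minimal sketch on plain Scala collections. It is not Spark code: the object AggregateSketch, its aggregate method and the nested Seq standing in for partitions are all hypothetical names made up for this example.

```scala
object AggregateSketch {
  // Hypothetical local stand-in for RDD.aggregate; each inner Seq plays the role of a partition.
  // Three parameter lists: the data, the zero value, and the two functions.
  def aggregate[T, U](partitions: Seq[Seq[T]])(zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // seqOp folds the elements of one partition into an accumulator of type U.
    val partials = partitions.map(p => p.foldLeft(zero)(seqOp))
    // combOp merges the per-partition accumulators; the merged value is the result.
    partials.foldLeft(zero)(combOp)
  }

  val partitions: Seq[Seq[Double]] = Seq(Seq(1.0, 2.0), Seq(3.0), Seq(4.0, 5.0))

  // The accumulator is a (sum, count) pair, so the zero value is (0.0, 0L).
  val (sum, count) = aggregate(partitions)((0.0, 0L))(
    seqOp = (acc, x) => (acc._1 + x, acc._2 + 1),
    combOp = (a, b) => (a._1 + b._1, a._2 + b._2)
  )
}
```

Because the zero value sits in its own parameter list, the compiler already knows the accumulator type when it checks the two lambdas, which is why they need no type annotations.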

val Zero: (BDV[Double], Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)
// (??, ??) is a placeholder for the element type of the RDD, a (label, features) pair
val combinerFunction: ((BDV[Double], Double, Long), (??, ??)) => (BDV[Double], Double, Long) = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      }
val reducerFunction: ((BDV[Double], Double, Long), (BDV[Double], Double, Long)) => (BDV[Double], Double, Long) = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }

Then we can rewrite the call to treeAggregate in a more digestible form:

val (gradientSum, lossSum, miniBatchSize) = data
    .sample(false, miniBatchFraction, 42 + i)
    .treeAggregate(Zero)(combinerFunction, reducerFunction)

This form 'extracts' the resulting tuple into the named values gradientSum, lossSum and miniBatchSize for further use.
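The extraction is plain Scala pattern matching on a tuple and has nothing to do with Spark. A small sketch, with the hypothetical method stats standing in for the treeAggregate call:

```scala
object TupleSketch {
  // Hypothetical method returning a 3-tuple, standing in for the treeAggregate call.
  def stats(xs: Seq[Double]): (Double, Double, Long) =
    (xs.sum, xs.max, xs.size.toLong)

  // The pattern on the left-hand side binds each tuple element to its own name.
  val (total, largest, n) = stats(Seq(1.0, 4.0, 2.5))
}
```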

Note that treeAggregate takes an additional parameter, depth, which is declared with a default value of 2. Since this particular call does not provide it, the default is used.
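To make the tree idea and the defaulted depth parameter more concrete, here is a simplified sketch on local collections. It is an assumption-laden approximation, not Spark's actual implementation: TreeAggregateSketch, the nested Seq used as partitions and the grouping strategy are all hypothetical, chosen only to show partial results being merged level by level.

```scala
object TreeAggregateSketch {
  // Hypothetical local stand-in; each inner Seq plays the role of a partition.
  // depth has a default value, so callers may leave it out.
  def treeAggregate[T, U](partitions: Seq[Seq[T]])(zero: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U,
      depth: Int = 2): U = {
    require(depth >= 1, "depth must be at least 1")
    // Level 0: fold every partition into one partial result.
    var partials: Seq[U] = partitions.map(_.foldLeft(zero)(seqOp))
    // Group size chosen so that about `depth` levels of merging are needed (never below 2).
    val scale = math.max(math.ceil(math.pow(partials.size.toDouble, 1.0 / depth)).toInt, 2)
    // Intermediate levels: merge partials in groups instead of sending all of them to one place.
    while (partials.size > scale) {
      partials = partials.grouped(scale).map(_.reduce(combOp)).toVector
    }
    // Final merge of the few remaining partials (the driver's job in Spark).
    partials.foldLeft(zero)(combOp)
  }

  // 16 simulated partitions holding (1, 1), (2, 2), ..., (16, 16).
  val partitions: Seq[Seq[Int]] = (1 to 16).map(i => Seq(i, i))

  // depth omitted: the default of 2 is used.
  val defaultDepth: Long = treeAggregate(partitions)(0L)((acc, x) => acc + x, _ + _)
  // depth passed explicitly as a named argument.
  val deeper: Long = treeAggregate(partitions)(0L)((acc, x) => acc + x, _ + _, depth = 4)
}
```

Both calls produce the same sum; only the number of intermediate merge levels differs. That is the point of depth: it changes how the work is spread, not the result, provided combOp is associative.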

answered Oct 15 '22 19:10 by maasg