
How do `map` and `reduce` methods work in Spark RDDs?

The following code is from the Apache Spark quick start guide. Can somebody explain what the "line" variable is and where it comes from?

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

Also, how do values get passed into a and b?

Link to the quick start guide: http://spark.apache.org/docs/latest/quick-start.html

DesirePRG asked Sep 11 '15 08:09

2 Answers

First, according to your link, textFile is created as

val textFile = sc.textFile("README.md")

so textFile is an RDD[String], that is, a resilient distributed dataset whose elements are Strings. Its API is very similar to that of regular Scala collections.

So now what does this map do?

Imagine you have a list of Strings and want to convert that into a list of Ints, representing the length of each String.

val stringlist: List[String] = List("ab", "cde", "f")
val intlist: List[Int] = stringlist.map( x => x.length )

The map method expects a function, here one that goes from String => Int. That function is applied to each element of the list, so the value of intlist is List( 2, 3, 1 )

Here, we have created an anonymous function from String => Int, namely x => x.length. One can write the function even more explicitly as

stringlist.map( (x: String) => x.length )  

If you want to write the above even more explicitly, you can name the function first:

val stringLength : (String => Int) = {
  x => x.length
}
val intlist = stringlist.map( stringLength )

So here it is evident that stringLength is a function from String to Int.

Remark: In general, map is what makes a type a so-called Functor. Given a function from A => B, the functor's map (here List's) lets you use that function to go from List[A] => List[B] as well. This is called lifting.
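To make the lifting remark concrete, here is a minimal sketch using plain Scala collections. The helper name lift and the object name are made up for illustration and are not part of Scala or Spark.

```scala
object LiftingSketch {
  // Hypothetical helper: turns a function A => B into a function List[A] => List[B].
  def lift[A, B](f: A => B): List[A] => List[B] =
    (xs: List[A]) => xs.map(f)

  // The same String => Int function as in the answer above.
  val stringLength: String => Int = x => x.length

  // The lifted function works on whole lists instead of single strings.
  val lengths: List[String] => List[Int] = lift(stringLength)

  def main(args: Array[String]): Unit =
    println(lengths(List("ab", "cde", "f"))) // prints List(2, 3, 1)
}
```

The only thing lift does is hand f to map, which is the point of the remark: map is the lifting operation.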

Answers to your questions

What is the "line" variable?

As mentioned above, line is the input parameter of the function line => line.split(" ").size

More explicitly: (line: String) => line.split(" ").size

Example: If line is "hello world", the function returns 2.

"hello world" 
=> Array("hello", "world")  // split 
=> 2                        // size of Array
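One way to see where line gets its value is to write a simplified map by hand. The sketch below is not how Spark or the Scala library implements map; myMap is a hypothetical stand-in that works on a plain List, and it only shows that the caller of your function supplies the argument.

```scala
import scala.collection.mutable.ListBuffer

object WhereLineComesFrom {
  // Hypothetical, simplified stand-in for map: calls f once per element.
  def myMap(lines: List[String], f: String => Int): List[Int] = {
    val out = ListBuffer.empty[Int]
    for (element <- lines) {
      out += f(element) // `element` is what the lambda sees as `line`
    }
    out.toList
  }

  // `line` is just the parameter name chosen for the anonymous function.
  val wordCounts: List[Int] =
    myMap(List("hello world", "goodbye"), line => line.split(" ").size)

  def main(args: Array[String]): Unit =
    println(wordCounts) // prints List(2, 1)
}
```

Renaming line to anything else (for example s => s.split(" ").size) would not change the result.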

How does a value get passed into a,b?

reduce also expects a function, this time from (A, A) => A, where A is the element type of your RDD. Let's call this function op.

What does reduce do? Example:

List( 1, 2, 3, 4 ).reduce( (x,y) => x + y )
Step 1:  op( 1, 2 ) is the first evaluation
    x is 1  and  y is 2, result 3
Step 2:  op( op( 1, 2 ), 3 ) - take the next element 3
    x is op(1,2) = 3  and  y is 3, result 6
Step 3:  op( op( op( 1, 2 ), 3 ), 4 ) - take the next element 4
    x is op( op(1,2), 3 ) = op( 3, 3 ) = 6  and  y is 4, result 10

Result here is the sum of the list elements, 10.

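Remark: On a List, reduce effectively calculates

op( op( ... op(x_1, x_2) ..., x_{n-1}), x_n)

On an RDD, Spark reduces each partition separately and then combines the partial results, so the grouping can differ. That is why op should be commutative and associative.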
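The nested formula can be sketched as an explicit loop. The second method below is only a rough analogy for what happens on a distributed dataset: Spark's RDD.reduce is documented as taking a commutative and associative operator, and grouping the list into chunks is a hypothetical stand-in for partitions. All names here are made up for illustration.

```scala
object ReduceSketch {
  val op: (Int, Int) => Int = (x, y) => x + y

  // Left-to-right reduction written as an explicit loop.
  def reduceSequential(xs: List[Int]): Int = {
    var acc = xs.head // start with x_1 (assumes a non-empty list)
    for (x <- xs.tail) {
      acc = op(acc, x) // acc plays the role of x, the next element the role of y
    }
    acc
  }

  // Stand-in for partitions: reduce each chunk, then combine the partial results.
  def reduceInChunks(xs: List[Int], chunkSize: Int): Int =
    xs.grouped(chunkSize).map(chunk => chunk.reduce(op)).reduce(op)

  def main(args: Array[String]): Unit = {
    println(reduceSequential(List(1, 2, 3, 4)))  // prints 10
    println(reduceInChunks(List(1, 2, 3, 4), 2)) // prints 10: op(op(1, 2), op(3, 4))
  }
}
```

Both versions agree here because addition is associative and commutative; with an operator like subtraction the chunked version could give a different answer.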

Full example

First, textFile is an RDD[String], say

textFile
 "hello Tyth"
 "cool example, eh?"
 "goodbye"

textFile.map(line => line.split(" ").size)
 2
 3
 1
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
 3
   Steps here, recall `(a, b) => if (a > b) a else b`
   - op( op(2, 3), 1) evaluates to op(3, 1), since op(2, 3) = 3
   - op( 3, 1 ) = 3
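The walkthrough above can be run without a Spark installation by letting a plain List stand in for the RDD[String]. That substitution is an assumption made for this sketch; with Spark, textFile would come from sc.textFile as in the quick start guide, and the map and reduce calls would look the same.

```scala
object FullExample {
  // Plain List standing in for the RDD[String] (assumption for this sketch).
  val textFile: List[String] =
    List("hello Tyth", "cool example, eh?", "goodbye")

  // Number of space-separated words per line.
  val wordCounts: List[Int] = textFile.map(line => line.split(" ").size)

  // Keep the larger of the two values at every step, so the result is the maximum.
  val maxWords: Int = wordCounts.reduce((a, b) => if (a > b) a else b)

  def main(args: Array[String]): Unit = {
    println(wordCounts) // prints List(2, 3, 1)
    println(maxWords)   // prints 3
  }
}
```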
Martin Senne answered Nov 01 '22 05:11

map and reduce are methods of the RDD class, which has an interface similar to Scala collections.

What you pass to map and reduce are anonymous functions (with one parameter for map, and two parameters for reduce). The RDD calls the function you provide for every element it has (here, every line of text in textFile).
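If the anonymous functions are what makes the expression hard to read, they can be given names first. The sketch below does that with a plain List in place of the RDD; the names wordCount, larger, and lines are invented for this example.

```scala
object NamedFunctions {
  // One parameter: this is the function handed to map.
  val wordCount: String => Int = line => line.split(" ").size

  // Two parameters: this is the function handed to reduce.
  val larger: (Int, Int) => Int = (a, b) => if (a > b) a else b

  // Plain List standing in for the lines of a text file (assumption for this sketch).
  val lines: List[String] = List("a b c", "d e")

  val result: Int = lines.map(wordCount).reduce(larger)

  def main(args: Array[String]): Unit =
    println(result) // prints 3
}
```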

It may help to read an introduction to Scala collections first.

You can read more about RDD class API here: https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.rdd.RDD

Tyth answered Nov 01 '22 05:11