Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark + Scala transformations, immutability & memory consumption overheads

I have gone through some videos in Youtube regarding Spark architecture.

Even though Lazy evaluation, Resilience of data creation in case of failures, good functional programming concepts are reasons for success of Resilenace Distributed Datasets, one worrying factor is memory overhead due to multiple transformations resulting into memory overheads due data immutability.

If I understand the concept correctly, Every transformations is creating new data sets and hence the memory requirements will gone by those many times. If I use 10 transformations in my code, 10 sets of data sets will be created and my memory consumption will increase by 10 folds.

e.g.

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

Above example has three transformations : flatMap, map and reduceByKey. Does it implies I need 3X memory of data for X size of data?

Is my understanding correct? Is caching RDD is only solution to address this issue?

Once I start caching, it may spill over to disk due to large size and performance would be impacted due to disk IO operations. In that case, performance of Hadoop and Spark are comparable?

EDIT:

From the answer and comments, I have understood lazy initialization and pipeline process. My assumption of 3 X memory where X is initial RDD size is not accurate.

But is it possible to cache 1 X RDD in memory and update it over the pipleline? How does cache () works?

like image 513
Ravindra babu Avatar asked Feb 02 '16 06:02

Ravindra babu


2 Answers

First off, the lazy execution means that functional composition can occur:

scala> val rdd = sc.makeRDD(List("This is a test", "This is another test", 
                                 "And yet another test"), 1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at makeRDD at <console>:27

scala> val counts = rdd.flatMap(line => {println(line);line.split(" ")}).
     | map(word => {println(word);(word,1)}).
     | reduceByKey((x,y) => {println(s"$x+$y");x+y}).
     | collect
This is a test
This
is
a
test
This is another test
This
1+1
is
1+1
another
test
1+1
And yet another test
And
yet
another
1+1
test
2+1
counts: Array[(String, Int)] = Array((And,1), (is,2), (another,2), (a,1), (This,2), (yet,1), (test,3))

First note that I force the parallelism down to 1 so that we can see how this looks on a single worker. Then I add a println to each of the transformations so that we can see how the workflow moves. You see that it processes the line, then it processes the output of that line, followed by the reduction. So, there are not separate states stored for each transformation as you suggested. Instead, each piece of data is looped through the entire transformation up until a shuffle is needed, as can be seen by the DAG visualization from the UI:

DAG

That is the win from the laziness. As to Spark v Hadoop, there is already a lot out there (just google it), but the gist is that Spark tends to utilize network bandwidth out of the box, giving it a boost right there. Then, there a number of performance improvements gained by laziness, especially if a schema is known and you can utilize the DataFrames API.

So, overall, Spark beats MR hands down in just about every regard.

like image 117
Justin Pihony Avatar answered Oct 21 '22 01:10

Justin Pihony


The memory requirements of Spark not 10 times if you have 10 transformations in your Spark job. When you specify the steps of transformations in a job Spark builds a DAG which will allow it to execute all the steps in the jobs. After that it breaks the job down into stages. A stage is a sequence of transformations which Spark can execute on dataset without shuffling.

When an action is triggered on the RDD, Spark evaluates the DAG. It just applies all the transformations in a stage together until it hits the end of the stage, so it is unlikely for the memory pressure to be 10 time unless each transformation leads to a shuffle (in which case it is probably a badly written job).

I would recommend watching this talk and going through the slides.

like image 23
Saket Avatar answered Oct 21 '22 00:10

Saket