Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark RDD- map vs mapPartitions

I read through theoretical differences between map and mapPartitions, & 'm much clear when to use them in varied situations.

But my problem described below is more based upon GC activity & Memory (RAM). Please read below for the problem:-

=> I wrote a map function to convert Row to String. So, an input of RDD[org.apache.spark.sql.Row] would be mapped to RDD[String]. But with this approach map object would be created for every row of an RDD. Thus creation of such large number of objects may increase GC activity.

=> To resolve above, I thought of using mapPartitions. So, that number of objects become equivalent to number of partitions. mapPartitions gives Iterator as an input and accepts to return and java.lang.Iterable. But most of the Iterable like Array, List, etc are in memory. So, if I have huge amount of data then would creating a Iterable this way can lead to out of Memory ? or Is there any other collection (java or scala) that should be utilized here (to spill to Disk in case memory starts to fill)? or should we only use mapPartitions in case RDD is completely in Memory?

Thanks in advance. Any help would be greatly appreciated.

like image 224
dinesh028 Avatar asked Dec 01 '16 12:12

dinesh028


People also ask

What is the difference between map and mapPartitions in spark?

mapPartitions() – This is precisely the same as map(); the difference being, Spark mapPartitions() provides a facility to do heavy initializations (for example, Database connection) once for each partition instead of doing it on every DataFrame row.

What is RDD map?

RDD map() transformation is used to apply any complex operations like adding a column, updating a column, transforming the data e.t.c, the output of map transformations would always have the same number of records as input.

What is the difference between map and filter in spark?

map(func):Return a new distributed dataset formed by passing each element of the source through a function func. filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.

What does RDD map return?

map :It returns a new RDD by applying a function to each element of the RDD. Function in map can return only one item. flatMap: Similar to map, it returns a new RDD by applying a function to each element of the RDD, but output is flattened.


2 Answers

If you think about JavaRDD.mapPartitions it takes FlatMapFunction (or some variant like DoubleFlatMapFunction) which is expected to return Iterator not Iterable. If underlaying collection is lazy then you have nothing to worry about.

RDD.mapPartitions takes a functions from Iterator to Iterator.

I general if you use reference data you can replace mapPartitions with map and use static member to store data. This will have the same footprint and will be easier to write.

like image 180
user7236328 Avatar answered Oct 13 '22 06:10

user7236328


to answer your question about mapPartition(f: Iterator => Iterator). it is lazy and also does not hold the whole partition in mem. Spark will use this(we can consider it to be a Functor in FP term) Iterator => Iterator function and recompile it into its own code to execute. if partition is too big, it will spill to disk before next shuffle point. so don't worry about it.

one thing that needs to mention is, you can force your function to materialize data into mem, simply by doing:

rdd.mapPartition(
  partitionIter => {
    partitionIter.map(do your logic).toList.toIterator
  }
)

toList will force Spark to materialize the data for the whole partition into mem, therefore watch out for this, because ops like toList will break the laziness of the function chain.

like image 2
linehrr Avatar answered Oct 13 '22 06:10

linehrr