I have read through the theoretical differences between map and mapPartitions, and I am fairly clear on when to use each of them in different situations.
But my problem, described below, is more about GC activity and memory (RAM). Please read on for the details:
=> I wrote a map function to convert a Row to a String, so an input RDD[org.apache.spark.sql.Row] gets mapped to an RDD[String]. With this approach, one new object is created for every row of the RDD, and creating such a large number of objects may increase GC activity.
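Roughly, the map version looks like this (a minimal sketch; the real Row-to-String logic is more involved, and df stands for an existing DataFrame):

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.rdd                 // df is assumed to be an existing DataFrame
    // One String object is created per Row, which is what worries me GC-wise.
    val strings: RDD[String] = rows.map(row => row.mkString(","))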
=> To work around that, I thought of using mapPartitions, so that the number of objects becomes equivalent to the number of partitions. mapPartitions receives an Iterator as input and (in the Java API I was looking at) expects a java.lang.Iterable to be returned. But most Iterables, like Array, List, etc., live entirely in memory. So if I have a huge amount of data, could building an Iterable this way lead to an out-of-memory error? Is there some other collection (Java or Scala) that should be used here (one that spills to disk if memory starts to fill up)? Or should mapPartitions only be used when the RDD fits completely in memory?
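Here is roughly the mapPartitions variant I was considering, where the whole partition's output is collected into an in-memory List first, which is exactly the part that worries me (again just a sketch, reusing rows from above):

    // Builds the full converted partition as a List before handing it back to Spark.
    // If a partition is large, this List holds every converted row in memory at once.
    val strings2: RDD[String] = rows.mapPartitions { iter =>
      iter.map(row => row.mkString(",")).toList.iterator
    }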
Thanks in advance. Any help would be greatly appreciated.
mapPartitions() – this does the same per-element work as map(); the difference is that mapPartitions() gives you a place to do heavy initialization (for example, a database connection) once for each partition instead of once for every row.
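For instance, a connection-per-partition pattern might look roughly like this (a sketch only: the JDBC URL, the lookup table, and the RDD[Int] of ids are all made up for illustration):

    import java.sql.DriverManager
    import org.apache.spark.rdd.RDD

    def lookupLabels(ids: RDD[Int]): RDD[(Int, String)] =
      ids.mapPartitions { iter =>
        // Heavy initialization happens once per partition, not once per element.
        val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
        val stmt = conn.prepareStatement("SELECT label FROM lookup WHERE id = ?")
        iter.map { id =>
          stmt.setInt(1, id)
          val rs = stmt.executeQuery()
          val label = if (rs.next()) rs.getString(1) else "unknown"
          rs.close()
          (id, label)
        }
        // In real code you would also close conn once the iterator is exhausted.
      }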
The RDD map() transformation is used to apply operations such as adding a column, updating a column, or otherwise transforming the data; the output of a map transformation always has the same number of records as its input.
map(func): returns a new distributed dataset formed by passing each element of the source through the function func. filter(func): returns a new dataset formed by selecting those elements of the source on which func returns true.
map: returns a new RDD by applying a function to each element of the RDD; the function passed to map can return only one item per element. flatMap: similar to map, it returns a new RDD by applying a function to each element of the RDD, but the output is flattened.
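A quick toy illustration of the map/flatMap difference (my own example; sc is the usual SparkContext):

    val lines = sc.parallelize(Seq("a b", "c"))

    // map: exactly one output element per input element -> RDD[Array[String]] with 2 elements
    val mapped = lines.map(_.split(" "))

    // flatMap: each input element may produce zero or more outputs, and the results
    // are flattened -> RDD[String] containing "a", "b", "c"
    val flattened = lines.flatMap(_.split(" "))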
If you think about JavaRDD.mapPartitions, it takes a FlatMapFunction (or some variant like DoubleFlatMapFunction), which is expected to return an Iterator, not an Iterable. If the underlying collection is lazy then you have nothing to worry about. RDD.mapPartitions takes a function from Iterator to Iterator.
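So on the Scala side something like the following stays lazy end to end, since Iterator.map only wraps the source iterator (my sketch, reusing the Row-to-String conversion from the question, where rows is an RDD[Row]):

    // Each row is converted only when the downstream consumer pulls it;
    // no intermediate collection for the partition is ever built.
    val asStrings = rows.mapPartitions { iter =>
      iter.map(row => row.mkString(","))
    }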
In general, if you are working with reference data you can replace mapPartitions with map and use a static member to store the data. This will have the same footprint and will be easier to write.
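What that static-member idea could look like in practice (a sketch; the singleton object, the lookup map, and codesRdd are all hypothetical):

    // A singleton object is initialized once per JVM (i.e. once per executor),
    // so a plain map() has the same footprint as doing the setup in mapPartitions().
    object CountryNames {
      lazy val byCode: Map[String, String] =
        Map("US" -> "United States", "DE" -> "Germany")   // stand-in reference data
    }

    val names = codesRdd.map(code => CountryNames.byCode.getOrElse(code, "unknown"))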
To answer your question about mapPartitions(f: Iterator => Iterator): it is lazy and does not hold the whole partition in memory. Spark takes this Iterator => Iterator function (you can think of it as a functor in FP terms) and compiles it into its own execution code. If a partition is too big, it will be spilled to disk before the next shuffle point, so don't worry about it.
One thing worth mentioning: you can force your function to materialize the data in memory, simply by doing:
    rdd.mapPartitions(
      partitionIter => {
        // doYourLogic stands in for your per-element transformation
        partitionIter.map(doYourLogic).toList.toIterator
      }
    )
toList will force Spark to materialize the data for the whole partition in memory, so watch out for this, because operations like toList will break the laziness of the function chain.
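By contrast, dropping the toList keeps the chain lazy, so no more than one element of the partition is in flight at a time (same placeholder logic as above):

    rdd.mapPartitions(
      partitionIter => {
        // Iterator.map is lazy: elements are transformed one at a time as they
        // are consumed, so the partition is never materialized as a collection.
        partitionIter.map(doYourLogic)
      }
    )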