I have quite a big dataset (100+ million records with hundreds of columns) that I am processing with Spark. I am reading the data into a Spark Dataset, and I want to filter this Dataset and map a subset of its fields to a case class.
The code looks something like this:
case class Subset(name: String, age: Int)
case class Complete(name: String, field1: String, field2, ..., age: Int)
val ds = spark.read.format("csv").load("data.csv").as[Complete]
// approach 1
ds.filter(x => x.age > 25).map(x => Subset(x.name, x.age))
// approach 2
ds.flatMap(x => if (x.age > 25) Seq(Subset(x.name, x.age)) else Seq.empty)
Which approach is better? Any additional hints on how I can make this code more performant?
Thanks!
Edit
I ran some tests to compare the runtimes, and it looks like approach 2 is quite a bit faster. The code I used to measure the runtimes is as follows:
val subset = spark.time {
  ds.filter(x => x.age > 25).map(x => Subset(x.name, x.age))
}
spark.time {
  subset.count()
}
and
val subset2 = spark.time {
  ds.flatMap(x => if (x.age > 25) Seq(Subset(x.name, x.age)) else Seq.empty)
}
spark.time {
  subset2.count()
}
Update: My original answer contained an error: Spark does support Seq as the result of a flatMap (and converts the result back into a Dataset). Apologies for the confusion. I have also added more information on improving the performance of your analysis.
Update 2: I missed that you're using a Dataset rather than an RDD (doh!). This doesn't affect the answer significantly.
Spark is a distributed system that partitions data across multiple nodes and processes it in parallel. In terms of efficiency, operations that result in re-partitioning (requiring data to be transferred between nodes) are far more expensive at run-time than in-place modifications. Also, note that operations that merely transform data, such as filter, map, flatMap, etc., are merely recorded and do not execute until an action is performed (such as reduce, fold, aggregate, etc.). Consequently, neither alternative actually does anything as things stand.
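As a minimal sketch of what that laziness means in practice (reusing your Subset case class): defining the pipeline below does no work at all, and the cluster only processes data once the count() action is invoked.
val pending = ds.filter(_.age > 25).map(x => Subset(x.name, x.age)) // lazily recorded, nothing runs yet
val matches = pending.count() // the action triggers the filter and map across the cluster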
When an action is performed on the result of these transformations, I would expect the filter operation to be far more efficient: it only processes data (using the subsequent map operation) that passes the predicate x => x.age > 25 (more typically written as _.age > 25). While it may appear that filter creates an intermediate collection, it executes lazily. As a result, Spark appears to fuse the filter and map operations together.
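If you want to see this for yourself, here is a sketch: write approach 1 with the shorthand predicate and inspect the physical plan with explain(); in recent Spark versions the filter and the map typically end up in a single whole-stage code-generated stage (the exact output depends on your Spark version).
val subset = ds.filter(_.age > 25).map(x => Subset(x.name, x.age))
subset.explain() // prints the physical plan; the filter and map should appear fused in one stage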
Your flatMap operation is, frankly, hideous. It forces processing, sequence creation and subsequent flattening of every data item, which will definitely increase overall processing.
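If you want to squeeze more out of this particular pipeline, one common alternative (a sketch, and it assumes the CSV column names line up with the Subset fields) is to express the predicate and projection as Column expressions rather than lambdas, so Catalyst can push the filter down and avoid deserializing every Complete object just to read two fields:
import spark.implicits._
val subsetDs = ds
  .filter($"age" > 25)      // Column-based predicate that Catalyst can optimise
  .select($"name", $"age")  // project only the columns you actually need
  .as[Subset]               // back to a typed Dataset[Subset]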
That said, the best way to improve the performance of your analysis is to control the partitioning so that the data is split roughly evenly over as many nodes as possible. Refer to this guide as a good starting point.
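As a rough sketch of what controlling the partitioning can look like (the partition counts below are arbitrary placeholders; tune them to your cluster size and data volume):
val balanced = ds.repartition(200)    // full shuffle: spread rows roughly evenly across 200 partitions
val compacted = balanced.coalesce(50) // reduce the partition count without a full shuffle, e.g. after heavy filtering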