Does flatmap give better performance than filter+map?

I have quite a big dataset (100 million+ records with hundreds of columns) that I am processing with Spark. I am reading the data into a Spark Dataset, and I want to filter this Dataset and map a subset of its fields to a case class.

The code looks somewhat like this:

case class Subset(name: String, age: Int)
case class Complete(name: String, field1: String, field2...., age: Int)

val ds = spark.read.format("csv").load("data.csv").as[Complete]

// approach 1
ds.filter(x => x.age > 25).map(x => Subset(x.name, x.age))

// approach 2
ds.flatMap(x => if (x.age > 25) Seq(Subset(x.name, x.age)) else Seq.empty)

Which approach is better? Any additional hints on how I can make this code more performant?

Thanks!

Edit

I ran some tests to compare the runtimes, and it looks like approach 2 is quite a bit faster. The code I used to get the runtimes is as follows:

val subset = spark.time {
  ds.filter(x => x.age > 25).map(x => Subset(x.name, x.age))
}

spark.time {
  subset.count()
}

and

val subset2 = spark.time {
  ds.flatMap(x => if (x.age > 25) Seq(Subset(x.name, x.age)) else Seq.empty)
}

spark.time {
  subset2.count()
}
asked Jun 25 '19 by Sai Kiran KrishnaMurthy




1 Answer

Update: My original answer contained an error: Spark does support Seq as the result of a flatMap (and converts the result back into a Dataset). Apologies for the confusion. I have also added more information on improving the performance of your analysis.

Update 2: I missed that you're using a Dataset rather than an RDD (doh!). This doesn't affect the answer significantly.

Spark is a distributed system that partitions data across multiple nodes and processes it in parallel. In terms of efficiency, operations that force a re-partitioning (requiring data to be transferred between nodes) are far more expensive at run time than operations that transform data in place. Note also that transformations such as filter, map, flatMap, etc. are merely recorded and do not execute until an action (such as reduce, fold, aggregate, etc.) is performed. Consequently, neither alternative actually does anything as things stand.
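
This also means the timing code in the question mostly measures query-plan construction, not execution. A minimal sketch of the measurement I would trust (same transformations as above; only the action sits inside spark.time):

// Transformations are lazy: this line only builds a query plan.
val subset = ds.filter(x => x.age > 25).map(x => Subset(x.name, x.age))

// The count() action triggers the actual distributed execution,
// so this is the block worth timing.
spark.time {
  subset.count()
}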

When an action is performed on the result of these transformations, I would expect the filter-based version to be far more efficient: the subsequent map operation only processes data that passes the predicate x => x.age > 25 (more typically written as _.age > 25). While it may appear that filter creates an intermediate collection, it executes lazily. As a result, Spark appears to fuse the filter and map operations together.
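
You can check the fusion yourself with Dataset.explain, which prints the physical plan. On recent Spark versions, both operations should appear inside a single WholeStageCodegen stage, i.e. one pass over the data rather than two:

// Print the physical plan for the filter + map pipeline.
// Expect the filter and the map to show up within one
// WholeStageCodegen stage, meaning they compile into a single pass.
ds.filter(x => x.age > 25)
  .map(x => Subset(x.name, x.age))
  .explain()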

Your flatMap operation is, frankly, hideous. It forces the creation of a Seq, and the subsequent flattening of that Seq, for every single record, which will definitely increase overall processing.
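
If you do want everything in one flatMap, a slightly cleaner sketch returns an Option instead of a Seq (Scala converts the Option to a zero-or-one-element collection implicitly). Note, though, that the runtime cost is roughly the same as the Seq version; I would still prefer filter + map:

// Some/None reads more clearly as "zero or one result" than
// Seq(...)/Seq.empty; the per-record cost is similar to the Seq version.
ds.flatMap(x => if (x.age > 25) Some(Subset(x.name, x.age)) else None)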

That said, the best way to improve the performance of your analysis is to control the partitioning so that the data is split roughly evenly over as many nodes as possible. Refer to this guide as a good starting point.
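
As a rough sketch (the numbers here are hypothetical placeholders; a common rule of thumb is on the order of 2 to 4 partitions per available CPU core):

// Inspect how the data is currently partitioned.
println(ds.rdd.getNumPartitions)

// numPartitions is a placeholder to tune for your cluster
// (roughly 2-4x the total core count is a common starting point).
val numPartitions = 200

// Note: repartition itself triggers a shuffle, so it only pays off
// when the data is badly skewed or the downstream work is substantial.
val balanced = ds.repartition(numPartitions)
val result = balanced.filter(_.age > 25).map(x => Subset(x.name, x.age))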

answered Nov 08 '22 by Mike Allen