
Spark CollectAsMap

I would like to know how collectAsMap works in Spark. More specifically, I would like to know where the aggregation of the data from all partitions takes place: on the master or on the workers? In the first case each worker sends its data to the master, and once the master has collected the data from every worker it aggregates the results. In the second case the workers are responsible for aggregating the results (after exchanging data among themselves), and only then are the results sent to the master.

It is critical for me to find a way for the master to collect the data from each partition separately, without the workers exchanging data.

Asked Apr 22 '15 by Χρήστος Μάλλιος

People also ask

What is Spark collectAsMap?

collectAsMap returns the pair RDD as a Map to the Spark master. countByKey returns the count of elements for each key. Both deliver their final result as a local collection on your driver.

What is Spark pair RDD?

Spark paired RDDs are defined as RDDs containing key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, and the value is the data corresponding to that key. Beyond that, most Spark operations work on RDDs containing objects of any type.

How do I join RDD in spark?

Return an RDD containing all pairs of elements with matching keys in self and other . Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other . Performs a hash join across the cluster.

Which transformation can be used to aggregate all values of the same key in a paired RDD?

For example, pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key.
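To make these FAQ answers concrete, here is a minimal PySpark sketch; the RDD names and values are made up purely for illustration:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "pair-rdd-basics")

# Pair RDDs: every element is a (key, value) tuple.
sales = sc.parallelize([("apples", 3), ("pears", 2), ("apples", 5)])
prices = sc.parallelize([("apples", 0.5), ("pears", 0.8)])

# reduceByKey aggregates the values of each key (this shuffles data across the cluster).
totals = sales.reduceByKey(lambda a, b: a + b)   # [('apples', 8), ('pears', 2)]

# join matches keys between two pair RDDs and yields (k, (v1, v2)) tuples.
joined = totals.join(prices)                     # [('apples', (8, 0.5)), ('pears', (2, 0.8))]

print(joined.collect())
sc.stop()
```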


3 Answers

First of all, in both operations all of the data in the RDD travels from the various executors/workers to the master/driver; collect and collectAsMap simply collate the data from them. That is why it is always recommended not to use collect unless you have no other option.

I must say, from a performance point of view these should be the last collection methods you consider.

  1. collect: returns the results as an Array.
  2. collectAsMap: returns the results for a paired RDD as a Map collection. Since it returns a Map, you only get pairs with unique keys; pairs with duplicate keys are dropped (see the sketch below).
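As a minimal sketch of that difference (the data here is made up), note how the duplicate key "a" survives collect but collapses to a single entry in collectAsMap:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "collect-vs-collectAsMap")

# A pair RDD with a duplicate key, spread over two partitions.
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], numSlices=2)

# collect() brings every element to the driver as a list, duplicates included.
print(pairs.collect())        # [('a', 1), ('b', 2), ('a', 3)]

# collectAsMap() builds a dict on the driver, so only one value per key survives.
print(pairs.collectAsMap())   # {'a': 3, 'b': 2} (which duplicate survives is not guaranteed)

sc.stop()
```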

Regards,

Neeraj

Answered by Neeraj Bhadani


Supporting the above answers:

collectAsMap() - returns the key-value pairs as a dictionary (countByKey() is another function which returns a dictionary).

collectAsMap(), collect(), take(n), takeOrdered(n), takeSample(False, ...)

These methods bring all of the data to the driver, so programmers need to take precautions when using them in production.
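A small sketch of those driver-side actions on made-up data (none of this is from the original answer, just an illustration of the calls listed above):

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "driver-side-actions")
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

pairs.collectAsMap()        # {'a': 3, 'b': 2}: a dict built on the driver
pairs.countByKey()          # defaultdict({'a': 2, 'b': 1}): also returned to the driver
pairs.collect()             # the full list of tuples on the driver
pairs.take(2)               # the first 2 elements
pairs.takeOrdered(2)        # the 2 smallest elements
pairs.takeSample(False, 2)  # 2 random elements, sampled without replacement

sc.stop()
```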

Answered by Krish


You can see how collectAsMap is implemented here. Since the RDD element type is a tuple, it looks like they just use the normal RDD collect and then translate the tuples into a map of key-value pairs. They do mention in the comment that multi-map isn't supported, so you need a one-to-one key/value mapping across your data.

collectAsMap function
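For intuition, the idea boils down to a collect followed by building a map. This is only a sketch of the logic described above, not the actual Spark source:

```python
# Sketch only: run a normal collect(), then fold the (key, value) tuples into a
# dict, so a later duplicate of a key silently overwrites an earlier one
# (the "multi-map isn't supported" caveat from the comment).
def collect_as_map_sketch(pair_rdd):
    result = {}
    for key, value in pair_rdd.collect():
        result[key] = value
    return result
```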

What collect does is execute a Spark job, get the results of each partition back from the workers, and aggregate them with a reduce/concat phase on the driver.

collect function
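One way to see that per-partition gathering, assuming a toy RDD invented for the example, is glom(), which keeps each partition's elements together:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "collect-per-partition")
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4)], numSlices=2)

# glom() returns one list per partition, i.e. what the driver receives
# from each partition before concatenating the results.
print(pairs.glom().collect())   # e.g. [[('a', 1), ('b', 2)], [('a', 3), ('c', 4)]]

# collect() is the same gather, flattened into a single list on the driver.
print(pairs.collect())          # [('a', 1), ('b', 2), ('a', 3), ('c', 4)]

sc.stop()
```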

So given that, it should be the case that the driver collects the data from each partition separately, without the workers exchanging data, when performing collectAsMap.

Note that if you apply transformations to your RDD before collectAsMap that cause a shuffle, there may be an intermediate step in which workers exchange data among themselves. Check your cluster master's application UI to see more about how Spark is executing your application.
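For example (made-up data again), a plain collectAsMap only pulls partitions to the driver, while putting a reduceByKey in front of it forces a shuffle between workers first:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "shuffle-before-collectAsMap")
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], numSlices=2)

# No shuffle: each partition is sent straight to the driver.
no_shuffle = pairs.collectAsMap()                                     # {'a': 3, 'b': 2}

# Shuffle first: reduceByKey makes workers exchange data among themselves
# before the per-key sums are collected on the driver.
with_shuffle = pairs.reduceByKey(lambda a, b: a + b).collectAsMap()   # {'a': 4, 'b': 2}

print(no_shuffle, with_shuffle)
sc.stop()
```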

Answered by Rich