
How to control preferred locations of RDD partitions?

Is there a way to set the preferred locations of RDD partitions manually? I want to make sure that a certain partition is computed on a certain machine.

I'm using an array and the parallelize method to create an RDD from it.

Also, I'm not using HDFS; the files are on the local disk. That's why I want to control the execution node.

Asked by sam93 on Dec 13 '17 at 18:12



1 Answer

Is there a way to set the preferredLocations of RDD partitions manually?

Yes, there is, but it's RDD-specific and so different kinds of RDDs have different ways to do it.

Spark uses RDD.preferredLocations to get a list of preferred locations to compute each partition/split on (e.g. block locations for an HDFS file).

final def preferredLocations(split: Partition): Seq[String]

Get the preferred locations of a partition, taking into account whether the RDD is checkpointed.

As you can see, the method is final, which means that no subclass can override it.

When you look at the source code of RDD.preferredLocations, you will see how an RDD knows its preferred locations. It uses the protected RDD.getPreferredLocations method, which a custom RDD may (but doesn't have to) override to specify placement preferences.
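To see what Spark currently reports for an RDD, the public preferredLocations method can be called for each partition. The following is a minimal sketch; the object name InspectLocations and the helper locationsOf are made up for illustration, and the local[2] master in the usage lines is only an assumption so that the snippet runs without a cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object InspectLocations {
  // Hypothetical helper: pairs every partition index with the hosts
  // that Spark reports as preferred for that partition.
  def locationsOf(rdd: RDD[_]): Seq[(Int, Seq[String])] =
    rdd.partitions.toSeq.map(p => p.index -> rdd.preferredLocations(p))
}

// Usage (assumes a local master, purely for experimentation):
// val sc = new SparkContext(
//   new SparkConf().setMaster("local[2]").setAppName("inspect-locations"))
// InspectLocations.locationsOf(sc.parallelize(1 to 10, 4)).foreach(println)
// sc.stop()
```

For an RDD created with parallelize, every partition should come back with an empty list, since no location preferences were given.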

protected def getPreferredLocations(split: Partition): Seq[String] = Nil

So the question has now "morphed" into another one: which RDDs allow their preferred locations to be set? Find yours and look at its source code.

I'm using an array and the parallelize method to create an RDD from it.

A local dataset lives on a single machine (the driver) and only becomes distributed once you parallelize it, but...why would you want to use Spark for something you can process locally on a single computer/node?
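For RDD types that offer no such option, one possibility is a thin custom RDD that overrides getPreferredLocations. The sketch below is an illustration, not something taken from Spark itself: the class name PinnedRDD and the hostsByPartition parameter are hypothetical. It reuses the parent's partitions and data unchanged and only swaps in the placement preference.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper RDD: same partitions and same data as the parent,
// but with preferred hosts looked up by partition index.
class PinnedRDD[T: ClassTag](
    prev: RDD[T],
    hostsByPartition: Map[Int, Seq[String]]) extends RDD[T](prev) {

  // Reuse the parent's partitions one-to-one.
  override protected def getPartitions: Array[Partition] =
    firstParent[T].partitions

  // Delegate the actual computation to the parent RDD.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  // The only real change: report the requested hosts, or none if unset.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    hostsByPartition.getOrElse(split.index, Nil)
}

// Usage (host name is a placeholder):
// val pinned = new PinnedRDD(someRdd, Map(0 -> Seq("worker1.example.com")))
```

Keep in mind that these are preferences: the scheduler treats them as a locality hint and may still run a task on another node if the preferred one is not available.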

If however you insist and do really want to use Spark for local datasets, the RDD behind SparkContext.parallelize is...let's have a look at the source code... ParallelCollectionRDD which does allow for location preferences.

Let's then rephrase your question to the following (hoping I won't lose any important fact):

What are the operators that allow for creating a ParallelCollectionRDD and specifying the location preferences explicitly?

To my great surprise (as I didn't know about the feature), there is such an operator, i.e. SparkContext.makeRDD, that...accepts one or more location preferences (hostnames of Spark nodes) for each object.

makeRDD[T](seq: Seq[(T, Seq[String])]): RDD[T]

Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.

In other words, rather than using parallelize you have to use makeRDD (which is available in the Spark Core API for Scala; I'm not sure about Python, so I'm leaving that as a home exercise for you :))

The same reasoning applies to any other RDD operator or transformation that creates some sort of RDD.
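A minimal sketch of that makeRDD variant follows. The host names and the object name MakeRddLocations are placeholders, and the local[2] master in the usage lines is only an assumption for trying it out; on a real cluster the names would presumably have to match the host names under which the workers are known to Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MakeRddLocations {
  // Each element is paired with its preferred host(s); makeRDD creates
  // one partition per element. Host names below are placeholders.
  def build(sc: SparkContext): RDD[String] =
    sc.makeRDD(Seq(
      ("file-a.txt", Seq("worker1.example.com")),
      ("file-b.txt", Seq("worker2.example.com", "worker3.example.com"))
    ))
}

// Usage (assumes a local master, purely for experimentation):
// val sc = new SparkContext(
//   new SparkConf().setMaster("local[2]").setAppName("makeRDD-locations"))
// val rdd = MakeRddLocations.build(sc)
// rdd.partitions.foreach { p =>
//   println(s"partition ${p.index}: ${rdd.preferredLocations(p).mkString(", ")}")
// }
// sc.stop()
```

Here each element is a file name, so a later map or foreach could open the file on whichever node ends up running the task. The locations remain preferences rather than guarantees, so the task may still be scheduled elsewhere.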

Answered by Jacek Laskowski on Oct 20 '22 at 20:10