 

What are the differences between slices and partitions of RDDs?

I am using Spark's Python API and running Spark 0.8.

I am storing a large RDD of floating point vectors and I need to perform calculations of one vector against the entire set.

Is there any difference between slices and partitions in an RDD?

When I create the RDD, I pass 100 as a parameter, which causes it to store the RDD as 100 slices and to create 100 tasks when performing the calculations. I want to know whether partitioning the data would improve performance beyond the slicing by letting the system process the data more efficiently (i.e., is there a difference between performing operations over a partition versus just operating over every element of the sliced RDD?).

For example, is there any significant difference between these two pieces of code?

rdd = sc.textFile('demo.txt', 100)

vs

rdd = sc.textFile('demo.txt')
rdd.partitionBy(100)
asked Jun 17 '14 by user3749023


People also ask

What is an RDD partition?

Apache Spark's Resilient Distributed Datasets (RDDs) are collections of data so large that they cannot fit on a single node and must be partitioned across multiple nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes.

How many partitions should a Spark RDD have?

Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
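For example, a minimal sketch, assuming an existing SparkContext named sc and Spark 1.1.0+ for getNumPartitions():

data = range(1000)
rdd_default = sc.parallelize(data)        # Spark picks the partition count
rdd_explicit = sc.parallelize(data, 10)   # partition count set explicitly
print(rdd_explicit.getNumPartitions())    # 10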

What are the different types of partitioning methods in PySpark?

PySpark supports partitioning in two ways: in memory (DataFrame) and on disk (file system). For in-memory partitioning, you can partition or repartition a DataFrame by calling the repartition() or coalesce() transformations.
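A rough sketch of both kinds of partitioning (note this uses the DataFrame API, which postdates the Spark 0.8 in the question; the file paths and the 'country' column are hypothetical):

df = spark.read.csv('demo.csv', header=True)

# Partition in memory: shuffle the DataFrame into 10 partitions.
df = df.repartition(10)

# Partition on disk: write one directory per distinct value of 'country'.
df.write.partitionBy('country').parquet('/tmp/demo_partitioned')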


2 Answers

I believe slices and partitions are the same thing in Apache Spark.

However, there is a subtle but potentially significant difference between the two pieces of code you posted.

This code will attempt to load demo.txt directly into 100 partitions using 100 concurrent tasks:

rdd = sc.textFile('demo.txt', 100)

For uncompressed text, it will work as expected. But if instead of demo.txt you had a demo.gz, you would end up with an RDD with only 1 partition, because reads against gzipped files cannot be parallelized.
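You can see the difference by checking the partition counts (a sketch assuming Spark 1.1.0+ for getNumPartitions(), discussed at the end of this answer, and a gzipped copy of the same file):

txt_rdd = sc.textFile('demo.txt', 100)
gz_rdd = sc.textFile('demo.gz', 100)

print(txt_rdd.getNumPartitions())   # roughly 100 for uncompressed text
print(gz_rdd.getNumPartitions())    # 1, since gzip is not splittable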

On the other hand, the following code will first open demo.txt into an RDD with the default number of partitions, then it will explicitly repartition the data into 100 partitions that are roughly equal in size.

rdd = sc.textFile('demo.txt')
rdd = rdd.repartition(100)

So in this case, even with a demo.gz you will end up with an RDD with 100 partitions.

As a side note, I replaced your partitionBy() with repartition() since that's what I believe you were looking for. partitionBy() requires the RDD to be an RDD of tuples. Since repartition() is not available in Spark 0.8.0, you should instead be able to use coalesce(100, shuffle=True).
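Putting that together, on Spark 0.8.0 the snippet above should look roughly like this:

rdd = sc.textFile('demo.txt')
rdd = rdd.coalesce(100, shuffle=True)   # shuffle=True forces a full repartition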

Spark can run 1 concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions (and probably 2-3 times that).
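As a rough sanity check, a sketch assuming a recent Spark (repartition() postdates 0.8); sc.defaultParallelism is usually the total number of cores Spark sees:

target = sc.defaultParallelism * 3                 # aim for ~2-3x the core count
rdd = sc.textFile('demo.txt').repartition(target)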

As of Spark 1.1.0, you can check how many partitions an RDD has as follows:

rdd.getNumPartitions()  # Python API
rdd.partitions.size     // Scala API

Before 1.1.0, the way to do this with the Python API was rdd._jrdd.splits().size().
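If you need to support both, a small sketch of a version-agnostic helper (note that _jrdd is a private attribute, so this is a workaround rather than a stable API):

def num_partitions(rdd):
    try:
        return rdd.getNumPartitions()       # Spark 1.1.0+
    except AttributeError:
        return rdd._jrdd.splits().size()    # pre-1.1.0 workaround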

answered Oct 25 '22 by Nick Chammas


You can also partition with a custom Partitioner, as follows:

import org.apache.spark.Partitioner

// recordRDD must be an RDD of key/value pairs whose keys are Ints.
val p = new Partitioner() {
  override def numPartitions: Int = 2
  // getPartition must return a value in [0, numPartitions), hence the modulo.
  override def getPartition(key: Any): Int = key.asInstanceOf[Int] % numPartitions
}
recordRDD.partitionBy(p)
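Since the question uses the Python API, here is a rough PySpark equivalent: partitionBy() only works on an RDD of key/value pairs, and the partition function's result is taken modulo the number of partitions.

pair_rdd = sc.parallelize([(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')])
partitioned = pair_rdd.partitionBy(2, lambda key: key)
print(partitioned.glom().map(len).collect())   # e.g. [2, 2]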
answered Oct 25 '22 by Haimei