I am using Spark's Python API and running Spark 0.8.
I am storing a large RDD of floating point vectors and I need to perform calculations of one vector against the entire set.
Is there any difference between slices and partitions in an RDD?
When I create the RDD, I pass it 100 as a parameter which causes it to store the RDD as 100 slices and create 100 tasks when performing the calculations. I want to know if partitioning the data would improve performance beyond the slicing by enabling the system to process the data more efficiently (i.e. is there a difference between performing operations over a partition versus over just operating over every element in the sliced RDD).
For example, is there any significant difference between these two pieces of code?
rdd = sc.textFile(demo.txt, 100)
vs
rdd = sc.textFile(demo.txt)
rdd.partitionBy(100)
Apache Spark's Resilient Distributed Datasets (RDD) are a collection of various data that are so big in size, that they cannot fit into a single node and should be partitioned across various nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes.
Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10) ).
PySpark supports partition in two ways; partition in memory (DataFrame) and partition on the disk (File system). Partition in memory: You can partition or repartition the DataFrame by calling repartition() or coalesce() transformations.
I believe slices
and partitions
are the same thing in Apache Spark.
However, there is a subtle but potentially significant difference between the two pieces of code you posted.
This code will attempt to load demo.txt
directly into 100 partitions using 100 concurrent tasks:
rdd = sc.textFile('demo.txt', 100)
For uncompressed text, it will work as expected. But if instead of demo.txt
you had a demo.gz
, you will end up with an RDD with only 1 partition. Reads against gzipped files cannot be parallelized.
On the other hand, the following code will first open demo.txt
into an RDD with the default number of partitions, then it will explicitly repartition the data into 100 partitions that are roughly equal in size.
rdd = sc.textFile('demo.txt')
rdd = rdd.repartition(100)
So in this case, even with a demo.gz
you will end up with an RDD with 100 partitions.
As a side note, I replaced your partitionBy()
with repartition()
since that's what I believe you were looking for. partitionBy()
requires the RDD to be an RDD of tuples. Since repartition()
is not available in Spark 0.8.0, you should instead be able to use coalesce(100, shuffle=True)
.
Spark can run 1 concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to at least have 50 partitions (and probably 2-3x times that).
As of Spark 1.1.0, you can check how many partitions an RDD has as follows:
rdd.getNumPartitions() # Python API
rdd.partitions.size // Scala API
Before 1.1.0, the way to do this with the Python API was rdd._jrdd.splits().size()
.
You can do partition as follows:
import org.apache.spark.Partitioner
val p = new Partitioner() {
def numPartitions = 2
def getPartition(key: Any) = key.asInstanceOf[Int]
}
recordRDD.partitionBy(p)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With