I'm trying to port some code from Spark 1.6 to Spark 2.0, using the new features of Spark 2.0. First, I want to use the CSV reader from Spark 2.0. BTW, I'm using PySpark.
With the "old" textFile
function, I'm able to set the minimum number of partitions. Ex:
file = sc.textFile('/home/xpto/text.csv', minPartitions=10)
header = file.first()                       # extract the header
data = file.filter(lambda x: x != header)   # CSV without the header
...
Now, with Spark 2.0, I can read the CSV directly:
df = spark.read.csv('/home/xpto/text.csv', header=True)
...
But I didn't find a way to set minPartitions.
I need this to test the performance of my code.
Thx, Fred
In PySpark, you can get the current number of partitions by calling getNumPartitions() on an RDD; a DataFrame has no such method, so you first need to convert it to an RDD.
A common rule of thumb is to make the number of partitions in an RDD equal to the number of cores in the cluster, so that all partitions are processed in parallel and resources are utilized optimally.
To find the number of partitions, turn the DataFrame into an RDD and ask for its partition count (rdd.partitions.size in Scala, rdd.getNumPartitions() in PySpark). After a shuffle you would typically see 200 partitions, the default value of spark.sql.shuffle.partitions.
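For example, a minimal sketch (reusing the CSV path from the question as a placeholder):
df = spark.read.csv('/home/xpto/text.csv', header=True)
# A DataFrame has no getNumPartitions(), so go through its underlying RDD
print(df.rdd.getNumPartitions())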
The short answer is no: there is no equivalent of the minPartitions parameter when using a DataFrameReader.
coalesce may be used in this case to reduce the partition count, and repartition may be used to increase it. When you are using coalesce, downstream performance may be better if you force a full shuffle, especially in cases of skewed data. Note that the shuffle flag only exists on the RDD API, e.g. rdd.coalesce(100, shuffle=True); a DataFrame's coalesce takes no such flag, so on a DataFrame you would use repartition to force the shuffle. Forcing a full shuffle of the data carries cost implications similar to repartition.
Note that the above operations generally do not keep the original order of the file read (except when running coalesce without a shuffle), so if a portion of your code depends on the dataset's order, you should avoid a shuffle prior to that point.
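A minimal sketch of the two options, assuming the CSV path from the question and arbitrary example partition counts:
df = spark.read.csv('/home/xpto/text.csv', header=True)
df_small = df.coalesce(10)       # reduce the partition count, no shuffle
df_big = df.repartition(300)     # increase the partition count, full shuffle
# the shuffle flag only exists on the RDD API, not on DataFrame.coalesce
rdd_shuffled = df.rdd.coalesce(100, shuffle=True)
print(df_small.rdd.getNumPartitions(), df_big.rdd.getNumPartitions())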
I figured it out. The DataFrame (and the RDD) has a method called coalesce, where the number of partitions can be set.
Ex:
>>> df = spark.read.csv('/home/xpto/text.csv', header=True).coalesce(2)
>>> df.rdd.getNumPartitions()
2
In my case, Spark split my file into 153 partitions. I'm able to set the number of partitions to 10, but when I try to set it to 300, it is ignored and the 153 partitions are used again (I don't know why).
REF: https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce
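One likely explanation, not confirmed in the thread: coalesce can only reduce the partition count, so asking for 300 when the read produced 153 partitions has no effect. repartition can raise it, at the cost of a full shuffle. A hedged sketch:
>>> df = spark.read.csv('/home/xpto/text.csv', header=True).repartition(300)
>>> df.rdd.getNumPartitions()
300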