 

Spark 2.0 read csv number of partitions (PySpark)

I'm trying to port some code from Spark 1.6 to Spark 2.0 using the new features in Spark 2.0. In particular, I want to use the Spark 2.0 csv reader. BTW, I'm using pyspark.

With the "old" textFile function, I'm able to set the minimum number of partitions. Ex:

file = sc.textFile('/home/xpto/text.csv', minPartitions=10)
header = file.first()  # extract header
data = file.filter(lambda x: x != header)  # csv without header
...

Now, with Spark 2.0 I can read the csv directly:

df = spark.read.csv('/home/xpto/text.csv', header=True)
...

But I haven't found a way to set minPartitions.

I need this to test the performance of my code.

Thx, Fred

asked Jun 30 '16 by Frederico Oliveira


People also ask

How can you tell how many partitions a PySpark DataFrame has?

In PySpark (Spark with Python) you can get the current number of partitions by calling getNumPartitions() on an RDD; to use it with a DataFrame, first convert the DataFrame to its underlying RDD.

How do I decide how many partitions to use in Spark?

A common rule of thumb is to make the number of partitions in an RDD equal to the number of cores in the cluster, so that all partitions are processed in parallel and the resources are utilized in an optimal way.

How would you get the number of partitions of a DataFrame DF?

Convert the DataFrame to an RDD and check its partitions: in Scala, df.rdd.partitions.size; in PySpark, df.rdd.getNumPartitions(). For a DataFrame produced by a shuffle you would typically see 200 partitions, the default value of spark.sql.shuffle.partitions.
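
As a minimal PySpark sketch of the above (the path '/home/xpto/text.csv' is just the example file from the question, and the count you get depends on your data and cluster):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('partition-count').getOrCreate()

# Read the csv with the DataFrameReader (no minPartitions option here)
df = spark.read.csv('/home/xpto/text.csv', header=True)

# DataFrames don't expose the partition count directly;
# convert to the underlying RDD and ask it
print(df.rdd.getNumPartitions())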


2 Answers

The short answer is no: when using a DataFrameReader there is no option equivalent to the minPartitions parameter.

coalesce may be used in this case to reduce the partition count, and repartition may be used to increase it. When coalescing, downstream performance may be better if you force a shuffle, especially with skewed data. Note that the shuffle flag exists only on the RDD API, e.g. rdd.coalesce(100, shuffle=True); DataFrame.coalesce takes no such parameter, so to force a shuffle on a DataFrame use repartition instead. Forcing a shuffle moves all of the data, which carries cost implications similar to repartition.

Note that these operations generally do not preserve the original read order of the file (except coalesce without a shuffle), so if a portion of your code depends on the dataset's order, you should avoid a shuffle prior to that point.
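
As an illustrative sketch of the pattern described above (path and partition counts are example values, not requirements):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('/home/xpto/text.csv', header=True)

# Reduce the partition count without a shuffle
fewer = df.coalesce(10)

# Increase (or rebalance) the partition count; always shuffles
more = df.repartition(300)

# On the RDD API, coalesce can be asked to shuffle
rdd_balanced = df.rdd.coalesce(100, shuffle=True)

print(fewer.rdd.getNumPartitions(),
      more.rdd.getNumPartitions(),
      rdd_balanced.getNumPartitions())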

answered Sep 24 '22 by Vijay Krishna


I figured it out. DataFrames (and RDDs) have a method called coalesce that lets you set the number of partitions.

Ex:

>>> df = spark.read.csv('/home/xpto/text.csv', header=True).coalesce(2)
>>> df.rdd.getNumPartitions()
2

In my case, Spark split my file into 153 partitions. I'm able to reduce the number of partitions to 10, but when I try to set it to 300, it ignores that and keeps the 153 (coalesce can only decrease the number of partitions; to increase it you need repartition, which performs a full shuffle).

REF: https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce
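
A quick sketch of the difference, assuming the same example file (the partition counts shown are just from my run and will vary with your data and cluster):

>>> df = spark.read.csv('/home/xpto/text.csv', header=True)
>>> df.rdd.getNumPartitions()   # whatever Spark chose at read time
153
>>> df.coalesce(300).rdd.getNumPartitions()   # coalesce can't go above the current count
153
>>> df.repartition(300).rdd.getNumPartitions()   # repartition shuffles and can increase it
300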

answered Sep 23 '22 by Frederico Oliveira