I have many spark dataframes on which I need to do the following:
1) load a single spark dataframe
2) select rows from it
3) merge it with all of the previous spark dataframes
Now, each of the above operations requires a different number of partitions. Selecting rows needs many partitions, say 100; merging needs very few, say 10.
So, I really want it to work like this:
1) load a single spark dataframe
1.5) repartition into 100 partitions
2) select rows from it
2.5) repartition into 10 partitions
3) merge it with all of the previous spark dataframes
Now, how do I force it to repartition between steps 1 and 2 and between steps 2 and 3?
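For illustration, here is a rough Scala sketch of that pipeline. It is only a sketch: the path, the filter condition, and the use of `union()` for the merge are stand-ins for my real code.

```scala
import spark.implicits._  // assuming a spark-shell / SparkSession named `spark`

val df = spark.read.parquet("/data/input.parquet")  // 1) load (hypothetical path)
  .repartition(100)                                 // 1.5) many partitions for the row selection
  .filter($"value" > 0)                             // 2) select rows (stand-in condition)
  .repartition(10)                                  // 2.5) few partitions for the merge
// 3) merged = previouslyAccumulated.union(df)      // merge with the earlier dataframes
```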
I know that when I call data = data.repartition(7), it is lazily evaluated, so the repartitioning only actually happens when an action (such as a save) runs.
So, I have been doing it like this (a code sketch follows the list):
1) load a single spark dataframe
1.5) repartition into 100 partitions
1.75) `df.count()` *just* to force materialization
2) select rows from it
2.5) repartition into 10 partitions
2.75) `df.count()` *just* to force materialization
3) merge it with all of the previous spark dataframes
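In code, the workaround looks roughly like this (again a sketch with stand-in names):

```scala
import spark.implicits._  // assuming a spark-shell / SparkSession named `spark`

var df = spark.read.parquet("/data/input.parquet")  // hypothetical path
df = df.repartition(100)
df.count()                       // 1.75) action run *just* to force materialization
df = df.filter($"value" > 0)     // stand-in for the real row selection
df = df.repartition(10)
df.count()                       // 2.75) action run *just* to force materialization
// ... then merge with the previous dataframes
```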
Is there a better way to force the repartitioning between these steps? And is there a better way than running count() on the dataframe?
The repartition() method is used to increase or decrease the number of partitions of an RDD or DataFrame in Spark. It performs a full shuffle of the data across all the nodes, creating partitions of more or less equal size. This is a costly operation, since it involves moving data all over the network.
By default, Spark/PySpark creates as many partitions as there are CPU cores on the machine. The data of each partition resides on a single machine, and Spark/PySpark creates one task per partition. Shuffle operations move data from one partition to others.
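For example, you can inspect the partition count before and after a repartition with `rdd.getNumPartitions` (a small spark-shell sketch):

```scala
val df = spark.range(0, 1000000)    // example dataframe
println(df.rdd.getNumPartitions)    // typically the default parallelism, e.g. the core count
val df10 = df.repartition(10)       // full shuffle into 10 roughly equal partitions
println(df10.rdd.getNumPartitions)  // 10
```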
By default, Spark uses a hash partitioner to distribute data across partitions. The hash partitioner works on the concept of the hashCode() function: equal objects have the same hash code, so they end up in the same partition.
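For instance, repartitioning by a column hash-partitions the rows, so equal keys (which have equal hash codes) land in the same partition. A sketch with made-up data:

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b"), (1, "c")).toDF("key", "value")
val byKey = df.repartition(4, $"key")  // hash-partition on "key"; both key=1 rows share a partition
byKey.explain()                        // the plan shows Exchange hashpartitioning(key, 4)
```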
To decrease the number of partitions resulting from shuffle operations when adaptive query execution is enabled, you can rely on the advisory partition size (spark.sql.adaptive.advisoryPartitionSizeInBytes) and set spark.sql.adaptive.coalescePartitions.parallelismFirst to false. (The Spark documentation itself recommends setting this value to false so that the configured target size is respected.)
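A sketch of those settings in recent Spark versions (the size value is illustrative):

```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Respect the advisory partition size instead of maximizing parallelism first.
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
// Target size per shuffle partition; a larger value yields fewer partitions.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
```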
As all DataFrame transformations in Spark are lazily evaluated, you need to perform an action to actually execute them. There is currently no other way to force the transformations.
All available DataFrame actions can be found in the documentation (look under actions). In your case, instead of using count() to force the transformation, you could use first(), which should be significantly cheaper.
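Because repartition() introduces a shuffle, even first() has to complete the whole shuffle map stage before it can read a single row, so the repartitioning still runs, but only one row is collected to the driver. A sketch (the path is hypothetical):

```scala
var df = spark.read.parquet("/data/input.parquet")
df = df.repartition(100)
df.first()  // forces the shuffle, but returns only a single row
```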
In step 2.5 you could replace the repartition() with coalesce(), as it avoids a full shuffle. This is often advantageous when the new number of partitions is lower than before, since it minimizes the data movement.
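For example, coalesce(10) merges the existing 100 partitions down to 10 without shuffling every row across the network (a sketch with example data):

```scala
import spark.implicits._

val df = spark.range(0, 1000000).toDF("value")           // example dataframe
val selected = df.repartition(100).filter($"value" > 0)  // the row selection on 100 partitions
val fewer = selected.coalesce(10)                        // merge down to 10 partitions, no full shuffle
println(fewer.rdd.getNumPartitions)                      // 10
```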
EDIT:
To answer your question about what happens if you do not use any action and simply do: 1) repartition, 2) transform the dataframe, 3) repartition. Due to the optimizations Spark performs on the transformations, this order does not always seem to be followed. I made a small test program to try it out:
```scala
val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y")
val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b")
df1.explain(true)
```
This returns information about how the dataframe is computed.
```
== Parsed Logical Plan ==
'Filter NOT ('y = b)
+- Repartition 5, true
   +- Filter NOT (x#5 = 1.0)
      +- Repartition 10, true
         +- Project [_1#2 AS x#5, _2#3 AS y#6]
            +- LogicalRDD [_1#2, _2#3]

== Analyzed Logical Plan ==
x: double, y: string
Filter NOT (y#6 = b)
+- Repartition 5, true
   +- Filter NOT (x#5 = 1.0)
      +- Repartition 10, true
         +- Project [_1#2 AS x#5, _2#3 AS y#6]
            +- LogicalRDD [_1#2, _2#3]

== Optimized Logical Plan ==
Repartition 5, true
+- Project [_1#2 AS x#5, _2#3 AS y#6]
   +- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
      +- LogicalRDD [_1#2, _2#3]

== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *Project [_1#2 AS x#5, _2#3 AS y#6]
   +- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
      +- Scan ExistingRDD[_1#2,_2#3]
```
As can be seen here, the repartition(10) step is not included in the physical plan and seems to have been removed during optimization.
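One way to confirm this at runtime (a small extra check, not part of the original test) is to look at the resulting partition count, which reflects only the final repartition:

```scala
println(df1.rdd.getNumPartitions)  // 5 -- only the final repartition survives optimization
```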