In PySpark, the repartition method has an optional columns argument, which will of course repartition your DataFrame by that key.
My question is: how does Spark repartition when there's no key? I couldn't trace it any further through the source code to find where this is handled in Spark itself.
def repartition(self, numPartitions, *cols):
    """
    Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
    resulting DataFrame is hash partitioned.

    :param numPartitions:
        can be an int to specify the target number of partitions or a Column.
        If it is a Column, it will be used as the first partitioning column. If not specified,
        the default number of partitions is used.

    .. versionchanged:: 1.6
       Added optional arguments to specify the partitioning columns. Also made numPartitions
       optional if partitioning columns are specified.

    >>> df.repartition(10).rdd.getNumPartitions()
    10
    >>> data = df.union(df).repartition("age")
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  5|  Bob|
    |  5|  Bob|
    |  2|Alice|
    |  2|Alice|
    +---+-----+
    >>> data = data.repartition(7, "age")
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> data.rdd.getNumPartitions()
    7
    """
    if isinstance(numPartitions, int):
        if len(cols) == 0:
            return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
        else:
            return DataFrame(
                self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx)
    elif isinstance(numPartitions, (basestring, Column)):
        cols = (numPartitions, ) + cols
        return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx)
    else:
        raise TypeError("numPartitions should be an int or Column")
For example, it's totally fine to call these lines, but I have no idea what it's actually doing. Is it a hash of the entire row? Perhaps the first column in the DataFrame?
df_2 = df_1\
    .where(sf.col('some_column') == 1)\
    .repartition(32)\
    .alias('df_2')
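One way to probe this empirically (a rough sketch, not production code; the toy data and the `spark` session setup are my own, and the column names are made up):

from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()

# A toy frame with a heavily skewed key: if repartition(32) hashed on a
# column, the skew would show up in the partition sizes.
df_1 = spark.range(10000).withColumn('some_column', (sf.col('id') % 2).cast('int'))

df_2 = df_1.where(sf.col('some_column') == 1).repartition(32).alias('df_2')

# spark_partition_id() reports which partition each row ended up in.
df_2.groupBy(sf.spark_partition_id().alias('pid')).count().orderBy('pid').show(32)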
By default, if no partitioning columns are specified, the partitioning is not based on any characteristic of the data; instead, rows are distributed across the partitions in a random but uniform way.
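You can confirm the uniform spread by counting rows per partition (a quick sketch; assumes an active `spark` session):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(1000)

# glom() gathers each partition into a list, so mapping len() over it
# gives the per-partition row counts, which come out roughly equal.
print(df.repartition(8).rdd.glom().map(len).collect())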
The algorithm behind df.repartition performs a full data shuffle and distributes the data equally among the partitions. If the goal is only to reduce the number of partitions, df.coalesce is the better choice, because it avoids a full shuffle.
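A minimal sketch of the contrast (assumes an active `spark` session; in recent Spark versions the repartition plan shows a round-robin exchange):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).repartition(8)

# repartition always shuffles: the physical plan contains an Exchange
# (RoundRobinPartitioning when no columns are given).
df.repartition(2).explain()

# coalesce only merges existing partitions, so the plan shows a
# Coalesce step and no Exchange.
df.coalesce(2).explain()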
Here is a good explanation of how to manage partitions with DataFrames:
https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4