
What does df.repartition with no column arguments partition on?

In PySpark, the repartition method takes an optional columns argument, which will of course repartition your DataFrame by that key.

My question is: how does Spark repartition when there is no key? I couldn't dig any further into the source code to find where this is handled inside Spark itself.

def repartition(self, numPartitions, *cols):
    """
    Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
    resulting DataFrame is hash partitioned.

    :param numPartitions:
        can be an int to specify the target number of partitions or a Column.
        If it is a Column, it will be used as the first partitioning column. If not specified,
        the default number of partitions is used.

    .. versionchanged:: 1.6
       Added optional arguments to specify the partitioning columns. Also made numPartitions
       optional if partitioning columns are specified.

    >>> df.repartition(10).rdd.getNumPartitions()
    10
    >>> data = df.union(df).repartition("age")
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  5|  Bob|
    |  5|  Bob|
    |  2|Alice|
    |  2|Alice|
    +---+-----+
    >>> data = data.repartition(7, "age")
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> data.rdd.getNumPartitions()
    7
    """
    if isinstance(numPartitions, int):
        if len(cols) == 0:
            return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
        else:
            return DataFrame(
                self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx)
    elif isinstance(numPartitions, (basestring, Column)):
        cols = (numPartitions, ) + cols
        return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx)
    else:
        raise TypeError("numPartitions should be an int or Column")

For example: it's totally fine to call lines like the following, but I have no idea what the repartition is actually doing. Is it a hash of the entire row? Perhaps of the first column in the DataFrame?

df_2 = df_1\
       .where(sf.col('some_column') == 1)\
       .repartition(32)\
       .alias('df_2')
veronik asked Nov 29 '18

1 Answer

By default, if no partitioning column is specified, the partitioning is not based on any characteristic of the data; rows are simply distributed uniformly across the partitions (in recent Spark versions this is a round-robin distribution, not a hash of the row).

The repartition algorithm behind df.repartition does a full data shuffle and distributes the data equally among the partitions. If you only want to reduce the number of partitions, it is better to use df.coalesce, which avoids a full shuffle.

Here is a good explanation of how to repartition DataFrames: https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

Stefan Repcek answered Nov 15 '22