
Spark: Order of column arguments in repartition vs partitionBy

Methods taken into consideration (Spark 2.2.1):

  1. DataFrame.repartition (the two implementations that take partitionExprs: Column* parameters)
  2. DataFrameWriter.partitionBy

Note: This question is not asking about the difference between these methods.

From docs of partitionBy:

If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a Dataset by year and then month, the directory layout would look like:

  • year=2016/month=01/
  • year=2016/month=02/

From this, I infer that the order of column arguments will decide the directory layout; hence it is relevant.
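For concreteness, a minimal sketch (the DataFrame df and the output path are hypothetical) that would produce the layout quoted above:

# df is assumed to have "year" and "month" columns; the path is made up
df.write.partitionBy("year", "month").parquet("/tmp/events")
# creates year=2016/month=01/, year=2016/month=02/, ...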

From docs of repartition:

Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. The resulting Dataset is hash partitioned.

As per my current understanding, repartition decides the degree of parallelism in handling the DataFrame. With this definition, behaviour of repartition(numPartitions: Int) is straightforward but the same can't be said about the other two implementations of repartition that take partitionExprs: Column* arguments.
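For reference, the three overloads look like this in PySpark (df is any DataFrame; the column names are made up):

df.repartition(10)                    # numPartitions only
df.repartition("year", "month")      # partitionExprs only
df.repartition(10, "year", "month")  # both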


All things said, my questions are the following:

  • Like the partitionBy method, is the order of column inputs relevant in the repartition method too?
  • If the answer to the above question is
    • No: does each chunk extracted for parallel execution contain the same data as each group would have contained had we run a SQL query with GROUP BY on the same columns?
    • Yes: please explain the behaviour of the repartition(columnExprs: Column*) method
  • What is the relevance of having both numPartitions: Int and partitionExprs: Column* arguments in the third implementation of repartition?
asked Jan 20 '18 by y2k-shubham


2 Answers

The only similarity between these two methods is their names. They are used for different things and have different mechanics, so you shouldn't compare them at all.

That being said, repartition shuffles data as follows:

  • With partitionExprs only, it uses a hash partitioner on the columns used in the expressions, with spark.sql.shuffle.partitions as the number of partitions.
  • With partitionExprs and numPartitions, it does the same as the previous case, but overrides spark.sql.shuffle.partitions.
  • With numPartitions only, it just rearranges data using RoundRobinPartitioning (see the sketch after this list).
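A small sketch (the partition counts are illustrative) contrasting round-robin with hash partitioning; round-robin balances partition sizes, while hash partitioning follows the key hashes:

df = spark.range(100).toDF("x")
df.repartition(4).rdd.glom().map(len).collect()       # round-robin: evenly balanced, e.g. [25, 25, 25, 25]
df.repartition(4, "x").rdd.glom().map(len).collect()  # hash on x: sizes may be uneven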

Is the order of column inputs relevant in the repartition method too?

It is. hash((x, y)) is in general not the same as hash((y, x)).

df = (spark.range(5, numPartitions=4).toDF("x")
    .selectExpr("cast(x as string)")
    .crossJoin(spark.range(5, numPartitions=4).toDF("y")))

df.repartition(4, "y", "x").rdd.glom().map(len).collect()
# [8, 6, 9, 2]
df.repartition(4, "x", "y").rdd.glom().map(len).collect()
# [6, 4, 3, 12]

Does each chunk extracted for parallel execution contain the same data as would have been in each group had we run a SQL query with GROUP BY on the same columns?

It depends on what exactly is being asked; both points are illustrated in the sketch after this list.

  • Yes. GROUP BY with the same set of columns will result in the same logical distribution of keys over partitions.
  • No. Hash partitioner can map multiple keys to the same partition. GROUP BY "sees" only the actual groups.
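A quick sketch (the numbers are illustrative) of the second point: 10 distinct keys hashed into 3 partitions must share partitions, while GROUP BY still produces one group per distinct key:

df = spark.range(10).toDF("k")
df.repartition(3, "k").rdd.glom().map(len).collect()  # e.g. [4, 3, 3] - several keys per partition
df.groupBy("k").count().count()                       # 10 - one group per distinct key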

Related: How to define partitioning of DataFrame?

answered Oct 16 '22 by zero323


Before answering this question, let me clarify some concepts in Spark.

block: these are physically mapped to HDFS folders and can hold sub-blocks and parquet/* files.

parquet: a compressed file format for storing data, commonly used in HDFS clusters.

Now, coming to the answer.

repartition(number_of_partitions, *columns): this will create parquet files with the data shuffled on the distinct combinations of values of the columns provided (repartition itself does not sort the data). All rows sharing the same combination of values land in the same partition, so in that sense the column order makes no difference; note, however, that the other answer shows that which partition a given combination lands in (and hence the partition sizes) does depend on the order. In the background, Spark hashes the values of these columns and arranges the data into at most number_of_partitions files.
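A minimal sketch (the path and column name are hypothetical) of what this looks like when writing: rows sharing the same colA value hash to the same partition, and each non-empty partition becomes one output file:

df.repartition(8, "colA").write.parquet("/tmp/repartitioned")  # at most 8 part files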

partitionBy(*columns): this is slightly different from repartition. It will create folders in HDFS, one per distinct value of the columns provided in the parameters. So suppose:

colA = [1, 2, 3, 4, 5]

While writing the table to HDFS, it will create folders named:

colA=1/
colA=2/
colA=3/
...

and if you provide two columns, then:

colA=1/
    colB=1/
    colB=2/
    colB=3/
    ...
colA=2/
    ...
colA=3/
    ...

Inside these folders it will store parquet files holding the rows for the corresponding column values. The number of files in each folder can be capped with bucketBy, which sets the maximum number of files per folder; this is available in the Scala API from Spark 2.0 and in PySpark from 2.3 onward.
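A minimal sketch (the table and column names are hypothetical) combining both; note that bucketBy requires saving as a table rather than to a plain path:

(df.write
    .partitionBy("colA")      # one folder per distinct colA value
    .bucketBy(4, "colB")      # at most 4 bucket files per folder
    .saveAsTable("my_bucketed_table"))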

answered Oct 16 '22 by jigyasu nayyar