I'm confused about the behaviour of the numPartitions parameter in the following two methods:
DataFrameReader.jdbc
Dataset.repartition
The official docs of DataFrameReader.jdbc say the following about the numPartitions parameter:
numPartitions: the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly.
And the official docs of Dataset.repartition say:
Returns a new Dataset that has exactly numPartitions partitions.
My current understanding:
The numPartitions parameter in the DataFrameReader.jdbc method controls the degree of parallelism in reading the data from the database.
The numPartitions parameter in Dataset.repartition controls the number of output files that will be generated when this DataFrame is written to disk.
My questions:
1. If I read a DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking the repartition method), would there still be as many files in the output as there would've been had I written out a DataFrame to disk after having invoked repartition on it?
2. Then is it redundant to invoke the repartition method on a DataFrame that was read using the DataFrameReader.jdbc method (with the numPartitions parameter)?
3. Shouldn't the numPartitions parameter of the DataFrameReader.jdbc method be called something like 'parallelism'?
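For concreteness, here is a minimal sketch of the two calls I'm comparing; the JDBC URL, credentials, table and column names are hypothetical:

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("numPartitions-demo").getOrCreate()

val url = "jdbc:mysql://dbhost:3306/mydb"    // hypothetical connection URL
val props = new Properties()
props.setProperty("user", "myuser")          // hypothetical credentials
props.setProperty("password", "mypassword")

// DataFrameReader.jdbc: numPartitions together with columnName/lowerBound/upperBound
// splits the read into stride-based WHERE clauses on the given column
val dfJdbc = spark.read.jdbc(
  url,
  "orders",      // hypothetical table
  "order_id",    // columnName used to build the partition strides
  1L,            // lowerBound
  1000000L,      // upperBound
  8,             // numPartitions
  props)

// Dataset.repartition: returns a new Dataset that has exactly 8 partitions
val dfRepartitioned = dfJdbc.repartition(8)
```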
Short answer: There is (almost) no difference in the behaviour of the numPartitions parameter between the two methods.
read.jdbc(..numPartitions..)
Here, the numPartitions parameter controls:
1. the number of parallel connections that will be opened to MySQL (or any other RDBMS) for reading the data into a DataFrame, and
2. the degree of parallelism of all subsequent operations on the read DataFrame, including writing to disk, until the repartition method is invoked on it.
repartition(..numPartitions..)
Here the numPartitions parameter controls the degree of parallelism that will be exhibited in performing any operation on the DataFrame, including writing to disk.
So basically the DataFrame obtained by reading a MySQL table using spark.read.jdbc(..numPartitions..) behaves the same (exhibits the same degree of parallelism in operations performed over it) as one that was read without parallelism and had the repartition(..numPartitions..) method invoked on it afterwards (obviously with the same value of numPartitions).
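One way to see this equivalence for yourself is to compare the partition counts along the two paths. A sketch, reusing the hypothetical url, props and SparkSession from the snippet in the question:

```scala
// Path 1: JDBC-side partitioning, 8 strides on order_id => 8 partitions from the start
val viaJdbc = spark.read.jdbc(url, "orders", "order_id", 1L, 1000000L, 8, props)

// Path 2: read without partitioning options (a single partition), then repartition
val viaRepartition = spark.read.jdbc(url, "orders", props).repartition(8)

println(viaJdbc.rdd.getNumPartitions)         // 8
println(viaRepartition.rdd.getNumPartitions)  // 8
```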
To answer the exact questions:
If I read a DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking the repartition method), would there still be as many files in the output as there would've been had I written out a DataFrame to disk after having invoked repartition on it?
Yes
Assuming that the read task was parallelized by providing the appropriate parameters (columnName, lowerBound, upperBound and numPartitions), all operations on the resulting DataFrame, including the write, will be performed in parallel. Quoting the official docs here:
numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
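Continuing the same hypothetical sketch, the number of part files on disk generally tracks the number of partitions at write time, so the already-parallelized read produces multiple part files without any repartition call:

```scala
// viaJdbc was read with numPartitions = 8, so the write runs as 8 tasks and,
// assuming no empty partitions, produces 8 part files (plus _SUCCESS).
viaJdbc.write.parquet("/tmp/orders_via_jdbc")

// The single-partition read followed by repartition(8) writes the same number of part files.
viaRepartition.write.parquet("/tmp/orders_via_repartition")
```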
Then is it redundant to invoke the repartition method on a DataFrame that was read using the DataFrameReader.jdbc method (with the numPartitions parameter)?
Yes
Unless you invoke one of the other variations of the repartition method (the ones that take a columnExprs param), invoking repartition on such a DataFrame (with the same numPartitions parameter) is redundant. However, I'm not sure if forcing the same degree of parallelism on an already-parallelized DataFrame also causes an unnecessary shuffle of data among the executors. Will update the answer once I come across it.
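Regarding that open point, one way to check it yourself (a sketch on the same hypothetical DataFrame, not something from the docs) is to inspect the physical plan: an Exchange node in the output of explain() means the repartition call introduces a shuffle even though the partition count stays the same:

```scala
// viaJdbc already has 8 partitions from the JDBC read
viaJdbc.repartition(8).explain()
// If the printed physical plan contains an Exchange (e.g. RoundRobinPartitioning(8)),
// then repartition(8) shuffles the data even though the partition count is unchanged.
```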