A data lake I am working with (`df`) has 2 TB of data and 20,000 files. I'd like to compact the data set into 2,000 files of roughly 1 GB each.

If you run `df.coalesce(2000)` and write out to disk, the data lake contains 1.9 TB of data.

If you run `df.repartition(2000)` and write out to disk, the data lake contains 2.6 TB of data.

Each file in the `repartition()` data lake is exactly 0.3 GB larger than expected (they're all 1.3 GB files instead of 1 GB files).

Why does the `repartition()` method increase the size of the overall data lake?
There is a related question that discusses why the size of a data lake increases after aggregations are run. The answer says:
In general columnar storage formats like Parquet are highly sensitive when it comes to data distribution (data organization) and cardinality of individual columns. The more organized is data and the lower is cardinality the more efficient is the storage.
Is the `coalesce()` algorithm providing data that's more organized... I don't think so...
I don't think the other question answers my question.
The repartition function allows us to change the distribution of the data on the Spark cluster. This distribution change will induce a shuffle (physical data movement) under the hood, which is quite an expensive operation.

Repartition works by creating new partitions and doing a full shuffle to move the data around. This results in more or less equal-sized partitions. Since a full shuffle takes place, repartition is less performant than coalesce.

One difference I understand is that with `repartition()` the number of partitions can be increased or decreased, but with `coalesce()` the number of partitions can only be decreased. If the partitions are spread across multiple machines and `coalesce()` is run, how can it avoid data movement?
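A quick way to see the mechanical difference is to compare the physical plans: `coalesce()` shows up as a `Coalesce` node with no exchange, while `repartition()` introduces an `Exchange` with `RoundRobinPartitioning`. A small sketch (the exact plan text varies by Spark version):

```scala
// Sketch: inspect the physical plans of the two operations.
val demo = spark.range(1000000).toDF("id")

demo.coalesce(10).explain()
// == Physical Plan ==
// Coalesce 10
// +- ...    <- no Exchange node; existing partitions are merged in place

demo.repartition(10).explain()
// == Physical Plan ==
// Exchange RoundRobinPartitioning(10)
// +- ...    <- full shuffle of all rows
```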
If you use `.repartition(1)`, it will only create a single file per partition. The main benefit is that the fewer files there are per partition, the higher the read speed will be. However, if the file size becomes more than (or almost) a GB, then it is better to use more partitions.
Disclaimer:
This answer contains primarily speculation. A detailed explanation of this phenomenon might require in-depth analysis of the input and output (or at least of their respective metadata).
Observations:

1. Both persistent columnar formats and the internal Spark SQL representation transparently apply different compression techniques (like run-length encoding or dictionary encoding) to reduce the memory footprint of the stored data.
2. Additionally, on-disk formats (including plain text data) can be explicitly compressed using general-purpose compression algorithms - it is not clear if this is the case here.
3. Compression (explicit or transparent) is applied to blocks of data (typically partitions, but smaller units can be used).

Based on 1), 2) and 3) we can assume that the average compression rate will depend on the distribution of the data in the cluster. We should also note that the final result can be non-deterministic if the upstream lineage contains wide transformations.
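As a rough illustration of the last point, writing the same rows once sorted and once randomly ordered typically produces files of noticeably different sizes (a sketch only; the schema, paths and Parquet format are made up for the example):

```scala
import org.apache.spark.sql.functions.rand

// Same logical data, two different physical layouts (illustrative only).
val data = spark.range(10000000L).selectExpr("id % 100 as key", "id")

// Low-entropy layout: long runs of identical keys compress well.
data.orderBy("key").write.mode("overwrite").parquet("/tmp/sorted")

// High-entropy layout: keys interleaved, encodings become less effective.
data.orderBy(rand()).write.mode("overwrite").parquet("/tmp/shuffled")

// Comparing the size of the two directories on disk usually shows
// the randomly ordered copy taking noticeably more space.
```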
Possible impact of `coalesce` vs. `repartition`:

In general `coalesce` can take two paths:

In the first case we can expect that the compression rate will be comparable to the compression rate of the input. However, there are some cases where it can achieve a much smaller final output. Let's imagine a degenerate dataset:
val df = sc.parallelize(
Seq("foo", "foo", "foo", "bar", "bar", "bar"),
6
).toDF
If a dataset like this was written to disk there would be no potential for compression - each value would have to be written as-is:
df.withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
| foo| 0|
| foo| 1|
| foo| 2|
| bar| 3|
| bar| 4|
| bar| 5|
+-----+---+
In other words, we need roughly 6 * 3 bytes, giving 18 bytes in total.

However, if we `coalesce`:
df.coalesce(2).withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
| foo| 0|
| foo| 0|
| foo| 0|
| bar| 1|
| bar| 1|
| bar| 1|
+-----+---+
we can, for example, apply RLE with a small int as the count and store each partition in 3 + 1 bytes, giving 8 bytes in total.

This is of course a huge oversimplification, but it shows how preserving a low-entropy input structure and merging blocks can result in a lower memory footprint.
The second `coalesce` scenario is less obvious, but there are scenarios where entropy can be reduced by the upstream process (think for example about window functions) and preserving such structure will be beneficial.
What about `repartition`?

Without a partitioning expression, `repartition` applies `RoundRobinPartitioning` (implemented as `HashPartitioning` with a pseudo-random key based on the partition id). As long as the hash function behaves sensibly, such redistribution should maximize the entropy of the data and, as a result, decrease the possible compression rate.
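Applied to the degenerate dataset above, this means the identical values no longer stay together - each row is sent to a pseudo-random output partition, so every partition typically ends up with a mix of `foo` and `bar` (a sketch; the exact assignment is non-deterministic and version-dependent):

```scala
import org.apache.spark.sql.functions.spark_partition_id

// Round-robin repartitioning spreads the rows pseudo-randomly, so each
// output partition typically mixes "foo" and "bar" - there are no long
// runs left for RLE or dictionary encoding to exploit.
df.repartition(2).withColumn("pid", spark_partition_id()).show
```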
Conclusion:

`coalesce` alone shouldn't provide any specific benefits, but it can preserve existing properties of the data distribution - this can be advantageous in certain cases.

`repartition`, due to its nature, will on average make things worse, unless the entropy of the data is already maximized (a scenario where things improve is possible, but highly unlikely on a non-trivial dataset).

Finally, `repartition` with a partitioning expression, or `repartitionByRange`, should decrease entropy and improve compression rates.
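For example (a sketch, assuming a DataFrame `df` with some representative column; `some_key` here is just a placeholder):

```scala
import org.apache.spark.sql.functions.col

// Hash-partition by a column: rows with the same key land in the same
// partition, keeping per-partition cardinality low.
df.repartition(2000, col("some_key"))

// Range-partition by a column: each partition holds a contiguous key range,
// which also preserves a low-entropy layout.
df.repartitionByRange(2000, col("some_key"))
```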
Note:

We should also keep in mind that columnar formats usually decide on a specific compression / encoding method (or the lack of one) based on runtime statistics. So even if the set of rows in a particular block is fixed, a change in the order of the rows can lead to different outcomes.
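In the same spirit, explicitly fixing the row order inside each output partition before writing gives those runtime statistics long runs to work with. A sketch only, with a placeholder column and path:

```scala
// Sketch: sort inside each partition so the columnar writer sees long runs
// of identical values ("some_key" and the output path are placeholders).
df.sortWithinPartitions("some_key")
  .write
  .parquet("/tmp/sorted-within-partitions")
```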
I agree with @10465355's answer. Here I have an extreme example of this.

There is a table called table_a. All its columns are strings. Its storage format is ORC, and it is generated by:
insert overwrite table table_a
select a,b,...,i
from table_other
group by a,b,...,i
After the HashAggregate operation, the data in table_a is well organized, especially the first column `a`. The ORC file is 6.97 MB. (In fact, there is also a small 2.09 KB file, which I ignore below.)
Then, we `repartition` table_a:
val querydf = spark.sql("""select *
from table_a distribute by rand()""").repartition(1)
querydf.createOrReplaceTempView("tmpTable")
spark.sql("""insert overwrite table table_a
select a,b,...,i
from tmpTable""")
When `numPartitions` = 1, `Random(hashing.byteswap32(index)).nextInt(numPartitions)` does not trigger a random redistribution. So we add `distribute by rand()` to make it equivalent to `repartition(n)`, and we get a file with a size of 14.26 MB.
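To see why `numPartitions` = 1 cannot reshuffle anything, note that `nextInt(1)` can only ever return 0, so every row is sent to the same single target partition regardless of the seed. A standalone sketch of the expression quoted above, outside Spark:

```scala
import scala.util.Random
import scala.util.hashing.byteswap32

// With a single target partition the round-robin start position is always 0,
// so no random redistribution of rows can happen.
(0 until 5).foreach { index =>
  val position = new Random(byteswap32(index)).nextInt(1)
  println(s"map partition $index -> start position $position") // always 0
}
```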
We can use `hive --orcfiledump` to get the file structure of the ORC file.

Before `repartition`:
Stripes:
Stripe: offset: 3 data: 7288854 rows: 668265 tail: 354 index: 13637
Stream: column 0 section ROW_INDEX start: 3 length 50
Stream: column 1 section ROW_INDEX start: 53 length 1706
Stream: column 2 section ROW_INDEX start: 1759 length 672
Stream: column 3 section ROW_INDEX start: 2431 length 2297
Stream: column 4 section ROW_INDEX start: 4728 length 1638
Stream: column 5 section ROW_INDEX start: 6366 length 1270
Stream: column 6 section ROW_INDEX start: 7636 length 1887
Stream: column 7 section ROW_INDEX start: 9523 length 1823
Stream: column 8 section ROW_INDEX start: 11346 length 1120
Stream: column 9 section ROW_INDEX start: 12466 length 1174
Stream: column 1 section DATA start: 13640 length 209662
Stream: column 1 section LENGTH start: 223302 length 1158
Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
Stream: column 2 section DATA start: 455788 length 29861
Stream: column 2 section LENGTH start: 485649 length 5
Stream: column 2 section DICTIONARY_DATA start: 485654 length 33
Stream: column 3 section DATA start: 485687 length 424936
Stream: column 3 section LENGTH start: 910623 length 4069
Stream: column 3 section DICTIONARY_DATA start: 914692 length 41298
Stream: column 4 section DATA start: 955990 length 443602
Stream: column 4 section LENGTH start: 1399592 length 4122
Stream: column 4 section DICTIONARY_DATA start: 1403714 length 56217
Stream: column 5 section DATA start: 1459931 length 475983
Stream: column 5 section LENGTH start: 1935914 length 2650
Stream: column 5 section DICTIONARY_DATA start: 1938564 length 17798
Stream: column 6 section DATA start: 1956362 length 480891
Stream: column 6 section LENGTH start: 2437253 length 4230
Stream: column 6 section DICTIONARY_DATA start: 2441483 length 27873
Stream: column 7 section DATA start: 2469356 length 2716359
Stream: column 7 section LENGTH start: 5185715 length 304679
Stream: column 8 section DATA start: 5490394 length 438723
Stream: column 8 section LENGTH start: 5929117 length 58072
Stream: column 8 section DICTIONARY_DATA start: 5987189 length 424961
Stream: column 9 section DATA start: 6412150 length 630248
Stream: column 9 section LENGTH start: 7042398 length 1455
Stream: column 9 section DICTIONARY_DATA start: 7043853 length 258641
Encoding column 0: DIRECT
Encoding column 1: DICTIONARY_V2[48184]
Encoding column 2: DICTIONARY_V2[3]
Encoding column 3: DICTIONARY_V2[4252]
Encoding column 4: DICTIONARY_V2[4398]
Encoding column 5: DICTIONARY_V2[4404]
Encoding column 6: DICTIONARY_V2[5553]
Encoding column 7: DIRECT_V2
Encoding column 8: DICTIONARY_V2[105667]
Encoding column 9: DICTIONARY_V2[60943]
After repartition:
Stripes:
Stripe: offset: 3 data: 14940022 rows: 668284 tail: 344 index: 12312
Stream: column 0 section ROW_INDEX start: 3 length 50
Stream: column 1 section ROW_INDEX start: 53 length 1755
Stream: column 2 section ROW_INDEX start: 1808 length 678
Stream: column 3 section ROW_INDEX start: 2486 length 1815
Stream: column 4 section ROW_INDEX start: 4301 length 1297
Stream: column 5 section ROW_INDEX start: 5598 length 1217
Stream: column 6 section ROW_INDEX start: 6815 length 1841
Stream: column 7 section ROW_INDEX start: 8656 length 1330
Stream: column 8 section ROW_INDEX start: 9986 length 1289
Stream: column 9 section ROW_INDEX start: 11275 length 1040
Stream: column 1 section DATA start: 12315 length 4260547
Stream: column 1 section LENGTH start: 4272862 length 15955
Stream: column 2 section DATA start: 4288817 length 102153
Stream: column 2 section LENGTH start: 4390970 length 5
Stream: column 2 section DICTIONARY_DATA start: 4390975 length 33
Stream: column 3 section DATA start: 4391008 length 1033345
Stream: column 3 section LENGTH start: 5424353 length 4069
Stream: column 3 section DICTIONARY_DATA start: 5428422 length 41298
Stream: column 4 section DATA start: 5469720 length 1044769
Stream: column 4 section LENGTH start: 6514489 length 4122
Stream: column 4 section DICTIONARY_DATA start: 6518611 length 56217
Stream: column 5 section DATA start: 6574828 length 1142805
Stream: column 5 section LENGTH start: 7717633 length 2650
Stream: column 5 section DICTIONARY_DATA start: 7720283 length 17798
Stream: column 6 section DATA start: 7738081 length 1147888
Stream: column 6 section LENGTH start: 8885969 length 4230
Stream: column 6 section DICTIONARY_DATA start: 8890199 length 27873
Stream: column 7 section DATA start: 8918072 length 1705640
Stream: column 7 section LENGTH start: 10623712 length 208184
Stream: column 7 section DICTIONARY_DATA start: 10831896 length 1525605
Stream: column 8 section DATA start: 12357501 length 513225
Stream: column 8 section LENGTH start: 12870726 length 58100
Stream: column 8 section DICTIONARY_DATA start: 12928826 length 424905
Stream: column 9 section DATA start: 13353731 length 1338510
Stream: column 9 section LENGTH start: 14692241 length 1455
Stream: column 9 section DICTIONARY_DATA start: 14693696 length 258641
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DICTIONARY_V2[3]
Encoding column 3: DICTIONARY_V2[4252]
Encoding column 4: DICTIONARY_V2[4398]
Encoding column 5: DICTIONARY_V2[4404]
Encoding column 6: DICTIONARY_V2[5553]
Encoding column 7: DICTIONARY_V2[378283]
Encoding column 8: DICTIONARY_V2[105678]
Encoding column 9: DICTIONARY_V2[60943]
ORC uses both run-length encoding and dictionary encoding to compress data. Here is the meaning of the `DICTIONARY_V2` encoding (ref: ORCv1):
| ENCODING      | STREAM KIND     | OPTIONAL | CONTENTS                |
| ------------- | --------------- | -------- | ----------------------- |
| DICTIONARY_V2 | PRESENT         | Yes      | Boolean RLE             |
|               | DATA            | No       | Unsigned Integer RLE v2 |
|               | DICTIONARY_DATA | No       | String contents         |
|               | LENGTH          | No       | Unsigned Integer RLE v2 |
In dictionary encoding, if the values were [“Nevada”, “California”, “Nevada”, “California”, and “Florida”], the DICTIONARY_DATA would be “CaliforniaFloridaNevada” and LENGTH would be [10, 7, 6]. The DATA would be [2, 0, 2, 0, 1].
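A tiny standalone sketch of that dictionary-encoding idea (not ORC's actual writer, just the bookkeeping it describes):

```scala
// Toy dictionary encoding of the example values (illustrative only).
val values = Seq("Nevada", "California", "Nevada", "California", "Florida")

val dictionary     = values.distinct.sorted                  // California, Florida, Nevada
val dictionaryData = dictionary.mkString                     // "CaliforniaFloridaNevada"
val lengths        = dictionary.map(_.length)                // List(10, 7, 6)
val data           = values.map(v => dictionary.indexOf(v))  // List(2, 0, 2, 0, 1)

// The more repeated values a column has, the smaller DICTIONARY_DATA is
// relative to the raw strings, and the better the integer DATA stream compresses.
```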
And `Unsigned Integer RLE v2` is also described in the ORCv1 reference:
In Hive 0.12, ORC introduced Run Length Encoding version 2 (RLEv2), which has improved compression and fixed bit width encodings for faster expansion. RLEv2 uses four sub-encodings based on the data:
- Short Repeat - used for short sequences with repeated values
- Direct - used for random sequences with a fixed bit width
- Patched Base - used for random sequences with a variable bit width
- Delta - used for monotonically increasing or decreasing sequences
Let's focus on the first column.
# before repartition
Stream: column 1 section DATA start: 13640 length 209662
Stream: column 1 section LENGTH start: 223302 length 1158
Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
Encoding column 1: DICTIONARY_V2[48184]
# after repartition
Stream: column 1 section DATA start: 12315 length 4260547
Stream: column 1 section LENGTH start: 4272862 length 15955
Encoding column 1: DIRECT_V2
Although I don't know how ORC selects its encodings, ORC decides that using `DIRECT_V2` for column 1 after the randomization saves more space than using `DICTIONARY_V2` would. In fact, after the repartition, the space taken by this column becomes nearly 10 times larger: (4260547 + 15955) / (209662 + 1158 + 231328) ≈ 9.7.

The encodings of most of the other columns have not changed, but their sizes have increased.
`repartition` vs. `coalesce`:

- The file sizes of the former are uniform, which avoids data skew.
- The data size of the former becomes larger.
- *(Potential)* The ORC Row Group Index cannot be used when filtering chaotic data.
- When joining, both need to shuffle again. I used the above data to test this; there is no significant difference in the time between the shuffle and the sort.