I'm processing a JSON file to generate two JSON files using Spark (version 1.6.1). The input file is about 30-40 GB (100M records). Of the generated files, the larger one is about 10-15 GB (30M records) and the smaller one about 500-750 MB (1.5M records). Both result files exhibit the problem below:
I invoked the "sort" method on the DataFrame and then performed "repartition" to merge the results into a single file. When I inspected the generated file, I found that records are ordered within intervals, but the file as a whole is not globally ordered. For example, the key (constructed from three columns) of the last record in the file (line 1.9M) is "(ou7QDj48c, 014, 075)", while the key of a record in the middle of the file (line 375K) is "(pzwzh5vm8, 003, 023)":
pzwzh5vm8 003 023
...
ou7QDj48c 014 075
When I tested the code locally with a relatively small input (400K lines), this problem doesn't occur at all.
My concrete code is shown below:
big_json = big_json.sort($"col1", $"col2", $"col3", $"col4")
big_json.repartition(1).write.mode("overwrite").json("filepath")
Could anyone give me some advice? Thank you.
(I've also noticed that this thread discussed a similar problem, but no good solution has been posted so far. If this phenomenon really results from the repartition operation, could anyone help me efficiently write the DataFrame to a single JSON file while keeping the sorted order, without converting it to an RDD? Thanks)
Solution:
I really appreciate the help from @manos, @eliasah and @pkrishna. I had thought about using coalesce after reading your comments, but after investigating its performance I gave up on the idea.
The final solution is: sort the DataFrame and write it to JSON, without any repartition or coalesce. After the whole job is done, run the HDFS command below:
hdfs dfs -getmerge /hdfs/file/path/part* ./local.json
This command works far better than I had imagined. It takes neither too much time nor too much space, and gives me a single good file. I ran head and tail on the huge result file and it appears to be fully ordered.
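Putting the final solution together, the working approach is a sketch like the following (the output path is a placeholder):

```scala
// Sort the DataFrame and write it directly, keeping Spark's natural
// output partitioning. sort range-partitions the data, so each part
// file is internally sorted AND the part files are ordered relative
// to each other; hdfs dfs -getmerge then concatenates them in order.
big_json
  .sort($"col1", $"col2", $"col3", $"col4")
  .write.mode("overwrite")
  .json("/hdfs/file/path")
```

The part files are then merged into one local file with the getmerge command above, which concatenates them in filename order.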
What's happening is that you are repartitioning after your sort. repartition reshuffles the data in the RDD randomly to create either more or fewer partitions and balances it across them. This always shuffles all data over the network.
Under the hood, it uses coalesce with shuffle enabled to redistribute the data.
This is why your data isn't sorted anymore.
You can check the code for reference.
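For reference, the RDD implementation of repartition in Spark 1.6 is essentially a one-liner delegating to coalesce (paraphrased from Spark's RDD.scala; scoping boilerplate omitted):

```scala
// repartition simply calls coalesce with shuffle = true, which is
// why the sorted order is destroyed by the shuffle.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  coalesce(numPartitions, shuffle = true)
```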
In your example the partition count is set to 1, so all the data is shuffled into a single partition.
To decrease the number of partitions of an RDD, Spark provides the coalesce transformation (with shuffle=false), which preserves the order.
As eliasah mentioned, repartition uses coalesce under the hood: it calls coalesce with shuffle=true. So the coalesce transformation with shuffle=false can be used instead of repartition.
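If a single output file must be produced from within Spark itself, a sketch of the coalesce-based alternative looks like this (note the asker found its performance unacceptable at this data size, which is why the getmerge approach won out):

```scala
// coalesce(1) without shuffle merges the existing sorted partitions
// in order, so the global sort order is preserved in the single
// output file. Beware: it forces the entire write through one task.
big_json
  .sort($"col1", $"col2", $"col3", $"col4")
  .coalesce(1)
  .write.mode("overwrite")
  .json("filepath")
```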