I'm trying to save an ordered DataFrame to HDFS. My code looks like this:
dataFrame.orderBy("index").write().mode(SaveMode.Overwrite).parquet(getPath());
I run the same code on two different clusters; one cluster uses Spark 1.5.0, the other 1.6.0. When running on the cluster with Spark 1.5.0, it does not preserve the sorting after saving to disk.
Are there any specific cluster settings to preserve sorting when saving data to disk, or is this a known problem of that Spark version? I've searched the Spark documentation but couldn't find any information about it.
Update:
I've checked the Parquet files, and in both cases the files themselves are sorted. So the problem occurs while reading: Spark 1.5.0 doesn't preserve ordering while reading, and 1.6.0 does.
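For reference, this is roughly how the check can be done after reading back (a minimal sketch; sqlContext is an assumed SQLContext, and getPath() and the "index" column mirror the snippet above):

import java.util.List;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// Read the file back and compare the rows as read with a freshly
// sorted copy; the two match on 1.6.0 but not on 1.5.0.
DataFrame readBack = sqlContext.read().parquet(getPath());
List<Row> asRead = readBack.select("index").collectAsList();
List<Row> resorted = readBack.select("index").orderBy("index").collectAsList();
System.out.println("ordering preserved: " + asRead.equals(resorted));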
So my question now is: is it possible to read a sorted file and preserve its ordering in Spark 1.5.0?
There are several things going on here:
When you are writing, Spark splits the data into several partitions, and those are written separately, so even if the data is globally ordered, it is split across files.
When you are reading, the partitions do not preserve ordering between them, so the data would be sorted only within blocks. Worse, there might be something other than a 1:1 mapping of file to partition: a large file may be split across several partitions and, depending on the input settings, several small files may be combined into one, which scrambles the order even further.
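You can see the split directly by listing the output directory: one part file per partition. A minimal sketch, assuming a JavaSparkContext named jsc and the same hypothetical getPath() as in the question:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Each partition is written as its own part-xxxxx file, so a globally
// sorted DataFrame still ends up split across several files on disk.
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
for (FileStatus status : fs.listStatus(new Path(getPath()))) {
    System.out.println(status.getPath().getName());
}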
Based on the above, the easiest solution would be to repartition (or rather coalesce) to 1 partition when writing, and thus have a single file. When that file is read, the data would be ordered as long as the file is smaller than the block size (you can even make the block size very large to ensure this).
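A minimal sketch of that approach (same hypothetical getPath() as in the question; the point is that coalesce without a shuffle concatenates the already range-partitioned output in partition order, which keeps the global order intact):

import org.apache.spark.sql.SaveMode;

// Sort, then merge everything into a single partition so the write
// produces one Parquet file that is globally ordered.
dataFrame.orderBy("index")
         .coalesce(1)   // no shuffle: partitions are concatenated in order
         .write()
         .mode(SaveMode.Overwrite)
         .parquet(getPath());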
The problem with this solution is that it reduces your parallelism (when you write you need to coalesce, and when you read you would need to repartition again to get the parallelism back). The coalesce/repartition can be costly. The second problem with this solution is that it doesn't scale well (you might end up with one huge file).
A better solution would be based on your use case. The basic idea is to partition the data before sorting, if you can. For example, if you are planning to do a custom aggregation that requires the sorting, then as long as you keep a 1:1 mapping between files and partitions, you can be assured of the sorting within each partition (see the sketch below), which might be enough for you. You can also add the maximum sort value inside each partition as a second column, group by it, and do a secondary sort.
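One way to get per-partition sorting with a predictable file layout is to repartition by a key and sort within each partition. This relies on sortWithinPartitions and repartition-by-expression, both available from Spark 1.6; on 1.5 you would drop down to the RDD API's repartitionAndSortWithinPartitions. A minimal sketch, where "key" is a hypothetical partitioning column:

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.SaveMode;

// Each output file corresponds to one partition and is internally sorted
// by "index"; with a 1:1 file-to-partition mapping on read, the
// per-partition ordering survives.
dataFrame.repartition(col("key"))          // "key" is a hypothetical column
         .sortWithinPartitions("index")    // available from Spark 1.6
         .write()
         .mode(SaveMode.Overwrite)
         .parquet(getPath());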