
Saving ordered dataframe in Spark

I'm trying to save an ordered dataframe to HDFS. My code looks like this:

dataFrame.orderBy("index").write().mode(SaveMode.Overwrite).parquet(getPath());

I run the same code on two different clusters: one cluster uses Spark 1.5.0, the other 1.6.0. When running on the cluster with Spark 1.5.0, the sorting is not preserved after saving to disk.

Are there any specific cluster settings to preserve sorting when saving data to disk, or is this a known problem of that Spark version? I've searched the Spark documentation but couldn't find any info about it.

Update:

I've checked the Parquet files, and in both cases the files are sorted. So the problem occurs while reading: Spark 1.5.0 doesn't preserve ordering while reading, and 1.6.0 does.

So my question now is: is it possible to read a sorted file and preserve ordering in Spark 1.5.0?

asked Dec 28 '16 by Alexander Mann


1 Answer

There are several things going on here:

  1. When you are writing, Spark splits the data into several partitions and these are written separately, so even if the data is ordered it ends up split.

  2. When you are reading, the partitions do not preserve ordering between them, so the data would be sorted only in blocks. Worse, there might be something other than a 1:1 mapping of file to partition:

    • Several files might be mapped to a single partition in the wrong order, so the sorting inside the partition only holds in blocks
    • A single file might be divided between partitions (if it is larger than the block size).
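
The second point can be sketched in plain Python (no Spark involved): assume a globally sorted dataset was written out as three partition files, each internally sorted, and a reader picks the files up in an arbitrary order:

```python
# Simulate a globally sorted dataset written out as three partition files.
data = list(range(9))                      # globally sorted: [0..8]
files = [data[0:3], data[3:6], data[6:9]]  # each "file" is internally sorted

# Simulate a reader that concatenates the files in an arbitrary order,
# as Spark 1.5.0 may do: the result is sorted only within each block.
read_back = files[2] + files[0] + files[1]

print(read_back)                  # [6, 7, 8, 0, 1, 2, 3, 4, 5]
print(read_back == sorted(data))  # False: global order is lost
```

Each block is still sorted internally, which matches what the asker observed when inspecting the Parquet files, but the global order across blocks is gone.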

Based on the above, the easiest solution would be to repartition (or rather coalesce) to 1 partition when writing, so you get a single file. When that file is read back, the data will be ordered as long as the file is smaller than the block size (you can even make the block size very large to ensure this).

The problem with this solution is that it reduces your parallelism: when you write you need to repartition, and when you read you need to repartition again to get parallelism back, and the coalesce/repartition can be costly. A second problem is that it doesn't scale well (you might end up with a huge file).

A better solution would be based on your use case. The basic question is whether you can use partitioning before sorting. For example, if you are planning to do a custom aggregation that requires the sorting, then as long as you keep a 1:1 mapping between files and partitions you can be assured of the sorting within each partition, which might be enough for you. You can also add the maximum value inside each partition as a second value, then group by it and do a secondary sort.
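
That last idea can be sketched in plain Python. Assume each partition is read back intact and internally sorted; because the data was globally sorted before writing, the partitions' key ranges do not overlap, so ordering the partitions by their maximum key and concatenating restores the global order:

```python
# Partitions read back in the wrong order, each internally sorted
# (their key ranges do not overlap because the data was sorted before writing).
partitions = [[6, 7, 8], [0, 1, 2], [3, 4, 5]]

# Order the partitions by their maximum key, then concatenate.
restored = [x for part in sorted(partitions, key=max) for x in part]

print(restored)  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
```

In Spark this corresponds to grouping on the per-partition maximum and doing a secondary sort within each group, without ever collecting everything into a single partition.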

answered Oct 17 '22 by Assaf Mendelson