
SparkSQL DataFrame order by across partitions

I'm using Spark SQL to run a query over my dataset. The result of the query is pretty small, but it is still partitioned.

I would like to coalesce the resulting DataFrame and order the rows by a column. I tried

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1");
result.toJSON().saveAsTextFile("output");

I also tried

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1");
result.toJSON().saveAsTextFile("output");

In both cases the output file is ordered in chunks (i.e. each partition is ordered internally, but the DataFrame is not ordered as a whole). For example, instead of

1, value
2, value
4, value
4, value
5, value
5, value
...

I get

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
  1. What is the correct way to get an absolute ordering of my query result?
  2. Why isn't the data frame being coalesced into a single partition?
asked by fo_x86 on Jul 31 '15

People also ask

How do I sort by multiple columns in Pyspark?

We can use the orderBy() function of the DataFrame to sort on multiple columns, using asc for ascending and desc for descending order on each column.
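As a rough illustration (plain Python, not Spark), multi-column ordering with mixed directions boils down to a composite sort: each row is ranked by the major key first, with the minor key breaking ties in its own direction.

```python
# Illustration only: mimic ordering by (col1 ascending, col2 descending).
rows = [(2, "b"), (1, "a"), (2, "a"), (1, "c")]

# Python's sorted() is stable, so we can sort in two passes,
# minor key (descending) first, then major key (ascending).
by_col2_desc = sorted(rows, key=lambda r: r[1], reverse=True)
ordered = sorted(by_col2_desc, key=lambda r: r[0])

print(ordered)  # col1 ascending; within equal col1, col2 descending
```

The same two-direction sort is what asc("col1")/desc("col2") express declaratively in Spark.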

Are Spark Dataframes ordered?

Not inherently. The Spark DataFrame/Dataset class provides a sort() function to sort on one or more columns; by default, it sorts in ascending order.

What is the difference between group by and orderBy in Spark?

The GROUP BY clause groups rows that share the same value in a specific column, and it is used together with an aggregate function. The ORDER BY clause, on the other hand, sorts the result and shows it in ascending or descending order.
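A quick plain-Python analogy (not SQL itself): GROUP BY collapses rows per key through an aggregate, while ORDER BY keeps every row and only reorders them.

```python
from itertools import groupby

rows = [("a", 3), ("b", 1), ("a", 2), ("b", 4)]

# GROUP BY analogue: one output row per key, via an aggregate (sum here).
# groupby() needs its input sorted by the grouping key.
grouped = {k: sum(v for _, v in g)
           for k, g in groupby(sorted(rows), key=lambda r: r[0])}

# ORDER BY analogue: same rows, just sorted (by value here).
ordered = sorted(rows, key=lambda r: r[1])

print(grouped)
print(ordered)
```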

Which is faster repartition or coalesce?

repartition redistributes the data evenly, but at the cost of a shuffle. coalesce works much faster when you reduce the number of partitions because it sticks input partitions together without a shuffle, but it doesn't guarantee uniform data distribution. coalesce is identical to a repartition when you increase the number of partitions.
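A toy model of the difference (plain Python, not Spark's real implementation, and the function bodies here are made up for illustration): coalesce glues whole input partitions together, while repartition reshuffles individual rows across the target partitions.

```python
# Toy model only: Spark's real coalesce/repartition are far more involved.
partitions = [[1, 2], [3], [4, 5, 6], [7]]

def coalesce(parts, n):
    """Glue whole input partitions together: no shuffle, sizes may be uneven."""
    out = [[] for _ in range(n)]
    for i, p in enumerate(parts):
        out[i % n].extend(p)  # round-robin of whole partitions (an assumption)
    return out

def repartition(parts, n):
    """Redistribute individual rows evenly: this full exchange is the shuffle."""
    rows = [r for p in parts for r in p]
    out = [[] for _ in range(n)]
    for i, r in enumerate(rows):
        out[i % n].append(r)
    return out

print(coalesce(partitions, 2))     # whole partitions stuck together, uneven sizes
print(repartition(partitions, 2))  # rows spread evenly across outputs
```

The toy coalesce never moves an individual row between workers, which is why it is cheaper; the toy repartition touches every row, which is why it shuffles.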


1 Answer

I want to mention a couple of things here.

1. The source code shows that the orderBy statement internally calls the sorting API with global ordering set to true, so the lack of ordering at the level of the output suggests that the ordering was lost while writing to the target. My point is that a call to orderBy always requires a global order.

2. Using a drastic coalesce, as in forcing a single partition in your case, can be really dangerous. I would recommend against it. The source code suggests that calling coalesce(1) can potentially cause upstream transformations to run on a single partition, which would be brutal performance-wise.

3. You seem to expect the orderBy statement to be executed within a single partition. I don't agree with that expectation; that would make Spark a really silly distributed framework.
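For intuition, a plain-Python sketch (not Spark): sorting each partition independently and then concatenating reproduces exactly the "ordered in chunks" output the question describes, whereas a true global ordering requires merging across partitions.

```python
# Two partitions of a small query result, echoing the question's example.
parts = [[2, 5, 4], [4, 1, 5]]

# Per-partition sort, then naive concatenation: ordered only within chunks.
chunked = [v for p in parts for v in sorted(p)]

# A global sort merges across partition boundaries.
globally = sorted(v for p in parts for v in p)

print(chunked)   # each chunk ascending, but not ordered as a whole
print(globally)  # one fully ascending sequence
```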

Community, please let me know if you agree or disagree with these statements.

How are you collecting data from the output, anyway?

Maybe the output actually contains sorted data, but the transformations/actions you performed in order to read from the output are responsible for the lost order.

answered by JavaPlanet on Oct 12 '22