 

Spark - sortWithinPartitions over sort

Below is a sample dataset representing employees' in_date and out_date values. I have to obtain the latest in_date for every employee.

Spark is running on a 4-node standalone cluster.

Initial Dataset:

EmployeeID   in_date      out_date
1111111      2017-04-20   2017-09-14
1111111      2017-11-02   null
2222222      2017-09-26   2017-09-26
2222222      2017-11-28   null
3333333      2016-01-07   2016-01-20
3333333      2017-10-25   null

Dataset after df.sort(col("in_date").desc()):

EmployeeID   in_date      out_date
1111111      2017-11-02   null
1111111      2017-04-20   2017-09-14
2222222      2017-09-26   2017-09-26
2222222      2017-11-28   null
3333333      2017-10-25   null
3333333      2016-01-07   2016-01-20

Dataset after df.dropDuplicates("EmployeeID"):

EmployeeID   in_date      out_date
1111111      2017-11-02   null
2222222      2017-09-26   2017-09-26
3333333      2016-01-07   2016-01-20

Expected Dataset :

EmployeeID   in_date      out_date
1111111      2017-11-02   null
2222222      2017-11-28   null
3333333      2017-10-25   null

But when I sorted the initial dataset with sortWithinPartitions and deduplicated, I got the expected dataset. Am I missing anything here, big or small? Any help is appreciated.

Additional information: the above expected output was achieved when df.sort was executed with Spark in local mode. I have not done any partitioning or repartitioning; the initial dataset is read from an underlying Cassandra database.
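
For reference, here is a minimal sketch of the code in question (the Cassandra read is replaced with an in-memory DataFrame purely for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("last-in-date").getOrCreate()

# In-memory stand-in for the DataFrame read from Cassandra.
df = spark.createDataFrame(
    [("1111111", "2017-04-20", "2017-09-14"),
     ("1111111", "2017-11-02", None),
     ("2222222", "2017-09-26", "2017-09-26"),
     ("2222222", "2017-11-28", None),
     ("3333333", "2016-01-07", "2016-01-20"),
     ("3333333", "2017-10-25", None)],
    ["EmployeeID", "in_date", "out_date"],
)

# Sort by in_date descending, then keep one row per employee.
df.sort(col("in_date").desc()).dropDuplicates(["EmployeeID"]).show()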

asked by ThinkTank0790

1 Answer

TL;DR Unless it is explicitly guaranteed, you should never assume that operations in Spark will be executed in any particular order, especially when working with Spark SQL.

The thing you're missing here is the shuffle. The dropDuplicates implementation is equivalent to:

df.groupBy(*idCols).agg(*[first(c) for c in nonIdCols])

which will be executed as:

  • Partial ("map-side") aggregation.
  • Shuffle.
  • Final ("reduce-side") aggregation.

The intermediate shuffle introduces non-determinism, and there is no guarantee that the final aggregation will be applied in any particular order.
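
The shuffle is visible in the physical plan. A quick sketch, assuming the df from the question:

# The plan typically contains a partial HashAggregate, an Exchange
# (the shuffle), and a final HashAggregate, in that order.
df.dropDuplicates(["EmployeeID"]).explain()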

The above expected output was achieved when df.sort was executed with Spark in local mode.

Local mode is fairly simplistic; you should never use it to draw conclusions about the behavior of Spark internals in a fully distributed mode.

when I sorted the initial dataset with sortWithinPartitions and deduplicated, I got the expected dataset.

This would make sense if the data was previously partitioned by EmployeeID. In that case Spark wouldn't require an additional shuffle.
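
For illustration, an explicit pre-partitioning would look roughly like this. This is a sketch only: it leans on the current dropDuplicates implementation, not on any documented ordering contract, so it remains fragile.

from pyspark.sql.functions import col

last_in = (df
    .repartition("EmployeeID")                     # co-locate each employee's rows in one partition
    .sortWithinPartitions(col("in_date").desc())   # newest in_date first within each partition
    .dropDuplicates(["EmployeeID"]))               # distribution already satisfied, so no extra shuffle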

Based on the description, it looks like you should use one of the solutions shown in How to select the first row of each group?
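
For example, the window-function variant from that question, adapted to this data (a sketch; its result does not depend on partitioning or row order):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Rank each employee's rows by in_date, newest first, and keep the top one.
w = Window.partitionBy("EmployeeID").orderBy(col("in_date").desc())

(df.withColumn("rn", row_number().over(w))
   .where(col("rn") == 1)
   .drop("rn")
   .show())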

answered by zero323