Below is the sample dataset representing the employees in_date and out_date. I have to obtain the last in_time of all employees.
Spark is running on 4 Node standalone cluster.
Initial Dataset:
EmployeeID-----in_date-----out_date
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
Dataset after df.sort(col(in_date).desc())
:
EmployeeID--in_date-----out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
df.dropDup(EmployeeID):
Output :
EmployeeID-----in_date-----out_date
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
Expected Dataset :
EmployeeID-----in_date-----out_date
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
but when I sorted the Initial Dataset with sortWithInPartitions
and deduped I got the expected dataset.
Am I missing anything big or small here? Any help is appreciated.
Additional Information :
The above expected output was achieved when df.sort was executed with Spark in local mode.
I've not done any kind of partition, repartition.
The initial dataset is obtained from the underlying Cassandra database.
TL;DR Unless it is explicitly guaranteed you should never assume that operations in Spark will be executed in any particular order, especially when working with Spark SQL.
The thing you're missing here is shuffle. dropDuplicates
implementation is equivalent to:
df.groupBy(idCols).agg(first(c) for c in nonIdCols)
which will be executed as:
Intermediate shuffle introduces non-determinism and there is no guarantee that final aggregation will be applied in any particular order.
The above expected output was achieved when df.sort was executed with Spark in local mode.
local
mode is fairly simplistic. You should never use it to draw conclusions about behavior of Spark internals in a fully distributed mode.
when I sorted the Initial Dataset with sortWithInPartitions and deduped I got the expected dataset.
This would make sense if data was previously partitioned by EmployeeID
. In that case Spark wouldn't require additional shuffle.
Based on the description it looks like you should use one of the solutions shown in How to select the first row of each group?.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With