I have the following dataframe data
:
root
|-- userId: string
|-- product: string
|-- rating: double
and the following query:
val result = sqlContext.sql("select userId, collect_list(product), collect_list(rating) from data group by userId")
My question is that, does product
and rating
in the aggregated arrays match each other? That is, whether the product
and the rating
from the same row have the same index in the aggregated arrays.
Update:
Starting from Spark 2.0.0, one can do collect_list
on struct type so we can do one collect_list
on a combined column. But for pre 2.0.0 version, one can only use collect_list
on primitive type.
I believe there is no explicit guarantee that all arrays will have the same order. Spark SQL uses multiple optimizations and under certain conditions there is no guarantee that all aggregations are scheduled at the same time (one example is aggregation with DISTINCT
). Since exchange (shuffle) results in nondeterministic order it is theoretically possible that order will differ.
So while it should work in practice it could be risky and introduce some hard to detect bugs.
If you Spark 2.0.0 or later you can aggregate non-atomic columns with collect_list
:
SELECT userId, collect_list(struct(product, rating)) FROM data GROUP BY userId
If you use an earlier version you can try to use explicit partitions and order:
WITH tmp AS (
SELECT * FROM data DISTRIBUTE BY userId SORT BY userId, product, rating
)
SELECT userId, collect_list(product), collect_list(rating)
FROM tmp
GROUP BY userId
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With