Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Use more than one collect_list in one query in Spark SQL

I have the following dataframe data:

root
 |-- userId: string 
 |-- product: string 
 |-- rating: double

and the following query:

val result = sqlContext.sql("select userId, collect_list(product), collect_list(rating) from data group by userId")

My question is that, does product and rating in the aggregated arrays match each other? That is, whether the product and the rating from the same row have the same index in the aggregated arrays.

Update: Starting from Spark 2.0.0, one can do collect_list on struct type so we can do one collect_list on a combined column. But for pre 2.0.0 version, one can only use collect_list on primitive type.

like image 683
Rainfield Avatar asked Nov 03 '16 17:11

Rainfield


1 Answers

I believe there is no explicit guarantee that all arrays will have the same order. Spark SQL uses multiple optimizations and under certain conditions there is no guarantee that all aggregations are scheduled at the same time (one example is aggregation with DISTINCT). Since exchange (shuffle) results in nondeterministic order it is theoretically possible that order will differ.

So while it should work in practice it could be risky and introduce some hard to detect bugs.

If you Spark 2.0.0 or later you can aggregate non-atomic columns with collect_list:

SELECT userId, collect_list(struct(product, rating)) FROM data GROUP BY userId

If you use an earlier version you can try to use explicit partitions and order:

WITH tmp AS (
  SELECT * FROM data DISTRIBUTE BY userId SORT BY userId, product, rating
)
SELECT userId, collect_list(product), collect_list(rating)
FROM tmp
GROUP BY userId
like image 163
zero323 Avatar answered Oct 04 '22 18:10

zero323