Use collect_list and collect_set in Spark SQL

According to the docs, the collect_set and collect_list functions should be available in Spark SQL. However, I cannot get them to work. I'm running Spark 1.6.0 using a Docker image.

I'm trying to do this in Scala:

import org.apache.spark.sql.functions._ 


And receive the following error at runtime:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set; 

I also tried it using PySpark, but it fails there too. The docs state these functions are aliases of Hive UDAFs, but I can't figure out how to enable them.

How can I fix this? Thanks!

What does collect_list do in Spark?

The Spark function collect_list() is an aggregate function that gathers the values of a column into an array (ArrayType), duplicates included. It is typically used after a group by or over a window partition.

What is collect_set?

In Apache Hive, COLLECT_SET is an aggregate function that collects the unique values from multiple rows into an array.

Does collect_list maintain order?

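Not by itself. collect_list() keeps values in the order in which rows reach the aggregation, and that order can be non-deterministic after a shuffle. Sorting the dataset before calling collect_list() often gives the expected order in practice, but Spark does not guarantee it.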
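
A simplified plain-Python model shows why the collected order depends on how rows arrive. This is not Spark code: `collect_list_from_partitions` is a hypothetical helper, the two partitions are made-up data, and the sketch assumes partial results are simply concatenated in arrival order.

```python
def collect_list_from_partitions(partitions):
    """Concatenate values partition by partition, in the order the partitions arrive."""
    collected = []
    for partition in partitions:
        collected.extend(partition)
    return collected


# Made-up data: the same sorted values, split across two partitions.
partition_a = [1, 2]
partition_b = [3, 4]

# The same data, with the partitions arriving in a different order each time.
first_run = collect_list_from_partitions([partition_a, partition_b])
second_run = collect_list_from_partitions([partition_b, partition_a])
```

Both runs contain the same values, but only the first one keeps the sorted order. If the order matters, one option is to sort the array after collecting it (Spark has sort_array for this) rather than relying on the input order.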

What does collect_set do in PySpark?

The collect_set() function returns all values of the input column with duplicates removed. The collect_list() function returns all values of the input column, duplicates included.
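
To make the difference concrete, here is a small plain-Python model of the two aggregations. It is only a sketch of the semantics, not Spark code: the helpers `collect_list` and `collect_set` below are hypothetical stand-ins that work on ordinary lists of (key, value) pairs, and the sample rows are made up.

```python
from collections import defaultdict


def collect_list(rows):
    """Group (key, value) pairs by key, keeping every value, duplicates included."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return dict(groups)


def collect_set(rows):
    """Group (key, value) pairs by key, keeping each distinct value once."""
    groups = defaultdict(set)
    for key, value in rows:
        groups[key].add(value)
    return dict(groups)


# Made-up sample rows: (column1, column2)
rows = [("a", 1), ("a", 1), ("a", 2), ("b", 3)]

as_list = collect_list(rows)  # key "a" keeps both copies of 1
as_set = collect_set(rows)    # key "a" keeps 1 only once
```

In Spark both functions return an array column. The model uses a Python set only to express that duplicates are dropped and that the element order carries no meaning.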

Spark 2.0+:

SPARK-10605 introduced native collect_list and collect_set implementations. A SparkSession with Hive support or a HiveContext is no longer required.

Spark 2.0-SNAPSHOT (before 2016-05-03):

You have to enable Hive support for a given SparkSession:

In Scala:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .enableHiveSupport()  // <- enable Hive support.
  .getOrCreate()

In Python:

from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

Spark < 2.0:

To be able to use Hive UDFs (see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) you have to use Spark built with Hive support (this is already covered when you use pre-built binaries, which seems to be the case here) and create your SQLContext as a HiveContext.

In Scala:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = new HiveContext(sc) 

In Python:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
