According to the docs, the collect_set and collect_list functions should be available in Spark SQL. However, I cannot get them to work. I'm running Spark 1.6.0 using a Docker image.
I'm trying to do this in Scala:
import org.apache.spark.sql.functions._
df.groupBy("column1")
  .agg(collect_set("column2"))
  .show()
I receive the following error at runtime:
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set;
I also tried it with pyspark, but it fails there as well. The docs state these functions are aliases of Hive UDAFs, but I can't figure out how to enable them. How can I fix this? Thanks!
The Spark function collect_list() aggregates values into an ArrayType, typically after a group by or over a window partition. In Apache Hive, COLLECT_SET is an aggregate function that collects the unique values from multiple rows into an array. Does collect_list also maintain order? Only to the extent that the data is sorted before collect_list() is applied. collect_set() returns all values from the input column with duplicates eliminated, while collect_list() returns all values from the input column including duplicates.
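As a minimal illustration (the key/value column names and the sample rows below are invented, and a Spark 2.0+ SparkSession is assumed so that no Hive support is needed), the difference shows up as soon as a group contains duplicate values:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, collect_set}

val spark = SparkSession.builder.master("local[*]").appName("collect-demo").getOrCreate()
import spark.implicits._

// Sample data with a duplicate ("a", 1) row.
val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")

df.groupBy("key")
  .agg(
    collect_list("value").as("all_values"),     // keeps duplicates, e.g. [1, 1, 2] for key "a"
    collect_set("value").as("distinct_values")  // removes duplicates, e.g. [1, 2] for key "a"
  )
  .show(false)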
Spark 2.0+:
SPARK-10605 introduced native collect_list and collect_set implementations. SparkSession with Hive support or HiveContext is no longer required.
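As a minimal sketch of the native behaviour (the sample rows are invented; the column names follow the question), on 2.0+ the same aggregation works through plain Spark SQL with an ordinary SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("native-collect").getOrCreate()
import spark.implicits._

// Hypothetical sample data; column names follow the question.
Seq(("a", 1), ("a", 2), ("a", 2)).toDF("column1", "column2").createOrReplaceTempView("t")

// collect_set resolves natively on 2.0+, without enableHiveSupport().
spark.sql("SELECT column1, collect_set(column2) AS column2_set FROM t GROUP BY column1").show()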
Spark 2.0-SNAPSHOT (before 2016-05-03):
You have to enable Hive support for a given SparkSession:
In Scala:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("testing")
  .enableHiveSupport() // <- enable Hive support.
  .getOrCreate()
In Python:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .enableHiveSupport()
    .getOrCreate())
Spark < 2.0:
To be able to use Hive UDFs (see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) you have to use Spark built with Hive support (this is already covered when you use pre-built binaries, which seems to be the case here) and initialize SQLContext using HiveContext.
In Scala:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = new HiveContext(sc)
In Python:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
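Putting it together for Spark 1.6 as in the question, a minimal sketch (with invented sample data) might look like this; once sqlContext is a HiveContext, the original groupBy/collect_set aggregation resolves without the AnalysisException:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.collect_set

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("hive-collect"))
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

// Hypothetical sample data; column names follow the question.
val df = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 2))).toDF("column1", "column2")

// Resolves on 1.6 because HiveContext provides the Hive UDAF behind collect_set.
df.groupBy("column1")
  .agg(collect_set("column2"))
  .show()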