
Error in Spark while declaring a UDF

I am trying to create a UDF that takes an array-valued column and returns an array containing only its unique elements. Here is the code, on Spark 1.6.1:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def uniq_array(col_array):
    x = np.unique(col_array)
    return x

uniq_array_udf = udf(uniq_array, ArrayType())

However, I keep running into this error: TypeError: __init__() takes at least 2 arguments (1 given)

Can anyone help me resolve this error?

Thanks!

asked Aug 16 '16 by Preyas

People also ask

Why are UDFs not recommended in Spark?

When we use UDFs, we lose the optimizations Spark applies to our DataFrame/Dataset: to Spark's optimizer, a UDF is a black box. An example of a general optimization Spark applies when reading data from a database or from columnar formats such as Parquet is predicate pushdown.

How does Spark define UDF?

You define a new UDF by passing a Scala function as the input parameter of the udf function. It accepts Scala functions of up to 10 input parameters. You can register UDFs for use in SQL-based query expressions via UDFRegistration (available through the SparkSession.udf attribute).


1 Answer

For ArrayType, the element type of the array also needs to be specified, e.g. for an array of integers:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def uniq_array(col_array):
    # np.unique returns a sorted NumPy array of the distinct elements;
    # convert it to a plain Python list so Spark can serialize the result
    x = np.unique(col_array)
    return x.tolist()

uniq_array_udf = udf(uniq_array, ArrayType(IntegerType()))
answered Oct 09 '22 by David
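As an aside (not part of the accepted answer): np.unique also sorts its output. If the original element order matters, a plain-Python dedup keeps the first occurrence of each element and avoids the NumPy dependency entirely. A minimal sketch, with `uniq_preserve_order` as a hypothetical helper name:

```python
def uniq_preserve_order(col_array):
    # Keep only the first occurrence of each element, preserving input order
    seen = set()
    out = []
    for v in col_array:
        if v not in seen:
            seen.add(v)
            out.append(v)
    return out

print(uniq_preserve_order([3, 1, 2, 3, 1]))  # [3, 1, 2]
```

It can be wrapped as a UDF the same way, e.g. udf(uniq_preserve_order, ArrayType(IntegerType())).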