Spark Error: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

I have a DataFrame in Spark in which one of the columns contains an array. Now, I have written a separate UDF which converts the array to another array containing only the distinct values. See the example below:

Ex: [24, 23, 27, 23] should get converted to [24, 23, 27]

Code:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def uniq_array(col_array):
    x = np.unique(col_array)
    return x

uniq_array_udf = udf(uniq_array, ArrayType(IntegerType()))

Df3 = Df2.withColumn("age_array_unique", uniq_array_udf(Df2.age_array))

In the above code, Df2.age_array is the array column to which I am applying the UDF to get a new column, "age_array_unique", which should contain only the unique values of the array.

However, as soon as I run the command Df3.show(), I get the error:

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

Can anyone please let me know why this is happening?

Thanks!

asked Aug 16 '16 by Preyas


People also ask

What are the Py4JJavaError exceptions in Spark?

Most of the Py4JJavaError exceptions I've seen came from mismatched data types between Python and Spark, especially when the function uses a data type from a Python module like NumPy, so I'd look into that first when there's an error. For example, if the output is a numpy.ndarray, the UDF throws an exception.
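
A minimal sketch of that mismatch (the UDF names and the use of np.mean and DoubleType here are my own illustration, not from the question):

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Likely to fail: np.mean returns a numpy.float64, a NumPy scalar the
# JVM-side unpickler cannot reconstruct
bad_mean_udf = udf(lambda xs: np.mean(xs), DoubleType())

# Works: float() converts the NumPy scalar into a plain Python float
mean_udf = udf(lambda xs: float(np.mean(xs)), DoubleType())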

Why is a Spark UDF not distributing the Python function as desired?

In other words, Spark doesn't distribute the Python function as desired if the dataframe is too small. To fix this, I repartitioned the dataframe before calling the UDF.
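
A minimal sketch of that fix (the dataframe df, the column value, the UDF my_udf, and the partition count of 100 are all assumptions for illustration):

# Spread the rows across more partitions so the UDF runs on several
# executors rather than just one; pick a count suited to your cluster
df = df.repartition(100)
result = df.withColumn("out", my_udf(df.value))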

How do you return a tuple of mixed-type values in Spark?

For a function that returns a tuple of mixed-type values, I can make a corresponding StructType(), which is a composite type in Spark, and specify what is in the struct with StructField(). For example, if I have a function that returns the position and the letter from ascii_letters, it might look like the sketch below.
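
A hedged sketch of that StructType approach (the field names "position" and "letter" are my own choices for illustration):

from string import ascii_letters
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Composite return type: an integer field and a string field
schema = StructType([
    StructField("position", IntegerType(), False),
    StructField("letter", StringType(), False),
])

# Maps an index to the pair (index, corresponding letter from ascii_letters)
pos_letter_udf = udf(lambda i: (i, ascii_letters[i]), schema)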

Why is my Spark dataframe so slow?

One reason for slowness I ran into was that my data was too small in terms of file size: when the dataframe is small enough, Spark sends the entire dataframe to one and only one executor and leaves the other executors waiting. In other words, Spark doesn't distribute the Python function as desired if the dataframe is too small.


2 Answers

The source of the problem is that the object returned from the UDF doesn't conform to the declared type. np.unique not only returns a numpy.ndarray but also converts the numerics to the corresponding NumPy types, which are not compatible with the DataFrame API. You can try something like this:

udf(lambda x: list(set(x)), ArrayType(IntegerType())) 

or this (to keep order):

from collections import OrderedDict

udf(lambda xs: list(OrderedDict((x, None) for x in xs)),
    ArrayType(IntegerType()))

instead.

If you really want np.unique, you have to convert the output:

udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType())) 
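
Applied to the question's example, a minimal end-to-end sketch (reusing Df2 and age_array from the question) would be:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# .tolist() converts both the ndarray and its NumPy scalars to plain Python types
uniq_array_udf = udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))

Df3 = Df2.withColumn("age_array_unique", uniq_array_udf(Df2.age_array))
Df3.show()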
answered Sep 25 '22 by zero323


You need to convert the final value to a Python list. You can implement the function as follows:

def uniq_array(col_array):
    x = np.unique(col_array)
    return list(x)

This is because Spark doesn't understand the NumPy array format. In order to feed Spark DataFrames a Python object that they understand as an ArrayType, you need to convert the output to a Python list before returning it.

answered Sep 25 '22 by user1632287