
Convert PySpark DenseVector to array

Tags:

python

pyspark

I am trying to convert a PySpark dataframe column of DenseVector into an array, but I always get an error.

from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([8.0, 1.0, 3.0, 2.0, 5.0]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]

df = spark.createDataFrame(data, ["features"])

I tried to define a UDF and use toArray():

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

to_array = udf(lambda x: x.toArray(), ArrayType(FloatType()))
df = df.withColumn('features', to_array('features'))

But then I get the following error when I call df.collect():

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 4 times, 
most recent failure: Lost task 1.3 in stage 17.0 (TID 100, 10.139.64.6, executor 0): 
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict 
(for numpy.core.multiarray._reconstruct)

Any idea how I could achieve this?

asked Oct 21 '19 by erwanlc

1 Answer

toArray() returns a numpy.ndarray, which can't be converted to ArrayType(FloatType()) implicitly. Call .tolist() on it as well to convert it:

import pyspark.sql.functions as F
import pyspark.sql.types as T

to_array = F.udf(lambda v: v.toArray().tolist(), T.ArrayType(T.FloatType()))
# alternatively: to_array = F.udf(lambda v: [float(x) for x in v], T.ArrayType(T.FloatType()))
df = df.withColumn('features', to_array('features'))
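The root cause can be reproduced without Spark at all. This minimal sketch (plain NumPy, no cluster needed) shows why .tolist() matters: indexing an ndarray yields numpy scalar types, which Spark's pickler cannot map to FloatType, while .tolist() yields plain Python floats that it can:

```python
import numpy as np

# DenseVector.toArray() hands back a numpy.ndarray of numpy.float64 values.
arr = np.array([8.0, 1.0, 3.0, 2.0, 5.0])

# Elements of the raw ndarray are numpy scalars, not builtin floats --
# this is what trips up the ClassDict/pickle conversion on the executor.
print(type(arr[0]).__name__)            # float64

# .tolist() converts every element to a plain Python float,
# which maps cleanly to ArrayType(FloatType()).
as_list = arr.tolist()
print(type(as_list[0]).__name__)        # float
print(as_list)                          # [8.0, 1.0, 3.0, 2.0, 5.0]
```

The list comprehension variant in the answer achieves the same thing by forcing each element through float() explicitly.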

If you are using PySpark >= 3.0.0, you can use the new vector_to_array function instead:

from pyspark.ml.functions import vector_to_array
df = df.withColumn('features', vector_to_array('features'))
answered Sep 27 '22 by cronoik