How to create a udf in PySpark which returns an array of strings?

I have a UDF that returns a list of strings; this should not be too hard. I pass in the data type when creating the UDF, since it returns an array of strings: ArrayType(StringType).

Now, somehow this is not working:

The DataFrame I'm operating on is df_subsets_concat and looks like this:

df_subsets_concat.show(3,False)
+----------------------+
|col1                  |
+----------------------+
|oculunt               |
|predistposed          |
|incredulous           |
+----------------------+
only showing top 3 rows

and the code is

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType

my_udf = lambda domain: ['s','n']
label_udf = udf(my_udf, ArrayType(StringType))
df_subsets_concat_with_md = df_subsets_concat.withColumn('subset', label_udf(df_subsets_concat.col1))

and the result is

/usr/lib/spark/python/pyspark/sql/types.py in __init__(self, elementType, containsNull)
    288         False
    289         """
--> 290         assert isinstance(elementType, DataType), "elementType should be DataType"
    291         self.elementType = elementType
    292         self.containsNull = containsNull

AssertionError: elementType should be DataType

It is my understanding that this is the correct way to do it. Here are some resources:

pySpark Data Frames "assert isinstance(dataType, DataType), "dataType should be DataType"
How to return a "Tuple type" in a UDF in PySpark?

But neither of these has helped me resolve why this is not working. I am using PySpark 1.6.1.

How do I create a UDF in PySpark that returns an array of strings?

asked Dec 06 '17 by makansij

People also ask

Can a PySpark UDF return multiple columns?

The short answer is: no. Using a PySpark UDF requires Spark to serialize the Scala objects, run a Python process, deserialize the data in Python, run the function, serialize the results, and deserialize them back in Scala. This causes a considerable performance penalty, so it is best to avoid UDFs in PySpark when a built-in function will do. That said, a common workaround for producing multiple outputs is shown in the sketch below.
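For reference, the usual workaround is to have the UDF return a single StructType column and then expand its fields with select(). A minimal sketch, with made-up column names ('word', 'length') and example logic, assuming a Spark 2.x SparkSession:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('oculunt',), ('incredulous',)], ['col1'])

# The UDF returns a tuple matching this struct schema.
schema = StructType([
    StructField('word', StringType(), True),
    StructField('length', IntegerType(), True),
])
pair_udf = udf(lambda s: (s, len(s)), schema)

# Expand the single struct column into two ordinary columns.
df.withColumn('pair', pair_udf(col('col1'))) \
  .select('col1', 'pair.word', 'pair.length') \
  .show()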

How do I pass a list to a UDF in PySpark?

People say we can use pyspark.sql.functions.array() to directly pass a list of columns to a UDF (from Spark 2.2.0 onwards); a sketch follows below.
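A minimal sketch of that approach, with a made-up DataFrame and join logic (assumes a Spark 2.x SparkSession):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('a', 'b'), ('c', 'd')], ['x', 'y'])

# array() packs the two columns into one array column; the Python
# function then receives it as an ordinary list.
join_udf = udf(lambda parts: '-'.join(parts), StringType())
df.withColumn('joined', join_udf(array(col('x'), col('y')))).show()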

How do you create an array in PySpark?

Create PySpark ArrayType: you can create an instance of an ArrayType using the ArrayType() class. It takes an elementType argument plus one optional argument, containsNull, which specifies whether elements may be null and defaults to True. elementType should be a PySpark type that extends the DataType class, as the sketch below shows.
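A short sketch of that constructor (the 'tags' field name is illustrative):

from pyspark.sql.types import ArrayType, StringType, StructType, StructField

arr = ArrayType(StringType())                 # containsNull defaults to True
arr_no_nulls = ArrayType(StringType(), False) # disallow null elements

# An ArrayType can be used anywhere a DataType is expected, e.g. in a schema.
schema = StructType([StructField('tags', arr, True)])
print(arr.simpleString())  # array<string>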


1 Answer

You need to initialize a StringType instance:

label_udf = udf(my_udf, ArrayType(StringType()))
#                                           ^^ 
df.withColumn('subset', label_udf(df.col1)).show()
+------------+------+
|        col1|subset|
+------------+------+
|     oculunt|[s, n]|
|predistposed|[s, n]|
| incredulous|[s, n]|
+------------+------+
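For completeness, a minimal end-to-end sketch of the fix. The SparkSession setup assumes Spark 2.x; the corrected udf(...) call itself is the same on the asker's 1.6.1:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('oculunt',), ('predistposed',), ('incredulous',)], ['col1'])

# StringType() is an instance; passing the bare StringType class is what
# triggers the "elementType should be DataType" AssertionError above.
label_udf = udf(lambda domain: ['s', 'n'], ArrayType(StringType()))
df.withColumn('subset', label_udf(df.col1)).show()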
answered Sep 20 '22 by Psidom