
How to add multiple columns using UDF?

Question

I want to add the return values of a UDF to an existing dataframe in separate columns. How do I achieve this efficiently?

Here's an example of what I have so far.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType  

df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)

+-----+------+
| Name|Number|
+-----+------+
|Alive|     4|
+-----+------+

def example(n):
        return [[n+2], [n-2]]

#  schema = StructType([
#          StructField("Out1", ArrayType(IntegerType()), False),
#          StructField("Out2", ArrayType(IntegerType()), False)])

example_udf = udf(example)

Now I can add a column to the dataframe as follows

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output    |
+-----+------+----------+
|Alive|     4|[[6], [2]]|
+-----+------+----------+

However, I don't want the two values to be in the same column but rather in separate ones.

Ideally I'd like to split the output column now, to avoid calling the example function twice (once for each return value), as explained here and here. However, in my situation I'm getting an array of arrays, and I can't see how a split would work there (please note that each array will contain multiple values, separated with a ",").

What the result should look like

What I ultimately want is this:

+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+

Note that the use of the StructType return type is optional and doesn't necessarily have to be part of the solution.

EDIT: I commented out the StructType (and edited the udf assignment) since it's not necessary for the return type of the example function. However, it has to be used if the return value is something like

return [6,3,2],[4,3,1]
Rob asked Dec 06 '17 08:12



1 Answer

To return a StructType, just use Row:

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType

df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])


def example(n):
    # Return both values as one Row; the field names match the schema below.
    return Row('Out1', 'Out2')(n + 2, n - 2)


# Declare the UDF's return type as a struct with one field per output value.
schema = StructType([
    StructField("Out1", IntegerType(), False),
    StructField("Out2", IntegerType(), False)])

example_udf = F.udf(example, schema)

# Add the struct as a single column, then expand its fields into top-level columns.
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)
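
For the case raised in the question's edit, where each output is itself a list of values, the same approach should carry over if each struct field is declared as an ArrayType. The following is only a minimal sketch under that assumption: the helper name add_output_columns and the sample values returned by example are made up for illustration, and the PySpark imports sit inside the helper only so that example can be tried on its own.

```python
def example(n):
    # Illustrative values only: return two lists as a tuple.
    # Spark maps the tuple onto the struct fields by position.
    return [n + 2, n + 1, n], [n - 2, n - 1, n]


def add_output_columns(df):
    """Hypothetical helper: add Out1 and Out2 array columns computed from df["Number"]."""
    # Imports are deferred so that example() can be tried without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

    # One struct field per returned list.
    schema = StructType([
        StructField("Out1", ArrayType(IntegerType()), False),
        StructField("Out2", ArrayType(IntegerType()), False),
    ])
    example_udf = F.udf(example, schema)

    # Add the struct as one column, then expand its fields into top-level columns.
    return (
        df.withColumn("Output", example_udf(df["Number"]))
          .select("Name", "Number", "Output.*")
    )
```

With the df from above, add_output_columns(df).show(truncate=False) should list Out1 and Out2 as two array columns next to Name and Number.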
Zhang Tong answered Oct 03 '22 06:10