 

Apache Spark -- Assign the result of UDF to multiple dataframe columns

I'm using pyspark, loading a large CSV file into a dataframe with spark-csv, and as a pre-processing step I need to apply a variety of operations to the data available in one of the columns (which contains a JSON string). That will return X values, each of which needs to be stored in its own separate column.

That functionality will be implemented in a UDF. However, I am not sure how to return a list of values from that UDF and feed these into individual columns. Below is a simple example:

(...)
from pyspark.sql.functions import udf

def udf_test(n):
    return [n / 2, n % 2]

test_udf = udf(udf_test)

df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).show(5)

That produces the following:

+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows

What would be the best way to store the two (in this example) values returned by the UDF in separate columns? Right now they are being typed as strings:

df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).printSchema()

root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
Everaldo Aguiar asked Feb 10 '16 18:02



2 Answers

It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. This requires a UDF with a specified returnType:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
##  |-- foobar: struct (nullable = true)
##  |    |-- foo: float (nullable = false)
##  |    |-- bar: float (nullable = false)

You can then flatten the schema with a simple select:

foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+

See also Derive multiple columns from a single column in a Spark DataFrame

zero323 answered Oct 12 '22 13:10


Alternatively, you can use flatMap on the underlying RDD to get the desired dataframe in one go:

df = df.withColumn('udf_results', test_udf('amount'))
df4 = df.select('udf_results').rdd.flatMap(lambda x: x).toDF(schema=your_new_schema)
Aditya Vikram Singh answered Oct 12 '22 12:10