
How to pass whole Row to UDF - Spark DataFrame filter

Tags:

apache-spark

I'm writing a filter function for a complex JSON dataset with lots of inner structures. Passing individual columns is too cumbersome.

So I declared the following UDF:

val records: DataFrame = sqlContext.jsonFile("...")

def myFilterFunction(r: Row): Boolean = ???

sqlContext.udf.register("myFilter", (r: Row) => myFilterFunction(r))

Intuitively I'm thinking it will work like this:

records.filter("myFilter(*)=true") 

What is the actual syntax?

Michael Zeltser asked Aug 04 '15

People also ask

How do I filter rows in Spark DataFrame?

Spark's filter() and where() functions filter rows from a DataFrame or Dataset based on one or more conditions or a SQL expression. You can use where() instead of filter() if you are coming from a SQL background; both functions behave identically.

Why are UDFs not recommended in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially through the Python API, can compromise application performance. For this reason, at Damavis we try to avoid their use as much as possible in favour of native functions or SQL.

Can we pass DataFrame to UDF?

A UDF operates on individual records; only in the broadest case, a user-defined aggregate function (UDAF), can it work over an entire DataFrame. If you want a UDF to use columns from more than one DataFrame, you have to join the DataFrames first so that all the columns the UDF needs are in one place.


1 Answer

You have to use the struct() function to assemble the row when calling the UDF. Follow these steps.

Import Row along with the DataFrame functions (struct and callUdf live in org.apache.spark.sql.functions, and the $ column syntax needs the implicits):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import sqlContext.implicits._

Define the UDF

def myFilterFunction(r:Row) = {r.get(0)==r.get(1)}  
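Because the whole row arrives as a single Row, nested structures can be read with getStruct and getAs. A minimal sketch, assuming a hypothetical schema where the first column is a struct with a "name" field (these names are illustrative, not from the question):

```scala
import org.apache.spark.sql.Row

// Hypothetical nested filter: assumes column 0 is a struct column
def myNestedFilter(r: Row): Boolean = {
  val inner = r.getStruct(0)               // first column as a nested Row
  inner.getAs[String]("name") == "sachin"  // read a struct field by name
}
```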

Register the UDF

sqlContext.udf.register("myFilterFunction", myFilterFunction _) 

Create the DataFrame

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2") 

Use the UDF

records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show 

When you want all columns to be passed to the UDF:

records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show  
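Once registered, the same UDF is also callable from a SQL expression, which is close to the syntax the question was aiming for. A sketch, assuming the DataFrame is registered as a temporary table (the table name here is illustrative):

```scala
// Register the DataFrame so it can be queried by name (Spark 1.x API)
records.registerTempTable("records")

// Call the registered UDF from SQL, packing the columns with struct()
sqlContext.sql(
  "SELECT * FROM records WHERE myFilterFunction(struct(text, text2))"
).show()
```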

Result:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+
agsachin answered Oct 08 '22