 

Creating User Defined Function in Spark-SQL

I am new to Spark and Spark SQL, and I was trying to query some data using Spark SQL.

I need to fetch the month from a date which is given as a string.

I think it is not possible to query the month directly in Spark SQL, so I was thinking of writing a user-defined function in Scala.

Is it possible to write a UDF in Spark SQL, and if so, can anybody suggest the best method of writing one?

asked Jul 30 '14 by user2728024

People also ask

What is user defined function in Spark?

Description. User-Defined Functions (UDFs) are user-programmable routines that act on one row. This documentation lists the classes that are required for creating and registering UDFs. It also contains examples that demonstrate how to define and register UDFs and invoke them in Spark SQL.

Can I use UDF in Spark SQL?

In Spark, you create a UDF by writing a function in whichever language you use with Spark. For example, if you use Spark with Scala, you write the UDF in Scala and either wrap it with the udf() function to use it on a DataFrame, or register it by name to use it in SQL.
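
For illustration, here is a minimal sketch of both styles against the Spark 2.x SparkSession API; the monthOf name, the sample rows, and the yyyy-MM-dd date format are assumptions made up for this example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("udf-demo").getOrCreate()
import spark.implicits._

// Plain Scala function: pull the month number out of a "yyyy-MM-dd" string
val monthOf = (date: String) => date.split("-")(1).toInt

// Style 1: wrap it with udf() for the DataFrame API
val monthOfUdf = udf(monthOf)
val df = Seq(("one", "2014-06-01"), ("two", "2014-07-01")).toDF("name", "when")
df.select($"name", monthOfUdf($"when").as("month")).show()

// Style 2: register it by name for use inside SQL strings
spark.udf.register("monthOf", monthOf)
df.createOrReplaceTempView("entries")
spark.sql("SELECT name, monthOf(`when`) AS month FROM entries").show()  // backticks: when is a keyword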


1 Answer

You can do this, at least for filtering, if you're willing to use a language-integrated query.

For a data file dates.txt containing:

one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15

You can pack as much Scala date magic into your UDF as you want, but I'll keep it simple:

def myDateFilter(date: String) = date contains "-08-"
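
Since the question actually asks for the month itself, a slightly richer function could return it; this variant (the monthOf name is mine) assumes the yyyy-MM-dd layout of the file above:

// Return the month number from a "yyyy-MM-dd" string
def monthOf(date: String): Int = date.split("-")(1).toInt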

Set it all up as follows -- a lot of this is from the Programming guide.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class for your records
case class Entry(name: String, when: String)

// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1)))

You can use the UDF as part of your WHERE clause:

val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)

and see the results:

augustEntries.map(r => r(0)).collect().foreach(println)

Notice the version of the where method I've used, declared as follows in the doc:

def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD

So, the UDF can only take one argument, but you can compose several .where() calls to filter on multiple columns.
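
For example, here is a sketch of two chained calls; the second predicate on 'name is made up purely for illustration:

// Each where takes a one-argument predicate; chain calls to combine filters
val filtered = entries
  .where('when)(myDateFilter)
  .where('name)((n: String) => n.length > 3)
  .select('name, 'when)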

Edit for Spark 1.2.0 (and really 1.1.0 too)

While it's not really documented, Spark now supports registering a UDF so it can be queried from SQL.

The above UDF could be registered using:

sqlContext.registerFunction("myDateFilter", myDateFilter)

and if the table was registered

sqlContext.registerRDDAsTable(entries, "entries")

it could be queried using

sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")
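
And to circle back to the original question, the same mechanism can register a month-extracting function; the monthOf name and the split-based parsing are assumptions matching the dates.txt format above:

sqlContext.registerFunction("monthOf", (d: String) => d.split("-")(1).toInt)
sqlContext.sql("SELECT name, monthOf(when) AS month FROM entries WHERE monthOf(when) = 8")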

For more details see this example.

answered Sep 22 '22 by Spiro Michaylov