Register UDF to SqlContext from Scala to use in PySpark

Is it possible to register a UDF (or function) written in Scala and use it in PySpark? E.g.:

import sqlContext.implicits._  // for .toDF on an RDD

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// spam: 1, 2

In Scala, the following is now possible:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3

I would like to use "UDFaddOne" in PySpark like this:

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work

Background: We are a team of developers, some coding in Scala and some in Python, and we would like to share already-written functions. It would also be possible to package them into a library and import that.

Asked Apr 07 '16 by Andarin

1 Answer

As far as I know, PySpark doesn't provide any equivalent of the callUDF function, so it is not possible to access a registered UDF directly.

The simplest solution here is to use a raw SQL expression:

from pyspark.sql.functions import expr  # expr is not imported by default

mytable = sqlContext.table("mytable")
mytable.withColumn("moreSpam", expr("UDFaddOne(spam)"))

# or
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

# or
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
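
All three variants resolve UDFaddOne through the function registry of the shared SQLContext, so they only work if the Scala registration ran in the same session (in Zeppelin, an earlier %spark paragraph in the same notebook). Given the table above, the result should look like:

mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam").show()
# +----+--------+
# |spam|moreSpam|
# +----+--------+
# |   1|       2|
# |   2|       3|
# +----+--------+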

This approach is rather limited, so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find an example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?
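
For a plain UDF the wrapper is short: fetch the Scala function through the Py4J gateway and wrap the Java column it returns. A minimal sketch, where the package path com.example.udfs.Functions and the method addOne are hypothetical names standing in for your own Scala object:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def add_one(col):
    """Call a Scala UDF through the Py4J gateway and wrap the result.

    Assumes a jar on the classpath containing something like:
      object Functions { def addOne = udf((m: Int) => m + 1) }
    """
    sc = SparkContext._active_spark_context
    # Functions.addOne() returns the Scala UserDefinedFunction; apply() takes
    # a Seq of Java columns and returns a Java Column we can wrap for Python.
    jc = sc._jvm.com.example.udfs.Functions.addOne().apply(
        _to_seq(sc, [col], _to_java_column))
    return Column(jc)

# Usage:
# mybiggertable = mytable.withColumn("moreSpam", add_one(mytable["spam"]))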

Answered by zero323