Is it possible to register a UDF (or function) written in Scala to use in PySpark? E.g.:
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
In Scala, the following is now possible:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
I would like to use "UDFaddOne" in PySpark like this:
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Background: We are a team of developers, some coding in Scala and some in Python, and we would like to share functions that are already written. Saving them into a library and importing it would also be an option.
As far as I know, PySpark doesn't provide any equivalent of the callUDF function, and because of that it is not possible to access a registered UDF directly.
The simplest solution here is to use a raw SQL expression:
from pyspark.sql.functions import expr
mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))
## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")
## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
This approach is rather limited, so if you need to support more complex workflows, you should build a package and provide complete Python wrappers. You'll find an example UDAF wrapper in my answer to "Spark: How to map Python with Scala or Java User Defined Functions?"
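If the raw SQL expression is enough for your case, it may help to hide the string formatting behind a small helper so that Python callers don't have to assemble SQL by hand. The following is only a sketch: the names udf_call_sql and call_registered_udf are hypothetical (they are not part of PySpark), and it assumes the UDF was registered on the same SQLContext and that the arguments are plain top-level column names.

```python
import re

# A plain SQL function name: letters, digits and underscores only.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def udf_call_sql(udf_name, *column_names):
    """Build a SQL expression string such as 'UDFaddOne(`spam`)'.

    Hypothetical helper: it only formats text and does not touch Spark.
    """
    if not _IDENTIFIER.fullmatch(udf_name):
        raise ValueError("not a plain function name: %r" % (udf_name,))
    quoted = []
    for name in column_names:
        # Assumption: column names never contain backticks themselves.
        if "`" in name:
            raise ValueError("backticks are not supported: %r" % (name,))
        # Backtick-quote so that names such as "+1" are read as columns.
        quoted.append("`%s`" % name)
    return "%s(%s)" % (udf_name, ", ".join(quoted))


def call_registered_udf(udf_name, *column_names):
    """Return a Column that calls a UDF registered from Scala."""
    # Imported lazily so the string helper can be used without Spark.
    from pyspark.sql.functions import expr
    return expr(udf_call_sql(udf_name, *column_names))
```

With this in place, the first example above could be written as mytable.withColumn("moreSpam", call_registered_udf("UDFaddOne", "spam")). It has the same limitations as the raw SQL approach, since the helper only generates the expression string.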