I want to be able to use a Scala function as a UDF in PySpark:
package com.test

import org.apache.spark.sql.functions.udf

object ScalaPySparkUDFs extends Serializable {
  def testFunction1(x: Int): Int = { x * 2 }
  def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}
I can access testFunction1 in PySpark and have it return values:
functions = sc._jvm.com.test.ScalaPySparkUDFs
functions.testFunction1(10)
What I want to be able to do is use this function as a UDF, ideally in a withColumn call:
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
I think a promising approach is the one found here: Spark: How to map Python with Scala or Java User Defined Functions?
However, when I adapt the code found there to use testUDFFunction1 instead:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))
I get:
AttributeError: 'JavaMember' object has no attribute 'apply'
I don't understand this because I believe testUDFFunction1 does have an apply method?
I do not want to use expressions of the type found here: Register UDF to SqlContext from Scala to use in PySpark
Any suggestions as to how to make this work would be appreciated!
A PySpark UDF (user-defined function) is a reusable function in Spark. Once created, a UDF can be reused on multiple DataFrames and, after registering it, in SQL. The default return type of udf() is StringType. You need to handle nulls explicitly; otherwise you will see side effects.
In Spark, you create a UDF by writing a function in the language you use with Spark. For example, if you are using Spark with Scala, you write the function in Scala and either wrap it with udf() to use it on a DataFrame or register it as a UDF to use it in SQL.
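The two points above about the default return type and null handling can be sketched in Python. This is only an illustration: double_or_none and make_double_udf are hypothetical names, not part of the question's code, and pyspark is imported inside the factory so the plain function can be tried without a Spark installation.

```python
from typing import Optional


def double_or_none(x: Optional[int]) -> Optional[int]:
    """Plain Python logic: pass nulls through instead of raising on None * 2."""
    return None if x is None else x * 2


def make_double_udf():
    """Wrap double_or_none as a PySpark UDF with an explicit return type.

    pyspark is imported lazily here (an assumption made for this sketch) so
    that the function above stays usable without Spark on the path.
    """
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # Without the second argument, udf() would declare the result as StringType.
    return udf(double_or_none, IntegerType())
```

With an active Spark session, it could be used as numbers.withColumn("Result", make_double_udf()(numbers["Value"])).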
I agree with @user6910411: you have to call the apply method directly on the function. So your code will be as follows.
UDF in Scala:
package com.test

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

object ScalaPySparkUDFs {
  def testFunction1(x: Int): Int = { x * 2 }
  // Return the UDF from a method so PySpark can call it and then use apply
  def getFun(): UserDefinedFunction = udf(testFunction1 _)
}
PySpark code:
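from pyspark.sql import Row
from pyspark.sql.column import Column, _to_java_column, _to_seq

def test_udf(col):
    sc = spark.sparkContext
    # Call getFun() first to obtain the UserDefinedFunction, then apply it
    _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
    return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))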
The question you've linked is using a Scala object. A Scala object is a singleton, and you can use its apply method directly.
Here you use a nullary function which returns an object of the UserDefinedFunction class, so you have to call the function first:
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
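The difference between referencing the method and calling it can be shown with a pure-Python analogy. This is not Py4J itself: both classes below are hypothetical stand-ins, and the sketch only mirrors the idea that an uncalled method (reported as a 'JavaMember' in the error above) is not the object it would return.

```python
class UserDefinedFunctionStandIn:
    """Hypothetical stand-in for the object the Scala method returns."""

    def apply(self, value):
        return value * 2


class ScalaPySparkUDFsStandIn:
    """Hypothetical stand-in for the Scala singleton as seen from Python."""

    def testUDFFunction1(self):
        # Nullary method: the UDF object only exists once this is called.
        return UserDefinedFunctionStandIn()


functions = ScalaPySparkUDFsStandIn()

# Without parentheses we hold the method itself, which has no `apply`.
method_reference = functions.testUDFFunction1
print(hasattr(method_reference, "apply"))

# With parentheses we hold the returned object, which does.
udf_object = functions.testUDFFunction1()
print(hasattr(udf_object, "apply"))
print(udf_object.apply(10))
```

The first check prints False and the second prints True, which is the same shape as the AttributeError in the question going away once () is added.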