 

How to use Scala UDF in PySpark?

I want to be able to use a Scala function as a UDF in PySpark

package com.test

import org.apache.spark.sql.functions.udf

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}

I can access testFunction1 in PySpark and have it return values:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)

What I want to be able to do is use this function as a UDF, ideally in a withColumn call:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))

I think a promising approach is as found here: Spark: How to map Python with Scala or Java User Defined Functions?

However, when I adapt the code found there to use testUDFFunction1 instead:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))

I get:

 AttributeError: 'JavaMember' object has no attribute 'apply' 

I don't understand this because I believe testUDFFunction1 does have an apply method?

I do not want to use expressions of the type found here: Register UDF to SqlContext from Scala to use in PySpark

Any suggestions as to how to make this work would be appreciated!

asked Jan 21 '17 by user2682459


People also ask

How does UDF work in Pyspark?

A PySpark UDF is a User Defined Function used to create a reusable function in Spark. Once a UDF is created, it can be re-used on multiple DataFrames and in SQL (after registering). The default return type of udf() is StringType. You need to handle nulls explicitly, otherwise you will see side effects.
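A minimal PySpark sketch of that workflow (the function name double_or_none and the example DataFrame are just placeholders for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Wrap a plain Python function; pass an explicit return type instead of relying on the StringType default
def double_or_none(x):
    return None if x is None else x * 2   # handle nulls explicitly

double_udf = udf(double_or_none, IntegerType())

df = spark.createDataFrame([(1,), (2,), (None,)], ["Value"])
df.withColumn("Result", double_udf(df["Value"])).show()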

How do I register a function as UDF in Spark Scala?

In Spark, you create a UDF by writing a function in the language you prefer to use with Spark. For example, if you are using Spark with Scala, you write the function in Scala and either wrap it with the udf() function (to use it on a DataFrame) or register it as a UDF (to use it in SQL).
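The PySpark equivalent of that registration step might look like this (a minimal sketch; double_value is a made-up name, and spark.udf.register is what makes the function callable from SQL):

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Register the function so it can be used from SQL as well as the DataFrame API
spark.udf.register("double_value", lambda x: x * 2, IntegerType())

spark.createDataFrame([(1,), (2,)], ["Value"]).createOrReplaceTempView("numbers")
spark.sql("SELECT Value, double_value(Value) AS Result FROM numbers").show()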


2 Answers

Agreeing with @user6910411, you have to call the apply method on the UserDefinedFunction itself. So your code will be:

UDF in Scala:

package com.test

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._


object ScalaPySparkUDFs {

    def testFunction1(x: Int): Int = { x * 2 }

    // Returns the UserDefinedFunction so it can be fetched and applied from PySpark
    def getFun(): UserDefinedFunction = udf(testFunction1 _)
}

PySpark code:

from pyspark.sql import Row
from pyspark.sql.column import Column, _to_java_column, _to_seq

def test_udf(col):
    sc = spark.sparkContext
    # Fetch the UserDefinedFunction from the JVM, then apply it to the column
    _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
    return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))


row = Row("Value")
numbers = sc.parallelize([1, 2, 3, 4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))
answered Oct 18 '22 by Ajit K'sagar


The question you've linked uses a Scala object. A Scala object is a singleton, so you can use its apply method directly.

Here you use a nullary function which returns an object of the UserDefinedFunction class, so you have to call the function first:

_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
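Putting that together with the udf_test helper from the question, a corrected version could look like the sketch below (it assumes the jar containing com.test is on the classpath, as in the question):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_test(col):
    sc = SparkContext._active_spark_context
    # Call testUDFFunction1() first to obtain the UserDefinedFunction, then apply it
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1()
    return Column(_f.apply(_to_seq(sc, [col], _to_java_column)))

numbers.withColumn("Result", udf_test(numbers['Value']))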
answered Oct 18 '22 by zero323