Let's say, for instance, that my team has chosen Python as the reference language to develop with Spark. But later, for performance reasons, we would like to develop specific Scala or Java libraries in order to map them to our Python code (something similar to Python stubs with Scala or Java skeletons).
Don't you think it is possible to interface new custom Python methods with, under the hood, some Scala or Java User Defined Functions?
UDFs are very useful for extending Spark's vocabulary but come with significant performance overhead. They are black boxes for the Spark optimizer, blocking several helpful optimizations such as WholeStageCodegen, null optimization, etc.
In Spark, you create a UDF by writing a function in the language you prefer to use with Spark. For example, if you are using Spark with Scala, you create the UDF in Scala and either wrap it with the udf() function or register it as a UDF, to use it on DataFrames or in SQL respectively. A PySpark sketch of the same pattern is shown below.
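For illustration, here is a minimal PySpark sketch of that same pattern: wrapping a plain Python function with udf() for DataFrame use, and registering it for SQL. The function name add_one_py and the DataFrame are just placeholders, and a sqlContext is assumed to be available as in the rest of this answer.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Plain Python function
def add_one_py(x):
    return None if x is None else x + 1

df = sqlContext.createDataFrame([(1,), (2,)], ("v",))

# Wrap it with udf() to use it on DataFrames ...
add_one_udf = udf(add_one_py, IntegerType())
df.select(add_one_udf(df["v"]).alias("v_plus_one")).show()

# ... or register it to use it in SQL statements
sqlContext.registerFunction("add_one_py", add_one_py, IntegerType())
sqlContext.sql("SELECT add_one_py(1)").show()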
Scala is faster than Python thanks to its static typing, so if performance is a requirement, Scala is a good bet. Spark itself is written in Scala, which makes writing Spark jobs in Scala the native way.
Spark 2.1+
You can use SQLContext.registerJavaFunction:

"Register a java UDF so it can be used in SQL statements."

which requires a name, the fully qualified name of the Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr) and requires a Java org.apache.spark.sql.api.java.UDF*:
scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.1.0" )
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
}
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()

## +------+
## |UDF(1)|
## +------+
## |     2|
## +------+
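As mentioned above, a function registered this way is reachable from the DataFrame API only through expr / selectExpr. A short sketch, assuming the add_one registration from the previous snippet and an illustrative DataFrame with an integer column v:

from pyspark.sql.functions import expr

df = sqlContext.createDataFrame([(1,), (2,)], ("v",))

df.selectExpr("add_one(v)").show()
df.select(expr("add_one(v)")).show()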
Version independent:
I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.
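To make that concrete, here is a minimal sketch of roughly what such a wrapper looks like, mirroring how pyspark.sql.functions delegates to the JVM (using the built-in upper as the example; my_upper is just an illustrative name):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def my_upper(col):
    # Delegate to org.apache.spark.sql.functions.upper on the JVM side
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.org.apache.spark.sql.functions.upper(_to_java_column(col)))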
Let's assume I want to reuse the GroupConcat UDAF I've created as an answer to SPARK SQL replacement for mysql GROUP_CONCAT aggregate function, and that it is located in a package com.example.udaf:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"),
    row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs"))
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply

    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()

## +---+---------+
## |  k|       vs|
## +---+---------+
## |  1|foo1,foo2|
## |  2|bar1,bar2|
## +---+---------+
There are far too many leading underscores for my taste, but as you can see, it can be done.