 

Spark: How to map Python with Scala or Java User Defined Functions?


Let's say, for instance, that my team has chosen Python as the reference language to develop with Spark. But later, for performance reasons, we would like to develop specific Scala or Java libraries in order to map them with our Python code (something similar to Python stubs with Scala or Java skeletons).

Do you think it is possible to interface new custom Python methods with some Scala or Java User Defined Functions under the hood?

asked Oct 20 '15 by prossblad

People also ask

Is it good to use UDF in Spark?

UDFs are very useful for extending Spark's vocabulary, but they come with significant performance overhead. They are black boxes to the Spark optimizer, blocking several helpful optimizations such as WholeStageCodegen and null optimization.

How do I register a function as UDF in Spark Scala?

In Spark, you create a UDF by writing a function in the language you prefer to use with Spark. For example, if you are using Spark with Scala, you create a UDF in Scala and either wrap it with the udf() function to use it on DataFrames or register it as a UDF to use it in SQL.
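As a rough illustration, a minimal sketch (assuming Spark 2.x with a SparkSession; the column name x and function name add_one are made up for the example):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("udf-sketch").getOrCreate()
import spark.implicits._

// A plain Scala function wrapped with udf() for use on DataFrames
val addOne = udf((x: Int) => x + 1)

val df = Seq(1, 2, 3).toDF("x")
df.select(addOne($"x").alias("x_plus_one")).show()

// The same function registered by name for use in SQL
spark.udf.register("add_one", (x: Int) => x + 1)
spark.sql("SELECT add_one(1)").show()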

Should I use Scala or Python for Spark?

Scala is faster than Python due to its static typing. If faster performance is a requirement, Scala is a good bet. Spark is native to Scala, which makes Scala the native way to write Spark jobs.


1 Answer

Spark 2.1+

You can use SQLContext.registerJavaFunction:

Register a java UDF so it can be used in SQL statements.

which requires a name, the fully qualified name of the Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr) and requires a Java org.apache.spark.sql.api.java.UDF*:

scalaVersion := "2.11.8"  libraryDependencies ++= Seq(   "org.apache.spark" %% "spark-sql" % "2.1.0" ) 
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
}
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()

## +------+
## |UDF(1)|
## +------+
## |     2|
## +------+
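Since the registered name lives in the SQL function registry, it can also be used through expr / selectExpr instead of a full SQL statement. A minimal sketch, assuming the add_one function registered above and a toy DataFrame created here purely for illustration:

from pyspark.sql.functions import expr

df = sqlContext.createDataFrame([(1,), (2,)], ["x"])

# The registered name is usable inside SQL expression strings
df.selectExpr("add_one(x) AS x_plus_one").show()
df.select(expr("add_one(x)").alias("x_plus_one")).show()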

Version independent:

I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.

Let's assume I want to reuse the GroupConcat UDAF I've created as an answer to SPARK SQL replacement for mysql GROUP_CONCAT aggregate function, and that it is located in the package com.example.udaf:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs"))
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid the full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()

## +---+---------+
## |  k|       vs|
## +---+---------+
## |  1|foo1,foo2|
## |  2|bar1,bar2|
## +---+---------+

There are far too many leading underscores for my taste, but as you can see, it can be done.
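For context, the Scala side of such a UDAF could look roughly like the sketch below. This is a hedged approximation built on the UserDefinedAggregateFunction API, not the exact code from the linked answer; only the package and object name (com.example.udaf.GroupConcat) are taken from the snippet above.

package com.example.udaf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Collects the string values of each group and concatenates them with ","
object GroupConcat extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("x", StringType)
  def bufferSchema: StructType = new StructType().add("buff", ArrayType(StringType))
  def dataType: DataType = StringType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer.update(0, Seq.empty[String])

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))

  // evaluate produces the final value; apply(exprs: Column*) is inherited from
  // UserDefinedAggregateFunction and is what the PySpark wrapper calls via the JVM gateway
  def evaluate(buffer: Row): Any = buffer.getSeq[String](0).mkString(",")
}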

Related to:

  • Calling Java/Scala function from a task
  • How to use a Scala class inside Pyspark
  • Transforming PySpark RDD with Scala
answered Sep 22 '22 by zero323