
Creating a SparkSQL UDF in Java outside of SQLContext

Problem

I would like to create a User-Defined Function in Java that can be called as a Java method within a chain of Apache Spark operators. I'm having trouble finding Java examples that don't require the UDF to exist inside of a SQL query.

Versions

  • Java 8
  • Scala 2.10.6
  • Apache Spark 1.6.0 Pre-built for Hadoop 2.6.0

What I've Tried That Works

I can successfully create a UDF in Java. However, I can't use this unless it's in a SQL query:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");

Where I'm Stuck

I would expect a non-SQL method-call-style UDF in Java to look something like this:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));

Compiling this leads to a compiler error on the line beginning with "UserDefinedFunction", so obviously my attempt at guessing the right signature is incorrect:

error: no suitable method found for udf((String st[...]ase(),DataType)
    UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
    (cannot infer type-variable(s) RT#1
    (argument mismatch; Function0 is not a functional interface
    multiple non-overriding abstract methods found in interface Function0))

This error continues with detail for each of the inferred udf() signatures attempted.

What I Need

I need to fix the Java code so that I can define and use the udfUppercase UDF without embedding it in a SQL query. I feel like I'm missing something very simple, fundamental, and possibly syntax-y, but could be completely off base.

Working Solution (courtesy of zero323 below)

There's no good way to register and use a Java UDF as a Java method, but a UDF registered in the SQLContext can be inserted into a chain of operators using callUDF().

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));

Also, be sure to use callUDF() and not the deprecated callUdf(), which has a different method signature.

asked Mar 27 '16 by sparkour

1 Answer

Spark >= 2.3

SPARK-22945 (add java UDF APIs in the functions object) adds a simplified udf API, similar to the Scala and Python ones:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
  (String s) -> s.toUpperCase(), DataTypes.StringType
);

df.select(udfUppercase.apply(col("name")));
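For the method-call chain the question asked about, the same UDF also works inside withColumn. A minimal sketch, assuming df is a Dataset<Row> with a "name" column ("name_upper" is just an illustrative column name):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Apply the UDF directly in an operator chain, no SQL query needed.
Dataset<Row> newDF = df.withColumn("name_upper", udfUppercase.apply(col("name")));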

Spark < 2.3

Long story short, the functions.udf methods are not designed for Java interoperability. All variants require TypeTags, and while it is possible to generate these manually (I am pretty sure I've seen Daniel Darabos show how to do it on SO), it is something you probably want to avoid.

If for some reason you want to avoid writing the UDF in Scala, the simplest thing is to register the UDF and call it by name:

sqlContext.udf().register("udfUppercase",
  (String string) -> string.toUpperCase(), DataTypes.StringType);

df.select(callUDF("udfUppercase", col("name")));
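Pieced together as a self-contained sketch (Spark 1.6, Java 8): the explicit UDF1 cast is just one way to keep the register() overload resolution unambiguous, and df/newDF are illustrative names.

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register once against the SQLContext...
sqlContext.udf().register("udfUppercase",
    (UDF1<String, String>) s -> s.toUpperCase(), DataTypes.StringType);

// ...then call the UDF by name wherever a Column expression fits.
DataFrame newDF = df.select(callUDF("udfUppercase", col("name")));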
answered Oct 13 '22 by zero323