Registering Hive Custom UDF with Spark (Spark SQL) 2.0.0

I am working on a Spark 2.0.0 application where I need to use the 'com.facebook.hive.udf.UDFNumberRows' function in my SQL context in one of the queries. On my cluster, in Hive queries, I use it as a temporary function simply by defining: CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows', which is quite simple.

I tried registering it with sparkSession as below, but got an error:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""")

Error:

CREATE TEMPORARY FUNCTION rowsequence AS 'com.facebook.hive.udf.UDFNumberRows'
16/11/01 20:46:17 ERROR ApplicationMaster: User class threw exception: java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:751)
    at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:61)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.delayedEndpoint$com$mediamath$spark$attribution$sparkjob$SparkVideoCidJoin$1(SparkVideoCidJoin.scala:75)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$delayedInit$body.apply(SparkVideoCidJoin.scala:22)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.main(SparkVideoCidJoin.scala:22)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin.main(SparkVideoCidJoin.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)

Does anybody have an idea how to register it the way Spark is asking, i.e. with the register API on SparkSession and SQLContext?

    sqlContext.udf.register(...)
asked Nov 01 '16 by Apratim Tiwari



2 Answers

In Spark 2.0,

sparkSession.udf.register(...) 

allows you to register Java or Scala UDFs (functions of type Long => Long, for example), but not Hive GenericUDFs, which handle LongWritable instead of Long and can take a variable number of arguments.
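
For example, registering a plain Scala function looks like this (the function name and logic are illustrative, not from the question):

    // A plain Scala UDF of type Long => Long. This works without Hive support,
    // but it cannot wrap a Hive GenericUDF. Name and logic are hypothetical.
    sparkSession.udf.register("plusOne", (x: Long) => x + 1)
    sparkSession.sql("SELECT plusOne(id) FROM my_table").show()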

To register Hive UDFs, your first approach was correct:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""")

However, you must enable Hive support first:

SparkSession.builder().enableHiveSupport()

and make sure that the "spark-hive" dependency is present on your classpath.
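
Putting it together, a minimal sketch (the app name is illustrative, and the jar containing the UDF is assumed to already be on the classpath):

    import org.apache.spark.sql.SparkSession

    // Enable Hive support so that CREATE TEMPORARY FUNCTION is handled by the
    // HiveSessionCatalog instead of the default SessionCatalog.
    val sparkSession = SparkSession.builder()
      .appName("hive-udf-example")  // hypothetical app name
      .enableHiveSupport()
      .getOrCreate()

    // Register the Hive UDF through SQL, as in the question.
    sparkSession.sql("CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'")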

Explanation:

Your error message

java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead

comes from the class SessionCatalog.

By calling SparkSession.builder().enableHiveSupport(), Spark replaces the default SessionCatalog with a HiveSessionCatalog, in which the method makeFunctionBuilder is actually implemented for Hive functions.

Lastly:

the UDF you want to use, 'com.facebook.hive.udf.UDFNumberRows', was written at a time when windowing functions were not yet available in Hive. I suggest you use them instead. You can check the Hive reference, this Spark SQL intro, or this one if you want to stick to the Scala syntax.
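
For instance, assuming df is your DataFrame, row numbering with a built-in window function in the Scala API could look like this (column names are illustrative):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    // Number rows within each partition, replacing the old Hive UDF.
    // Column names are hypothetical.
    val w = Window.partitionBy("user_id").orderBy("event_time")
    val numbered = df.withColumn("row_num", row_number().over(w))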

answered Oct 14 '22 by FurryMachine


The problem you are facing is that Spark is not loading the jar library into its classpath.

In our team, we load external libraries with the --jars option:

/usr/bin/spark-submit  --jars external_library.jar our_program.py --our_params 

You can check whether your external libraries are being loaded in the Spark History Server, under the Environment tab (spark.yarn.secondary.jars).
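
You can also double-check from inside the application itself; a small sketch (these keys may be unset depending on how you submitted the job):

    // Print the extra jars Spark was configured with, if any.
    println(sparkSession.conf.getOption("spark.jars"))
    println(sparkSession.conf.getOption("spark.yarn.secondary.jars"))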

Then you will be able to register your UDF as you said, once you enable Hive support as FurryMachine says:

sparkSession.sql("""
    CREATE TEMPORARY FUNCTION myFunc AS
    'com.facebook.hive.udf.UDFNumberRows'
""")

You can find more info via spark-submit --help:

hadoop:~/projects/neocortex/src$ spark-submit --help
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]   
Usage: spark-submit run-example [options] example-class [example args]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.
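
Alternatively, you can configure the extra jar in code rather than on the command line; a minimal sketch, assuming a hypothetical jar path and client deploy mode (in cluster mode the jar must be distributed before the driver starts):

    import org.apache.spark.sql.SparkSession

    // "spark.jars" is read when the SparkContext starts, so set it
    // before getOrCreate(). The path below is hypothetical.
    val spark = SparkSession.builder()
      .config("spark.jars", "/path/to/external_library.jar")
      .enableHiveSupport()
      .getOrCreate()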
answered Oct 14 '22 by Franzi