This works:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pandas as pd
spark = SparkSession.builder.getOrCreate()
@pandas_udf(returnType="long")
def add_one(v: pd.Series) -> pd.Series:
    return v.add(1)
spark.udf.register("add_one", add_one)
spark.sql("select add_one(1)").show()
However, I'm wondering if/how I can make the following work:
$ spark-sql -e 'select add_one(1)'
It would be very nice if one could make that work.
I'm afraid this is currently not possible. Funnily enough, nobody actually mentions it.
The information is actually "hidden" in the Apache Spark documentation in a small note:
Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.
As you can probably guess from the implications, this means you can't call UDFs from the spark-sql CLI. Here is the link to the documentation.
One can double-check the bin/spark-sql source code on GitHub to see what is actually done:
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
That again confirms, since the script merely hands everything to spark-submit with the thriftserver's SparkSQLCLIDriver class, that you can't use the UDF from the spark-sql CLI.
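If the goal is simply to run such a query from a shell, a workaround sketch (not the spark-sql CLI route the question asks about; the file name below is hypothetical, and it assumes the same pandas/pyarrow setup as the snippet in the question) is to put the UDF registration and the query into a small PySpark script and hand it to spark-submit:

# add_one_query.py -- hypothetical file name
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Same vectorized UDF as in the question
@pandas_udf(returnType="long")
def add_one(v: pd.Series) -> pd.Series:
    return v.add(1)

# Register it for SQL and run the query from inside the application
spark.udf.register("add_one", add_one)
spark.sql("select add_one(1)").show()
spark.stop()

$ spark-submit add_one_query.py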
Pandas UDFs are vectorized UDFs meant to avoid row-by-row iteration inside PySpark. Once these UDFs are registered, they behave like PySpark function APIs. They reside and run inside a Python worker.
As @tukan mentioned, the Spark SQL CLI cannot talk to the Thrift JDBC server, so Spark doesn't natively support this.
However, you could make a custom RPC call to invoke it directly, but that is neither as easy nor the same as what you wanted to do in the first place.
It's not possible to use a Python UDF the way you want at the moment. The option is, however, available for Scala/Java UDFs, so if you're open to using Scala/Java, this is one way to do it. Note: I'm implementing a Hive UDF, since Spark supports Hive UDFs.
The first thing you need to do is create a Java project with the following sample structure:
root
| - pom.xml
| - src/main/java/com/test/udf/SimpleConcatUDF.java
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.test.udf</groupId>
  <artifactId>simple-concat-udf</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <hive.version>3.1.2</hive.version>
  </properties>

  <repositories>
    <repository>
      <id>hortonworks</id>
      <url>http://repo.hortonworks.com/content/groups/public</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.3.2</version>
          <configuration>
            <source>1.6</source>
            <target>1.6</target>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-eclipse-plugin</artifactId>
          <version>2.9</version>
          <configuration>
            <useProjectReferences>false</useProjectReferences>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <appendAssemblyId>false</appendAssemblyId>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>com.test.udf.SimpleConcatUDF</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
SimpleConcatUDF.java
package com.test.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SimpleConcatUDF extends UDF {
    public String evaluate(final Text text) {
        return text.toString() + "_from_udf";
    }
}
The next thing you'd want to do is compile and package it. I'm using Maven, so the standard command is:
cd <project-root-path>/
mvn clean install
# output jar file is located at <project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar
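Optionally, as a quick sanity check (the jar tool ships with the JDK; the grep is just illustrative), you can confirm the class made it into the jar; it should list com/test/udf/SimpleConcatUDF.class:

jar tf target/simple-concat-udf-1.0-SNAPSHOT.jar | grep SimpleConcatUDF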
Finally, you'd need to register it using create function. This only needs to be done once if you register the function as permanent; otherwise, you can register it as temporary (a sketch of the temporary variant follows the transcript below).
spark-sql> create function simple_concat AS 'com.test.udf.SimpleConcatUDF' using jar '<project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar';
spark-sql> show user functions;
default.simple_concat
Time taken: 1.868 seconds, Fetched 1 row(s)
spark-sql> select simple_concat('a');
a_from_udf
Time taken: 0.079 seconds, Fetched 1 row(s)
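If you only need the function for the current session, a sketch of the temporary variant mentioned above would look like this (same class and jar path; the function name here is chosen just for illustration):

spark-sql> create temporary function simple_concat_tmp AS 'com.test.udf.SimpleConcatUDF' using jar '<project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar';
spark-sql> select simple_concat_tmp('a');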
NOTE: If you have HDFS in your system, you'd want to copy the jar file to HDFS and create the function using that HDFS path instead of a local path like above.
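For example, a sketch with a hypothetical HDFS location (adjust the path to your cluster layout):

$ hdfs dfs -mkdir -p /apps/udf
$ hdfs dfs -put target/simple-concat-udf-1.0-SNAPSHOT.jar /apps/udf/
spark-sql> create function simple_concat AS 'com.test.udf.SimpleConcatUDF' using jar 'hdfs:///apps/udf/simple-concat-udf-1.0-SNAPSHOT.jar';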