
Possible to use Spark Pandas UDF in pure Spark SQL?

This works:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
import pandas as pd

spark = SparkSession.builder.getOrCreate()

@pandas_udf(returnType="long")
def add_one(v: pd.Series) -> pd.Series:
    return v.add(1)

spark.udf.register("add_one", add_one)

spark.sql("select add_one(1)").show()

However, I'm wondering if/how I can make the following work:

$ spark-sql -e 'select add_one(1)'
Asked Oct 18 '21 by Neil McGuigan


People also ask

Why UDF are not recommended in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, and especially in the Python API, can compromise application performance. For this reason, at Damavis we try to avoid their use as much as possible in favour of native functions or SQL.

What is difference between UDF and UDAF in Spark SQL?

Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result.
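
For illustration, here is a minimal PySpark sketch (not from any of the answers) contrasting the two shapes: a row-wise UDF that returns one value per input value, and an aggregate (UDAF-style) pandas UDF that reduces a group of rows to a single value. The column names id and v are placeholders.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# Row-wise (UDF-like): one output value per input value
@pandas_udf("double")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1

# Aggregate (UDAF-like): many input rows reduced to one value per group
@pandas_udf("double")
def mean_v(v: pd.Series) -> float:
    return v.mean()

df.select(plus_one("v")).show()
df.groupBy("id").agg(mean_v("v")).show()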

Can we use pandas in Spark?

The pandas API on Spark is useful not only for pandas users but also for PySpark users, because it supports many tasks that are difficult to do with PySpark, for example plotting data directly from a PySpark DataFrame.
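
As a hedged sketch (assuming Spark 3.2+, where the pandas API on Spark ships as pyspark.pandas; the names below are illustrative only):

import pyspark.pandas as ps

# Build a pandas-on-Spark DataFrame and use familiar pandas-style calls
psdf = ps.DataFrame({"x": [1, 2, 3, 4], "y": [2.0, 4.0, 8.0, 16.0]})
print(psdf.describe())

# An existing PySpark DataFrame can be converted with sdf.pandas_api();
# plotting (e.g. psdf.plot.line()) additionally requires a plotting backend such as plotly.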

What is UDF in Spark SQL?

Description. User-Defined Functions (UDFs) are user-programmable routines that act on one row. This documentation lists the classes that are required for creating and registering UDFs. It also contains examples that demonstrate how to define and register UDFs and invoke them in Spark SQL.
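
As a small, hedged sketch of that define/register/invoke flow (the function name times_two is just an example, assuming an existing SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Define and register a row-at-a-time UDF under a SQL name, then invoke it in SQL
spark.udf.register("times_two", lambda x: x * 2, "long")
spark.sql("SELECT times_two(21)").show()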

What is pandas UDF in spark?

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using the pandas_udf as a decorator or to wrap the function, and no additional configuration is required.
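
To make the "decorator or wrap" point concrete, a minimal sketch (the names here are illustrative):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Decorator form, as in the question
@pandas_udf("long")
def add_one(v: pd.Series) -> pd.Series:
    return v + 1

# Wrapper form: pandas_udf wraps an existing function
def _add_one(v: pd.Series) -> pd.Series:
    return v + 1

add_one_wrapped = pandas_udf(_add_one, returnType="long")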

What is UDF (user defined function) in spark?

A Spark SQL UDF (a.k.a. User Defined Function) is the most useful feature of Spark SQL & DataFrame, as it extends Spark's built-in capabilities. In this article, I will explain what a UDF is, why we need it, and how to create and use it on DataFrames and in SQL, using a Scala example.

What is the difference between pandas UDF and python function?

A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function, and no additional configuration is required. A Pandas UDF behaves like a regular PySpark function API in general. New in version 2.3.0. pandas_udf takes the user-defined function (a plain Python function, which can also be used standalone) and the return type of the user-defined function.
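
A minimal sketch of the contrast (illustrative names; both functions compute the same thing):

import pandas as pd
from pyspark.sql.functions import udf, pandas_udf

# Plain Python UDF: invoked once per row, scalar in / scalar out
@udf("long")
def inc_plain(x):
    return x + 1

# Pandas UDF: invoked once per batch, pd.Series in / pd.Series out (vectorized via Arrow)
@pandas_udf("long")
def inc_vectorized(v: pd.Series) -> pd.Series:
    return v + 1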

What is the best way to optimize UDFs in spark?

UDFs are a black box to Spark, hence it can't apply optimizations to them, and you lose all the optimization Spark does on DataFrames/Datasets. When possible you should use Spark SQL built-in functions, as these functions provide optimization.
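
A PySpark sketch of the same advice (the original snippet was Scala; this version is only illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()
df = spark.range(5).withColumnRenamed("id", "v")

# UDF version: a black box the optimizer cannot see into
inc_udf = udf(lambda x: x + 1, "long")
df.select(inc_udf("v")).show()

# Built-in column expression: Catalyst can optimize this
df.select((F.col("v") + 1).alias("v_plus_one")).show()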


3 Answers

It would indeed be very nice if one could do that.

I'm afraid this is currently not possible. The funny thing is that nobody actually mentions it.

The information is actually "hidden" in the Apache Spark documentation, in a small note:

Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.

As you can probably see from the implications, this means you can't call such UDFs from the spark-sql CLI. Here is the link to the documentation.

One can double-check the bin/spark-sql source code on GitHub to see what it actually does:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"

That confirms it again: spark-sql simply runs spark-submit with the SparkSQLCLIDriver class from the thriftserver package, so you can't use the UDF from the spark-sql CLI.

Answered Oct 21 '22 by tukan


Pandas UDFs are vectorized UDFs meant to avoid row-by-row iteration inside PySpark. Once these UDFs are registered, they behave like PySpark function APIs. They reside and run inside the Python workers.
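
For example, a minimal sketch reusing the add_one UDF from the question (assuming an existing SparkSession named spark):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

@pandas_udf("long")
def add_one(v: pd.Series) -> pd.Series:
    return v + 1

# Once defined, the pandas UDF is called like any other PySpark column function
df = spark.range(3)
df.select(add_one(df["id"]).alias("id_plus_one")).show()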

As @tukan mentioned, the Spark SQL CLI cannot talk to the Thrift JDBC server, so Spark doesn't natively support this.

However, you could make a custom RPC call to invoke it directly, but that's neither as easy as, nor the same as, what you want to do in the first place.

Answered Oct 21 '22 by Ashvjit Singh


It's not possible to use a Python UDF in the way you want at this moment. But the option is available for Scala/Java UDFs, so if you're open to using Scala/Java, this is one way to do it. Note: I'm implementing a Hive UDF, as Spark supports Hive UDFs.

The first thing you need to do is create a Java project with the following sample structure:

root
| - pom.xml
| - src/main/java/com/test/udf/SimpleConcatUDF.java

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.test.udf</groupId>
  <artifactId>simple-concat-udf</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <hive.version>3.1.2</hive.version>
  </properties>

  <repositories>
    <repository>
      <id>hortonworks</id>
      <url>http://repo.hortonworks.com/content/groups/public</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.3.2</version>
          <configuration>
            <source>1.6</source>
            <target>1.6</target>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-eclipse-plugin</artifactId>
          <version>2.9</version>
          <configuration>
            <useProjectReferences>false</useProjectReferences>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <appendAssemblyId>false</appendAssemblyId>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>com.test.udf.SimpleConcatUDF</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

</project>

SimpleConcatUDF.java

package com.test.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SimpleConcatUDF extends UDF {

  public String evaluate(final Text text) {
    return text.toString() + "_from_udf";
  }

}

The next thing you'd want to do is compile and package it. I'm using Maven, so the standard commands are:

cd <project-root-path>/
mvn clean install
# output jar file is located at <project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar

Finally, you'd need to register it using CREATE FUNCTION. This only needs to be done once if you register the function as permanent; otherwise, you can register it as temporary.

spark-sql> create function simple_concat AS 'com.test.udf.SimpleConcatUDF' using jar '<project-root-path>/target/simple-concat-udf-1.0-SNAPSHOT.jar';
spark-sql> show user functions;
default.simple_concat
Time taken: 1.868 seconds, Fetched 1 row(s)
spark-sql> select simple_concat('a');
a_from_udf
Time taken: 0.079 seconds, Fetched 1 row(s)

NOTE: If you have HDFS in your system, you'd want to copy the jar file to HDFS and create the function using that HDFS path instead of a local path like above.

Answered Oct 21 '22 by pltc