Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to define and use a User-Defined Aggregate Function in Spark SQL?

I know how to write a UDF in Spark SQL:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)

Can I do something similar to define an aggregate function? How is this done?

For context, I want to run the following SQL query:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")

It should return something like

Row(span1, false, T0)

I want the aggregate function to tell me if there's any values for opticalReceivePower in the groups defined by span and timestamp which are below the threshold. Do I need to write my UDAF differently to the UDF I pasted above?

like image 233
Rory Byrne Avatar asked Aug 19 '15 16:08

Rory Byrne


People also ask

How user defined function works in Spark?

In Spark, you create UDF by creating a function in a language you prefer to use for Spark. For example, if you are using Spark with scala, you create a UDF in scala language and wrap it with udf() function or register it as udf to use it on DataFrame and SQL respectively.

What is user defined aggregation function?

User-Defined Aggregate Functions (UDAFs) are user-programmable routines that act on multiple rows at once and return a single aggregated value as a result. This documentation lists the classes that are required for creating and registering UDAFs.

What do you need to do to use the AGG function in Spark?

You need to define a key or grouping in aggregation. You can also define an aggregation function that specifies how the transformations will be performed among the columns. If you give multiple values as input, the aggregation function will generate one result for each group.

What is difference between UDF and UDAF in Spark SQL?

Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result.

How do I create a user-defined aggregate function in SQL Server?

Creating a user-defined aggregate function in SQL Server involves the following steps: Define the user-defined aggregate function as a class in a Microsoft .NET Framework-supported language. For more information about how to program user-defined aggregates in the CLR, see CLR User-Defined Aggregates.

What is UDF (user defined function) in spark?

Spark SQL UDF (a.k.a User Defined Function) is the most useful feature of Spark SQL & DataFrame which extends the Spark build in capabilities. In this article, I will explain what is UDF? why do we need it and how to create and using it on DataFrame and SQL using Scala example.

What is a user-defined aggregator?

A base class for user-defined aggregations, which can be used in Dataset operations to take all of the elements of a group and reduce them to a single value. IN - The input type for the aggregation.

What is UDAF in Scala?

It also contains examples that demonstrate how to define and register UDAFs in Scala and invoke them in Spark SQL. A base class for user-defined aggregations, which can be used in Dataset operations to take all of the elements of a group and reduce them to a single value.


1 Answers

Supported methods

Spark >= 3.0

Scala UserDefinedAggregateFunction is being deprecated (SPARK-30423 Deprecate UserDefinedAggregateFunction) in favor of registered Aggregator.

Spark >= 2.3

Vectorized udf (Python only):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

from pyspark.sql.types import *
import pandas as pd

df = sc.parallelize([
    ("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])

def below_threshold(threshold, group="group", power="power"):
    @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
    def below_threshold_(df):
        df = pd.DataFrame(
           df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
        df.reset_index(inplace=True, drop=False)
        return df

    return below_threshold_

Example usage:

df.groupBy("group").apply(below_threshold(-40)).show()

## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## |    b|           true|
## |    a|          false|
## +-----+---------------+

See also Applying UDFs on GroupedData in PySpark (with functioning python example)

Spark >= 2.0 (optionally 1.6 but with slightly different API):

It is possible to use Aggregators on typed Datasets:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean)  extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  def zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)

Spark >= 1.5:

In Spark 1.5 you can create UDAF like this although it is most likely an overkill:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object belowThreshold extends UserDefinedAggregateFunction {
    // Schema you get as an input
    def inputSchema = new StructType().add("power", IntegerType)
    // Schema of the row which is used for aggregation
    def bufferSchema = new StructType().add("ind", BooleanType)
    // Returned type
    def dataType = BooleanType
    // Self-explaining 
    def deterministic = true
    // zero value
    def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
    // Similar to seqOp in aggregate
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        if (!input.isNullAt(0))
          buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
    }
    // Similar to combOp in aggregate
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))    
    }
    // Called on exit to get return value
    def evaluate(buffer: Row) = buffer.getBoolean(0)
}

Example usage:

df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Spark 1.4 workaround:

I am not sure if I correctly understand your requirements but as far as I can tell plain old aggregation should be enough here:

val df = sc.parallelize(Seq(
    ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")

df
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Spark <= 1.4:

As far I know, at this moment (Spark 1.4.1), there is no support for UDAF, other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).

Unsupported / internal methods

Internally Spark uses a number of classes including ImperativeAggregates and DeclarativeAggregates.

There are intended for internal usage and may change without further notice, so it is probably not something you want to use in your production code, but just for completeness BelowThreshold with DeclarativeAggregate could be implemented like this (tested with Spark 2.2-SNAPSHOT):

import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression) 
    extends  DeclarativeAggregate  {
  override def children: Seq[Expression] = Seq(child, threshold)

  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  // Used to derive schema
  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(
    Literal(false)
  )

  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold
  override def defaultResult: Option[Literal] = Option(Literal(false))
} 

It should be further wrapped with an equivalent of withAggregateFunction.

like image 62
zero323 Avatar answered Oct 07 '22 20:10

zero323