Defining a UDF that accepts an Array of objects in a Spark DataFrame?

When working with Spark's DataFrames, User Defined Functions (UDFs) are needed for mapping the data in columns. UDFs require that argument types be explicitly specified. In my case, I need to manipulate a column that is made up of arrays of objects, and I do not know what type to use. Here's an example:

import sqlContext.implicits._

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
  |{
  |  "topic" : "pets",
  |  "subjects" : [
  |    {"type" : "cat", "score" : 10},
  |    {"type" : "dog", "score" : 1}
  |  ]
  |}
  """.stripMargin)))

It's relatively straightforward to use the built-in org.apache.spark.sql.functions to perform basic operations on the data in the columns:

import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show

+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+

and it's generally easy to write custom UDFs to perform arbitrary operations:

import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
|      PETS|             2|
+----------+--------------+

But what if I want to use a UDF to manipulate the array of objects in the "subjects" column? What type do I use for the argument in the UDF? For example, if I want to reimplement the size function, instead of using the one provided by Spark:

val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show

Clearly Array[Something] does not work... what type should I use!? Should I ditch Array[] altogether? Poking around suggests that scala.collection.mutable.WrappedArray may have something to do with it, but there's still another type I need to provide.

asked Aug 17 '16 by mattsilver

People also ask

Why UDF are not recommended in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially via the Python API, can compromise application performance. For this reason, at Damavis we try to avoid their use as much as possible in favour of native functions or SQL.
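
As an illustration, the enhance UDF defined above isn't actually necessary: the built-in upper function does the same job and lets Catalyst optimize the query. A minimal sketch against the example data:

import org.apache.spark.sql.functions.{size, upper}

// Native equivalent of the enhance UDF, with no UDF serialization overhead.
data.select(upper($"topic"), size($"subjects")).show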

Can a UDF return multiple columns?

Creating multiple top-level columns from a single UDF call isn't possible, but you can create a new struct and expand it afterwards, as sketched below.
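
A minimal sketch of that struct pattern against the example data; the Stats case class and the column names are illustrative, not part of the original post:

import org.apache.spark.sql.functions.udf

// One UDF call returns two values packed into a struct...
case class Stats(length: Int, upper: String)
val stats = udf { topic: String => Stats(topic.length, topic.toUpperCase) }

// ...which the s.* style selectors expand into top-level columns.
data.select(stats($"topic").as("s")).select($"s.length", $"s.upper").show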

Can PySpark UDF return multiple columns?

The short answer is: No. Using a PySpark UDF requires Spark to serialize the Scala objects, run a Python process, deserialize the data in Python, run the function, serialize the results, and deserialize them in Scala. This causes a considerable performance penalty, so I recommend avoiding UDFs in PySpark.

Can we pass DataFrame to UDF?

A UDF can only work on records, which could, in the broadest case, be an entire DataFrame if the UDF is a user-defined aggregate function (UDAF). If you want to work on more than one DataFrame in a UDF, you have to join the DataFrames first so that the columns you want to use are available in a single DataFrame.


1 Answer

What you're looking for is Seq[o.a.s.sql.Row]:

import org.apache.spark.sql.Row

val my_size = udf { subjects: Seq[Row] => subjects.size }
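
Applied to the example data, it behaves just like the built-in size:

// Should print the same count as size(subjects): pets -> 2.
data.select($"topic", my_size($"subjects")).show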

Explanation:

  • The current representation of ArrayType is, as you already know, WrappedArray, so Array won't work and it is better to stay on the safe side.
  • According to the official specification, the local (external) type for StructType is Row. Unfortunately, this means that access to the individual fields is not type safe; fields have to be read through Row's runtime accessors, as sketched below.
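
A minimal sketch of that runtime access, using the example data; top_type is an illustrative name, and the JSON reader parses score as a Long:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Fields are read by name at runtime; a wrong type surfaces as a
// ClassCastException instead of a compile-time error.
val top_type = udf { subjects: Seq[Row] =>
  subjects.maxBy(_.getAs[Long]("score")).getAs[String]("type")
}

data.select($"topic", top_type($"subjects")).show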

Notes:

  • To create a struct in Spark < 2.3, the function passed to udf has to return a Product type (Tuple* or a case class), not Row. That's because the corresponding udf variants depend on Scala reflection:

    Defines a Scala closure of n arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.

  • In Spark >= 2.3 it is possible to return Row directly, as long as the schema is provided (a sketch follows these notes):

    def udf(f: AnyRef, dataType: DataType): UserDefinedFunction

    Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant, the caller must specify the output data type, and there is no automatic input type coercion.

    See for example How to create a Spark UDF in Java / Kotlin which returns a complex type?.
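
A minimal sketch of both variants against the example data; BestSubject, best_subject and outSchema are illustrative names, and the Row-returning form assumes Spark >= 2.3:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

// Spark < 2.3: return a Product (here, a case class); the struct schema
// is inferred via Scala reflection.
case class BestSubject(`type`: String, score: Long)
val best_subject = udf { subjects: Seq[Row] =>
  val best = subjects.maxBy(_.getAs[Long]("score"))
  BestSubject(best.getAs[String]("type"), best.getAs[Long]("score"))
}

// Spark >= 2.3: return a Row directly and supply the output schema explicitly.
val outSchema = StructType(Seq(
  StructField("type", StringType),
  StructField("score", LongType)))
val best_subject_row = udf((subjects: Seq[Row]) => {
  val best = subjects.maxBy(_.getAs[Long]("score"))
  Row(best.getAs[String]("type"), best.getAs[Long]("score"))
}, outSchema)

data.select($"topic", best_subject($"subjects")).show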

answered Oct 01 '22 by zero323