
Spark UDF for StructType / Row


I have a "StructType" column in a Spark DataFrame that has an array and a string as sub-fields. I'd like to modify the array and return a new column of the same type. Can I process it with a UDF? Or what are the alternatives?

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row

    val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
    val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
    val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
    val rd = sc.parallelize(data)
    val df = spark.createDataFrame(rd, schema)
    df.printSchema

    root
     |-- subtable: struct (nullable = true)
     |    |-- col1: array (nullable = true)
     |    |    |-- element: integer (containsNull = false)
     |    |-- col2: string (nullable = true)

It seems that I need a UDF of type Row, something like

    val u = udf((x: Row) => x)

    >> Schema for type org.apache.spark.sql.Row is not supported

This makes sense, since Spark does not know the schema for the return type. Unfortunately, udf.register fails too:

    spark.udf.register("foo", (x: Row) => Row, sub_schema)

    <console>:30: error: overloaded method value register with alternatives: ...
asked Mar 21 '17 by Danil Kirsanov


People also ask

Why we should not use UDF in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially with the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
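For the DataFrame in this question, a minimal sketch of the native-function alternative (assuming Spark 2.4+ for the transform higher-order function; the output column name is illustrative):

    import org.apache.spark.sql.functions.expr

    // Square the array elements of the nested col1 field without any UDF,
    // using the SQL higher-order function transform.
    df.select(expr("transform(subtable.col1, x -> x * x)").as("col1_squared")).show(false)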

What is StructType and StructField in Spark?

Spark SQL's StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array and map columns. A StructType is a collection of StructFields.
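For illustration only (not from the original post), the sub_schema from the question could equally be built up field by field:

    import org.apache.spark.sql.types._

    // Each add call appends one StructField to the StructType collection;
    // nullable defaults to true, matching the schema in the question.
    val sub_schema2 = new StructType()
      .add("col1", ArrayType(IntegerType, containsNull = false))
      .add("col2", StringType)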

Is Spark UDF faster?

In these circumstances, a PySpark UDF is around 10 times more performant than a PySpark Pandas UDF. We have also found that creating a Python wrapper to call a Scala UDF from PySpark code is around 15 times more performant than either type of PySpark UDF.
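A minimal Scala-side sketch of that wrapper idea: register the function by name so it can be invoked from SQL (and therefore from PySpark via spark.sql). The UDF name square_all and the temp view t are illustrative:

    // Register a plain Scala function as a named UDF, callable from any language binding via SQL.
    spark.udf.register("square_all", (xs: Seq[Int]) => xs.map(x => x * x))

    df.createOrReplaceTempView("t")
    spark.sql("SELECT square_all(subtable.col1) AS squared FROM t").show(false)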

What is difference between UDF and UDAF in Spark SQL?

Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result.
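To make the single-row vs. multi-row contrast concrete, here is a hedged sketch of a Spark-native aggregator (not a Hive UDAF) that sums the lengths of the arrays within each group; the names are illustrative and it assumes Spark 3.x for functions.udaf:

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.functions.udaf

    // Aggregator[IN, BUF, OUT]: consumes many input values (one Seq[Int] per row) and emits one Long.
    object TotalLength extends Aggregator[Seq[Int], Long, Long] {
      def zero: Long = 0L
      def reduce(acc: Long, xs: Seq[Int]): Long = acc + xs.length
      def merge(a: Long, b: Long): Long = a + b
      def finish(acc: Long): Long = acc
      def bufferEncoder: Encoder[Long] = Encoders.scalaLong
      def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }

    spark.udf.register("total_length", udaf(TotalLength))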


2 Answers

Turns out you can pass the result schema as a second UDF parameter:

    val u = udf((x: Row) => x, sub_schema)
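For the original question (modifying the array while keeping the same struct type), a minimal sketch along the same lines could look like the following; it assumes the df and sub_schema defined above, and the name addOne is purely illustrative. Note that this untyped udf(f, dataType) overload is deprecated in Spark 3.x.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, udf}

    // Rebuild the struct after mapping over col1; the second argument tells Spark
    // the schema of the returned Row, so the column keeps its original type.
    val addOne = udf((r: Row) => Row(r.getAs[Seq[Int]]("col1").map(_ + 1), r.getAs[String]("col2")), sub_schema)

    df.withColumn("subtable", addOne(col("subtable"))).show(false)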
answered Oct 02 '22 by Danil Kirsanov


You are on the right track. In this scenario a UDF will make your life easy. As you have already encountered, a UDF cannot return types which Spark does not know about, so you need to return something Spark can easily serialize. It may be a case class (see the sketch after the code below) or a tuple like (Seq[Int], String). Here is a modified version of your code:

    def main(args: Array[String]): Unit = {
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types._

      val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
      val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
      val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
      val rd = spark.sparkContext.parallelize(data)
      val df = spark.createDataFrame(rd, schema)

      df.printSchema()
      df.show(false)

      val mapArray = (subRows: Row) => {
        // Reading values from the row by column name; you can also use indices
        val col1 = subRows.getAs[Seq[Int]]("col1")
        val mappedCol1 = col1.map(x => x * x) // apply whatever mapping your requirements call for
        (mappedCol1, subRows.getAs[String]("col2")) // col2 is passed through unchanged
      }
      val mapUdf = udf(mapArray)

      val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable")))
      newDf.show(false)
      newDf.printSchema()
    }
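If you prefer the case class route mentioned above, a minimal sketch of that variant (the class name SubTable is illustrative, and it assumes the same df as in the code above):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf

    // Return a case class instead of a tuple; Spark derives the
    // struct schema (col1, col2) from the case class fields.
    case class SubTable(col1: Seq[Int], col2: String)

    val mapToCase = udf((r: Row) => SubTable(r.getAs[Seq[Int]]("col1").map(x => x * x), r.getAs[String]("col2")))

    df.withColumn("subtable_mapped", mapToCase(df("subtable"))).show(false)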

Please take a look at these links; they may give you more insight.

  1. Most comprehensive answer on working with complex schema: https://stackoverflow.com/a/33850490/4046067
  2. Spark supported data types: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
answered Oct 02 '22 by Tawkir