Using Spark UDFs with struct sequences

Given a DataFrame in which one column is a sequence of structs, generated by the following code

// Imports needed outside spark-shell (the shell provides these automatically)
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._

val df = spark
  .range(10)
  .map(i => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
  .toDF("a", "b", "c")
  .groupBy("a")
  .agg(collect_list(struct($"b", $"c")).as("my_list"))
df.printSchema
df.show(false)

Outputs

root
 |-- a: long (nullable = false)
 |-- my_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- b: integer (nullable = false)
 |    |    |-- c: integer (nullable = false)

+---+-----------------------------------+
|a  |my_list                            |
+---+-----------------------------------+
|0  |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
|1  |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
+---+-----------------------------------+

I need to run a function over each list of structs. The function's signature is similar to the one below

case class DataPoint(b: Int, c: Int)
def do_something_with_data(data: Seq[DataPoint]): Double = {
  // This is an example. I don't actually want the sum
  data.map(data_point => data_point.b + data_point.c).sum
}

I want to store the result of this function in a new DataFrame column.

I tried to run

val my_udf = udf(do_something_with_data(_))
val df_with_result = df.withColumn("result", my_udf($"my_list"))
df_with_result.show(false)

and got

17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
    at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
    at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
    at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)

Is it possible to use a UDF like this without first casting my rows to a container struct with the DataFrame API?

Doing something like:

case class MyRow(a: Long, my_list: Seq[DataPoint])
df.as[MyRow].map(row => (row.a, row.my_list, do_something_with_data(row.my_list)))

using the Dataset API works, but I'd prefer to stick with the DataFrame API if possible.
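For reference, a minimal sketch of that Dataset-API detour (variable names are my own; it assumes the DataPoint class and do_something_with_data function defined above). Mapping to a tuple drops the column names, so a final toDF restores them:

// Sketch only: typed Dataset detour using MyRow / DataPoint defined above,
// mapping to a tuple and then restoring column names with toDF
val resultDF = df
  .as[MyRow]
  .map(row => (row.a, row.my_list, do_something_with_data(row.my_list)))
  .toDF("a", "my_list", "result")
resultDF.show(false)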

Asked by Steven Sheffey on Jul 13 '17

1 Answer

You cannot use a case class as the input argument of your UDF (but you can return case classes from the UDF). To map over an array of structs, you can pass a Seq[Row] to your UDF:

import org.apache.spark.sql.Row

val my_udf = udf((data: Seq[Row]) => {
  // This is an example. I don't actually want the sum
  data.map { case Row(b: Int, c: Int) => b + c }.sum
})

df.withColumn("result", my_udf($"my_list")).show

+---+--------------------+------+
|  a|             my_list|result|
+---+--------------------+------+
|  0|[[0,3], [5,5], [3...|    41|
|  1|[[0,9], [4,9], [6...|    54|
+---+--------------------+------+
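If you would rather keep the typed do_something_with_data from the question, one option (a sketch, not part of the original answer; my_typed_udf is a name I made up) is to rebuild the case-class instances from the Rows inside the UDF, using the struct's field names:

// Sketch: convert each Row back into a DataPoint, then reuse the typed function
import org.apache.spark.sql.Row

val my_typed_udf = udf((data: Seq[Row]) =>
  do_something_with_data(data.map(r => DataPoint(r.getAs[Int]("b"), r.getAs[Int]("c"))))
)

df.withColumn("result", my_typed_udf($"my_list")).show(false)

Looking fields up by name is slightly more robust than positional pattern matching if the struct ever gains fields or changes their order.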
Answered by Raphael Roth on Sep 28 '22