Given a dataframe in which one column is a sequence of structs generated by the following sequence
val df = spark
.range(10)
.map((i) => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
.toDF("a","b","c")
.groupBy("a")
.agg(collect_list(struct($"b",$"c")).as("my_list"))
df.printSchema
df.show(false)
Outputs
root
|-- a: long (nullable = false)
|-- my_list: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- b: integer (nullable = false)
| | |-- c: integer (nullable = false)
+---+-----------------------------------+
|a |my_list |
+---+-----------------------------------+
|0 |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
|1 |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
+---+-----------------------------------+
I need to run a function over each struct list. The function prototype is similar to the function below
case class DataPoint(b: Int, c: Int)
def do_something_with_data(data: Seq[DataPoint]): Double = {
// This is an example. I don't actually want the sum
data.map(data_point => data_point.b + data_point.c).sum
}
I want to store the result of this function to another DataFrame column.
I tried to run
val my_udf = udf(do_something_with_data(_))
val df_with_result = df.withColumn("result", my_udf($"my_list"))
df_with_result.show(false)
and got
17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
Is it possible to use a UDF like this without first casting my rows to a container struct with the DataFrame API?
Doing something like:
case class MyRow(a: Long, my_list: Seq[DataPoint])
df.as[MyRow].map(_ => (a, my_list, my_udf(my_list)))
using the DataSet api works, but I'd prefer to stick with the DataFrame API if possible.
Caveats of Using Spark UDFs: Spark UDFs are not good but why?? 1)When we use UDFs we end up losing all the optimization Spark does on our Dataframe/Dataset. When we use a UDF, it is as good as a Black box to Spark's optimizer.
Description. Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result.
UDF can return only a single column at the time.
PySpark UDF is a User Defined Function that is used to create a reusable function in Spark. Once UDF created, that can be re-used on multiple DataFrames and SQL (after registering). The default type of the udf() is StringType.
You cannot use a case-class as the input-argument of your UDF (but you can return case classes from the UDF). To map an array of structs, you can pass in a Seq[Row]
to your UDF:
val my_uDF = udf((data: Seq[Row]) => {
// This is an example. I don't actually want the sum
data.map{case Row(x:Int,y:Int) => x+y}.sum
})
df.withColumn("result", my_udf($"my_list")).show
+---+--------------------+------+
| a| my_list|result|
+---+--------------------+------+
| 0|[[0,3], [5,5], [3...| 41|
| 1|[[0,9], [4,9], [6...| 54|
+---+--------------------+------+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With