I'm trying to use a UDF with an input type of Array of struct. I have the following structure of data (this is only the relevant part of a bigger structure):
|-- investments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- funding_round: struct (nullable = true)
| | | |-- company: struct (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- permalink: string (nullable = true)
| | | |-- funded_day: long (nullable = true)
| | | |-- funded_month: long (nullable = true)
| | | |-- funded_year: long (nullable = true)
| | | |-- raised_amount: long (nullable = true)
| | | |-- raised_currency_code: string (nullable = true)
| | | |-- round_code: string (nullable = true)
| | | |-- source_description: string (nullable = true)
| | | |-- source_url: string (nullable = true)
I declared case classes:
case class Company(name: String, permalink: String)
case class FundingRound(
  company: Company,
  funded_day: Long,
  funded_month: Long,
  funded_year: Long,
  raised_amount: Long,
  raised_currency_code: String,
  round_code: String,
  source_description: String,
  source_url: String)
case class Investments(funding_round: FundingRound)
UDF declaration:
sqlContext.udf.register("total_funding", (investments: Seq[Investments]) => {
  val totals = investments.map(r => r.funding_round.raised_amount)
  totals.sum
})
When I execute the following transformation, the result is as expected:
scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]
But when an action such as collect is executed, I get an error:
Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments
Thank you for any help.
The error you see should be pretty much self-explanatory. There is a strict mapping between Catalyst / SQL types and Scala types which can be found in the relevant section of the Spark SQL, DataFrames and Datasets Guide.
In particular, struct types are converted to o.a.s.sql.Row (in your particular case the data will be exposed as Seq[Row]).
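So the UDF from the question needs to accept Seq[Row] instead of Seq[Investments]. A minimal sketch of the corrected registration (it assumes every funding_round struct and its raised_amount are non-null; a safer, Try-guarded variant follows below):

import org.apache.spark.sql.Row

sqlContext.udf.register("total_funding", (investments: Seq[Row]) => {
  // Each array element arrives as a generic Row, not as an Investments instance,
  // so nested fields have to be extracted by name with getAs.
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount")).sum
})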
There are different methods which can be used to expose data as specific types: defining the UDF over Row objects directly, or converting the DataFrame to a Dataset[T], where T is the desired local type. Only the former approach is applicable in this particular scenario.
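For completeness, the latter approach would look roughly like this; a sketch assuming Spark 1.6+, the case classes defined at the top level rather than in the REPL line scope, and a hypothetical wrapper CompanyInvestments matching only the selected column:

import sqlContext.implicits._

// Hypothetical wrapper whose field name matches the selected column.
case class CompanyInvestments(investments: Seq[Investments])

val totals = df.select($"investments").as[CompanyInvestments]
  .map(c => c.investments.map(_.funding_round.raised_amount).sum)
// Note: null numeric fields in the data would fail here, since the
// case classes declare them as primitive Long.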
If you want to access investments.funding_round.raised_amount using a UDF you'll need something like this:
val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
  // Each array element is a Row; drill into the nested struct by field name.
  // Try(...).toOption yields None instead of failing on malformed or null data.
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
).toOption)
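Applied with the DataFrame API, for example:

df.select(getRaisedAmount($"investments").as("raised_amounts"))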
but a simple select should be much safer and cleaner:
df.select($"investments.funding_round.raised_amount")
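And if the goal is still a total per company, the extracted amounts can be aggregated with built-in functions instead of a UDF, for example via explode; a sketch assuming the companies table has an identifying column, hypothetically called name here:

import org.apache.spark.sql.functions._

// One output row per array element, then a plain aggregation.
df.select($"name", explode($"investments").as("inv"))
  .groupBy($"name")
  .agg(sum($"inv.funding_round.raised_amount").as("total_funding"))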