Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark SQL UDF with complex input parameter

I'm trying to use UDF with input type Array of struct. I have the following structure of data this is only relevant part of a bigger structure

|--investments: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- funding_round: struct (nullable = true)
    |    |    |    |-- company: struct (nullable = true)
    |    |    |    |    |-- name: string (nullable = true)
    |    |    |    |    |-- permalink: string (nullable = true)
    |    |    |    |-- funded_day: long (nullable = true)
    |    |    |    |-- funded_month: long (nullable = true)
    |    |    |    |-- funded_year: long (nullable = true)
    |    |    |    |-- raised_amount: long (nullable = true)
    |    |    |    |-- raised_currency_code: string (nullable = true)
    |    |    |    |-- round_code: string (nullable = true)
    |    |    |    |-- source_description: string (nullable = true)
    |    |    |    |-- source_url: string (nullable = true)

I declared case classes:

case class Company(name: String, permalink: String)
case class FundingRound(company: Company, funded_day: Long, funded_month: Long, funded_year: Long, raised_amount: Long, raised_currency_code: String, round_code: String, source_description: String, source_url: String)
case class Investments(funding_round: FundingRound)

UDF declaration:

sqlContext.udf.register("total_funding", (investments:Seq[Investments])  => {
     val totals = investments.map(r => r.funding_round.raised_amount)
     totals.sum
})

When I'm executing the following transformation the result is as expected

scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]

But when an action executed like collect I have an error:

Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments

Thank you for any help.

like image 791
Lev Avatar asked Jul 16 '16 16:07

Lev


People also ask

Can I use UDF in Spark SQL?

In Spark, you create UDF by creating a function in a language you prefer to use for Spark. For example, if you are using Spark with scala, you create a UDF in scala language and wrap it with udf() function or register it as udf to use it on DataFrame and SQL respectively.

What is difference between UDF & UDAF in Spark SQL?

Description. Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result.

Why we should not use UDF in Spark?

Spark UDFs are not good but why?? 1)When we use UDFs we end up losing all the optimization Spark does on our Dataframe/Dataset. When we use a UDF, it is as good as a Black box to Spark's optimizer.

Can a UDF return multiple columns?

UDF can return only a single column at the time.


1 Answers

The error you see should be pretty much self-explanatory. There is a strict mapping between Catalyst / SQL types and Scala types which can be found in the relevant section of the Spark SQL, DataFrames and Datasets Guide.

In particular struct types are converted to o.a.s.sql.Row (in your particular case data will be exposed as Seq[Row]).

There are different methods which can be used to expose data as specific types:

  • Defining UDT (user defined type) which has been removed in 2.0.0 and has no replacement for now.
  • Converting DataFrame to Dataset[T] where T is a desired local type.

with only the former approach could be applicable in this particular scenario.

If you want to access investments.funding_round.raised_amount using UDF you'll need something like this:

val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
).toOption)

but simple select should be much safer and cleaner:

df.select($"investments.funding_round.raised_amount")
like image 178
zero323 Avatar answered Jan 01 '23 07:01

zero323