Define StructType as input datatype of a Function Spark-Scala 2.11 [duplicate]

I'm trying to write a Spark UDF in Scala, and I need to define the function's input datatype.

I have a schema variable of type StructType, shown below.

import org.apache.spark.sql.types._

val relationsSchema = StructType(
      Seq(
        StructField("relation", ArrayType(
          StructType(Seq(
            StructField("attribute", StringType, true),
            StructField("email", StringType, true),
            StructField("fname", StringType, true),
            StructField("lname", StringType, true)
            )
          ), true
        ), true)
      )
    )

I'm trying to write a function like the one below:

val relationsFunc: Array[Map[String,String]] => Array[String] = _.map(do something)
val relationUDF = udf(relationsFunc)

input.withColumn("relation",relationUDF(col("relation")))

The above code throws the exception below:

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(relation)' due to data type mismatch: argument 1 requires array<map<string,string>> type, however, '`relation`' is of array<struct<attribute:string,email:string,fname:string,lname:string>> type.;;
'Project [relation#89, UDF(relation#89) AS proc#273]

If I give the input type as

val relationsFunc: StructType => Array[String] =

I'm not able to implement the logic, as _.map gives me metadata, field names, etc.

Please advise how to define relationsSchema as the input datatype in the function below.

val relationsFunc: ? => Array[String] = _.map(somelogic)
asked Nov 17 '25 by Rohit Nimmala
2 Answers

Your structure under relation is a Row, so your function should have the following signature:

val relationsFunc: Seq[Row] => Array[String]

(Spark passes an ArrayType column to a Scala UDF as a Seq of Rows; declaring the parameter as Array[Row] fails with a ClassCastException at runtime.) You can then access your data either by position or by name, i.e.:

{r:Row => r.getAs[String]("email")}
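Putting this together with the question's code, a minimal sketch of the full UDF might look like this (the email extraction is just an illustrative body, assuming the schema from the question):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Each element of the ArrayType column arrives as a Row, and Spark
// hands the array itself to the UDF as a Seq, so the input is Seq[Row].
val relationsFunc: Seq[Row] => Array[String] =
  _.map(r => r.getAs[String]("email")).toArray

val relationUDF = udf(relationsFunc)

// input.withColumn("relation", relationUDF(col("relation")))
```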
answered Nov 19 '25 by baitmbarek

Check the mapping table in the documentation to determine the data type representations between Spark SQL and Scala: https://spark.apache.org/docs/2.4.4/sql-reference.html#data-types

The elements of your relation array are of the Spark SQL complex type StructType, which is represented by the Scala type org.apache.spark.sql.Row, so this is the element type you should be using in your function's input.

I used your code to create this complete working example that extracts email values:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

val relationsSchema = StructType(
  Seq(
    StructField("relation", ArrayType(
      StructType(
        Seq(
          StructField("attribute", StringType, true),
          StructField("email", StringType, true),
          StructField("fname", StringType, true),
          StructField("lname", StringType, true)
        )
      ), true
    ), true)
  )
)

val data = Seq(
  Row(Seq(Row("1", "[email protected]", "Johnny", "Appleseed")))
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  relationsSchema
)

// Spark passes the ArrayType column into the UDF as a Seq of Rows
val relationsFunc = (relation: Seq[Row]) => relation.map(_.getAs[String]("email"))
val relationUdf = udf(relationsFunc)

df.withColumn("relation", relationUdf(col("relation")))
answered Nov 19 '25 by stereosky


