I want to write a Spark 1.6 UDF which takes the following map:
case class MyRow(mapping: Map[(Int, Int), Double])
val data = Seq(
MyRow(Map((1, 1) -> 1.0))
)
val df = sc.parallelize(data).toDF()
df.printSchema()
root
 |-- mapping: map (nullable = true)
 |    |-- key: struct
 |    |-- value: double (valueContainsNull = false)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)
(As a side note: I find the above output strange, as the type of the key is printed below the type of the value. Why is that?)
Now I define my UDF as:
val myUDF = udf((inputMapping: Map[(Int,Int), Double]) =>
inputMapping.map { case ((i1, i2), value) => ((i1 + i2), value) }
)
df
.withColumn("udfResult", myUDF($"mapping"))
.show()
But this gives me:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
So I tried to replace (Int, Int) with a custom case class, because this is how I normally do it if I want to pass a struct to a UDF:
case class MyTuple2(i1: Int, i2: Int)
val myUDF = udf((inputMapping: Map[MyTuple2, Double]) =>
inputMapping.map { case (MyTuple2(i1, i2), value) => ((i1 + i2), value) }
)
This strangely gives:
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(mapping)' due to data type mismatch: argument 1 requires map<struct<i1:int,i2:int>,double> type, however, 'mapping' is of map<struct<_1:int,_2:int>,double> type.
I don't understand the above exception, as the types match.
The only (ugly) solution I've found is passing an org.apache.spark.sql.Row and then "extracting" the elements of the struct:
import org.apache.spark.sql.Row

val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
.map { case (key, value) => ((key.getInt(0), key.getInt(1)), value) } // extract Row into Tuple2
.map { case ((i1, i2), value) => ((i1 + i2), value) }
)
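With that in place, the original call goes through (a sketch of what I see, not verified output; for the sample entry (1, 1) -> 1.0 I'd expect the resulting map to contain 2 -> 1.0):
df
.withColumn("udfResult", myUDF($"mapping"))
.show()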
Solution:
As far as I know, there's no escaping the use of Row in this context: a tuple (or case class) used within a map (or another tuple/case class/array...) is a nested structure, and as such it would be represented as a Row when passed into a UDF.
The only improvement I can suggest is using Row.unapply to simplify the code a bit:
val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
.map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }
)
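One thing to keep in mind: the Row(i1: Int, i2: Int) pattern is positional and its type checks happen at runtime, so a key that doesn't match (for example one containing a null field) fails with a MatchError instead of a ClassCastException. If you'd rather silently drop such keys than fail, collect with a partial function does the same transformation (a sketch under the same assumptions, untested):
val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
.collect { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }
)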