Passing a map with struct-type key into a Spark UDF

I want to write a Spark 1.6 UDF which takes the following map:

case class MyRow(mapping: Map[(Int, Int), Double])

val data = Seq(
  MyRow(Map((1, 1) -> 1.0))
)
val df = sc.parallelize(data).toDF()

df.printSchema()

 |-- mapping: map (nullable = true)
 |    |-- key: struct
 |    |-- value: double (valueContainsNull = false)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)

(As a side note: I find the output above strange, since the type of the key is printed below the type of the value. Why is that?)

Now I define my UDF as:

val myUDF = udf((inputMapping: Map[(Int,Int), Double]) =>
  inputMapping.map { case ((i1, i2), value) => ((i1 + i2), value) }
)

df
  .withColumn("udfResult", myUDF($"mapping"))
  .show()

But this gives me:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2

So I tried to replace (Int,Int) with a custom case class, because this is how I normally do it when I want to pass a struct to a UDF:

case class MyTuple2(i1: Int, i2: Int)
val myUDF = udf((inputMapping: Map[MyTuple2, Double]) =>
  inputMapping.map { case (MyTuple2(i1, i2), value) => ((i1 + i2), value) }
)

Strangely, this gives:

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(mapping)' due to data type mismatch: argument 1 requires map<struct<i1:int,i2:int>,double> type, however, 'mapping' is of map<struct<_1:int,_2:int>,double> type.

I don't understand the above exception as the types match.

The only (ugly) solution I've found is passing an org.apache.spark.sql.Row and then "extracting" the elements of the struct:

val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
  .map { case (key, value) => ((key.getInt(0), key.getInt(1)), value) } // extract Row into Tuple2
  .map { case ((i1, i2), value) => ((i1 + i2), value) }
)
Raphael Roth asked Jan 23 '17 12:01

1 Answer

As far as I know, there's no escaping the use of Row in this context: a tuple (or case class) used within a map (or another tuple/case class/array...) is a nested structure, and as such it would be represented as a Row when passed into a UDF.

The only improvement I can suggest is pattern matching on Row (via its extractor, Row.unapplySeq) to simplify the code a bit:

val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
  .map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }
)
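For anyone who wants to try this end to end, here is a minimal sketch of the Row-based approach as a standalone program. It assumes Spark 1.6 running in local mode with a SQLContext; the object name StructKeyUdfSketch and the helper sumKeys are made up for illustration and are not part of the question or of Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.udf

// Defined at top level so that toDF() can derive a schema for it.
case class MyRow(mapping: Map[(Int, Int), Double])

// Hypothetical wrapper object, for illustration only.
object StructKeyUdfSketch {

  // The struct-typed map keys arrive as Row, so they are destructured
  // with the Row extractor instead of a Tuple2 or case class pattern.
  def sumKeys(inputMapping: Map[Row, Double]): Map[Int, Double] =
    inputMapping.map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is good enough for trying this out.
    val conf = new SparkConf().setMaster("local[*]").setAppName("struct-key-udf-sketch")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(MyRow(Map((1, 1) -> 1.0)))).toDF()

    // Wrap the plain function as a UDF and apply it to the map column.
    val myUDF = udf(sumKeys _)
    df.withColumn("udfResult", myUDF($"mapping")).show()

    sc.stop()
  }
}
```

Keeping the logic in a plain function like sumKeys makes it testable without a running SparkContext. One caveat of the transformation itself: keys whose components add up to the same number, such as (1, 2) and (2, 1), collapse into a single entry in the resulting map.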
Tzach Zohar answered Sep 20 '22 21:09
