I'm fairly new to Spark and Scala. I'm trying to call a function as a Spark UDF but I run into this error that I can't seem to resolve.
I understand that in Scala, Array and Seq aren't the same. WrappedArray is a subtype of Seq, and Scala will implicitly wrap an Array in a WrappedArray (though not the other way around), but I'm not sure why that doesn't come into play in the case of the UDF.
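For context, here's what I mean about the conversions; a minimal plain-Scala sketch (no Spark involved), just my understanding:

val arr: Array[Int] = Array(1, 2, 3)

// Array -> Seq is implicit: the compiler wraps the array in a WrappedArray.
val asSeq: Seq[Int] = arr
println(asSeq.getClass)   // class scala.collection.mutable.WrappedArray$ofInt

// Going the other way needs an explicit call; there is no implicit Seq -> Array.
val backToArray: Array[Int] = asSeq.toArray

// Note the wrapping is resolved at compile time; it is not a runtime cast.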
Any pointers to help me understand and resolve this are much appreciated.
Here's a snippet of the code
import sqlContext.implicits._   /* Hive context is exposed as sqlContext */
import org.apache.spark.sql.functions.udf

def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF
mapDF.printSchema

mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1, 2, 100, 200)), myType2(2, Array(100, 200)), myType2(3, Array(1, 2)))
val idDF = idRDD.toDF
idDF.printSchema

idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap", myUDF(j("m"), j("a")))
k.show
Looking at the DataFrames "j" and "k", the map and array columns have the correct data types:
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
However, an action on DataFrame "k" that invokes the UDF fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
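The "[I" in the message is the JVM name for a primitive int[], so the generated wrapper is apparently trying to cast the incoming WrappedArray to an Int array at runtime. The same failure mode is easy to see outside Spark (illustrative only, my own repro):

// A runtime cast ignores implicit conversions entirely.
val value: Any = Array(1, 2, 3).toSeq       // a WrappedArray at runtime
val boom = value.asInstanceOf[Array[Int]]   // java.lang.ClassCastException: ... cannot be cast to [I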
Changing the parameter type from Array[Int] to Seq[Int] in the function filterMapKeysWithSet (and in the UDF) seems to resolve the above issue:
def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

+---+--------------------+----------------+--------------------+
| id|                   m|               a|         filteredMap|
+---+--------------------+----------------+--------------------+
|  1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
|  2|Map(1 -> 100, 2 -...|      [100, 200]|               Map()|
|  3|Map(3 -> 300, 4 -...|          [1, 2]|               Map()|
+---+--------------------+----------------+--------------------+
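For what it's worth, once the parameter is a Seq[Int] the helper can also be folded straight into the UDF; a minimal sketch of the same logic (filterKeysUDF is just a name I made up here):

import org.apache.spark.sql.functions.udf

// Same behaviour as filterMapKeysWithSet, inlined:
// keep only the map entries whose key appears in the array column.
val filterKeysUDF = udf((m: Map[Int, Int], a: Seq[Int]) => m.filterKeys(a.toSet))

val k2 = j.withColumn("filteredMap", filterKeysUDF(j("m"), j("a")))
k2.show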
So it looks like the ArrayType column on DataFrame "idDF" arrives in the UDF as a WrappedArray (a Seq), not as an Array. The call to "filterMapKeysWithSet" therefore failed: it expected an Array[Int] (a primitive int[], the "[I" in the error) but received a WrappedArray, and the generated code's runtime cast can't be rescued by implicit conversions, which are applied at compile time (and in any case, since Scala 2.8 there is no implicit conversion from Seq back to Array).
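In other words, declaring the parameter as Seq[Int] matches what Spark actually hands the UDF at runtime. Presumably typing it as WrappedArray[Int] itself would also work, though Seq[Int] is the looser and more conventional choice; a hedged sketch of that variant (untested, myUDFWrapped is just an illustrative name):

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.udf

// Accept exactly the type Spark passes for an ArrayType column.
val myUDFWrapped = udf((m: Map[Int, Int], a: WrappedArray[Int]) => filterMapKeysWithSet(m, a))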