
scala.collection.mutable.WrappedArray$ofRef cannot be cast to Integer


I'm fairly new to Spark and Scala. I'm trying to call a function as a Spark UDF, but I run into this error that I can't seem to resolve.

I understand that in Scala, Array and Seq aren't the same. WrappedArray is a subtype of Seq and there are implicit conversions between WrappedArray and Array, but I'm not sure why that conversion doesn't happen in the case of the UDF.
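(For reference, here's a minimal standalone sketch of what I mean, outside of Spark; it assumes a pre-2.13 Scala where scala.collection.mutable.WrappedArray still exists.)

import scala.collection.mutable.WrappedArray

val arr: Array[Int] = Array(1, 2, 3)

// Assigning an Array where a Seq is expected triggers the implicit wrapping
// conversion, so the runtime value is a WrappedArray, not an Array.
val seq: Seq[Int] = arr
seq.isInstanceOf[WrappedArray[_]]   // true

// The reverse is not a cast: a WrappedArray is not an Array at runtime,
// so asInstanceOf[Array[Int]] on it would throw a ClassCastException.
// Getting an Array back requires an explicit conversion:
val back: Array[Int] = seq.toArray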

Any pointers to help me understand and resolve this are much appreciated.

Here's a snippet of the code:

def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF

mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1, 2, 100, 200)), myType2(2, Array(100, 200)), myType2(3, Array(1, 2)))
val idDF = idRDD.toDF

idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

import sqlContext.implicits._ /* Hive context is exposed as sqlContext */

val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap", myUDF(j("m"), j("a")))
k.show

Looking at the DataFrames "j" and "k", the map and array columns have the right data types.

j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

However, an action on the DataFrame "k" that calls the UDF fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Asked Oct 23 '16 by Yash


1 Answer

Changing the parameter type from Array[Int] to Seq[Int] in the function filterMapKeysWithSet seems to resolve the above issue.

def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

+---+--------------------+----------------+--------------------+
| id|                   m|               a|         filteredMap|
+---+--------------------+----------------+--------------------+
|  1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
|  2|Map(1 -> 100, 2 -...|      [100, 200]|               Map()|
|  3|Map(3 -> 300, 4 -...|          [1, 2]|               Map()|
+---+--------------------+----------------+--------------------+

So it looks like the ArrayType column on DataFrame "idDF" arrives at the UDF as a WrappedArray rather than an Array. The call to "filterMapKeysWithSet" therefore failed: it expected an Array[Int] (the "[I" in the error is the JVM name for a primitive Int array) but got a WrappedArray/Seq instead, and a WrappedArray/Seq does not implicitly convert to Array in Scala 2.8 and above.
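As a side note, here is a slightly shorter sketch of the same fix (not part of the original answer, just an equivalent variant): since the parameter is already a Seq, the intermediate toArray step isn't needed and a Set can be built from it directly.

// Equivalent sketch: Spark hands an array<int> column to a Scala UDF as a Seq
// (a WrappedArray at runtime), so declaring the parameter as Seq[Int] and
// calling .toSet on it directly is enough. Assumes the usual
// org.apache.spark.sql.functions.udf import is in scope.
def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] =
  m filterKeys a.toSet   // a Set[Int] works as the Int => Boolean predicate

val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))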

Answered Sep 18 '22 by Yash