A Spark DataFrame contains a column of type Array[Double]. It throws a ClassCastException when I try to get it back in a map() function. The following Scala code generates the exception.
case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map { r =>
  val arr: Array[Double] = r.getAs[Array[Double]]("x")
  arr.sum
}
s.foreach(println)
The exception is
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Can somebody explain to me why it does not work? What should I do instead? I am using Spark 1.5.1 and Scala 2.10.6.
Thanks
ArrayType is represented in a Row as a scala.collection.mutable.WrappedArray. You can extract it using, for example:
val arr: Seq[Double] = r.getAs[Seq[Double]]("x")
or
val i: Int = ???  // placeholder: the index of the "x" column in the row
val arr = r.getSeq[Double](i)
or even:
import scala.collection.mutable.WrappedArray
val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")
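Putting it together, here is a minimal sketch of the snippet from the question with the fix applied (assuming the same df as above):
// Read the column back as Seq[Double] instead of Array[Double]
val s = df.map { r =>
  val arr: Seq[Double] = r.getAs[Seq[Double]]("x")
  arr.sum
}
s.foreach(println)  // prints 6.0 when run locally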
If the DataFrame is relatively thin then pattern matching could be a better approach:
import org.apache.spark.sql.Row
df.rdd.map { case Row(x: Seq[Double]) => (x.toArray, x.sum) }
although you have to keep in mind that the element type of the sequence is unchecked at runtime due to type erasure.
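If you want to silence the resulting compiler warning, one option (a sketch; it adds no runtime checking) is an @unchecked annotation on the type argument:
import org.apache.spark.sql.Row
// @unchecked acknowledges that the element type cannot be verified at runtime
df.rdd.map { case Row(x: Seq[Double @unchecked]) => (x.toArray, x.sum) }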
In Spark >= 1.6 you can also use the Dataset API as follows:
df.select("x").as[Seq[Double]].rdd