 

Access Array column in Spark

A Spark DataFrame contains a column of type Array[Double]. It throws a ClassCastException when I try to get it back in a map() function. The following Scala code generates the exception.

case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
   val arr:Array[Double] = r.getAs[Array[Double]]("x")
   arr.sum
})
s.foreach(println)

The exception is

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can somebody explain to me why this does not work, and what should I do instead? I am using Spark 1.5.1 and Scala 2.10.6.

Thanks

asked Oct 28 '15 by Boris


People also ask

How do I select a column in spark DataFrame?

You can select one or more columns of a Spark DataFrame by passing the column names you want to the select() function. Since a DataFrame is immutable, this creates a new DataFrame with the selected columns. The show() function is used to display the DataFrame contents.
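
For example, a minimal Scala sketch (the "y" column is hypothetical and only shown to illustrate selecting multiple columns):

val single = df.select("x")       // new DataFrame containing only the "x" column
val multi = df.select("x", "y")   // multiple columns; assumes a "y" column exists
single.show()                     // display the contents of the resulting DataFrame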

What is ArrayType in spark?

Spark ArrayType is a collection data type that extends the DataType class, the superclass of all Spark SQL types. All elements of an ArrayType column must have the same element type.
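
As a sketch, this is how a schema with an ArrayType column of doubles could be declared (names are illustrative):

import org.apache.spark.sql.types.{ArrayType, DoubleType, StructField, StructType}

val schema = StructType(Seq(
  StructField("x", ArrayType(DoubleType, containsNull = false))
))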

How do you select a column in PySpark DataFrame?

In PySpark, the select() function is used to select a single column, multiple columns, columns by index, all columns from a list, or nested columns from a DataFrame. select() is a transformation: it returns a new DataFrame with the selected columns.


1 Answer

ArrayType is represented in a Row as a scala.collection.mutable.WrappedArray. You can extract it using, for example:

val arr: Seq[Double] = r.getAs[Seq[Double]]("x")

or

val i: Int = ???
val arr = r.getSeq[Double](i)

or even:

import scala.collection.mutable.WrappedArray

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")
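
Putting this together with the snippet from the question, a minimal corrected sketch (asking the Row for a Seq instead of an Array) would be:

case class Dummy(x: Array[Double])
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1.0, 2.0, 3.0))))
val s = df.map { r =>
  val arr: Seq[Double] = r.getAs[Seq[Double]]("x")  // WrappedArray is a Seq
  arr.sum
}
s.foreach(println)  // should print 6.0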

If the DataFrame is relatively thin, then pattern matching could be a better approach:

import org.apache.spark.sql.Row

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)}

although you have to keep in mind that the type of the sequence is unchecked.
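
If you want to make that explicit, a small sketch using an @unchecked annotation (this only silences the compiler warning about type erasure; it does not add a runtime check of the element type):

import org.apache.spark.sql.Row

df.rdd.map { case Row(x: Seq[Double @unchecked]) => (x.toArray, x.sum) }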

In Spark >= 1.6 you can also use Dataset as follows:

df.select("x").as[Seq[Double]].rdd
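
A usage sketch of that approach (assumes Spark >= 1.6 and the SQL implicits in scope, e.g. import sqlContext.implicits._):

val sums = df.select("x").as[Seq[Double]].map(_.sum)  // Dataset[Double], one sum per row
sums.collect().foreach(println)                       // should print 6.0 for the example row
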
answered Sep 28 '22 by zero323