I am trying to calculate a histogram of every column of a CSV file using Spark in Scala. I found that DoubleRDDFunctions supports histograms, so I wrote the following: create an RDD[Double] for each column and compute the histogram of each RDD using DoubleRDDFunctions.
val columnIndexArray = Array.tabulate(rdd.first().length)(identity)
val histogramData = columnIndexArray.map(columnIndex =>
  rdd.map(lines => lines(columnIndex)).histogram(6)
)
Is this a good way? Can anyone suggest a better way to tackle this?
Thanks in advance.
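For intuition about what `histogram(6)` computes: Spark's `DoubleRDDFunctions.histogram(n)` splits the range [min, max] into n evenly spaced buckets and counts the elements falling in each one. Here is a minimal plain-Scala sketch of that bucketing logic (an illustration only, not Spark's actual implementation):

```scala
// Sketch of the evenly spaced bucketing done by
// DoubleRDDFunctions.histogram(n); plain Scala, no Spark required.
def histogram(xs: Seq[Double], n: Int): (Array[Double], Array[Long]) = {
  val min = xs.min
  val max = xs.max
  val width = (max - min) / n
  // n + 1 bucket boundaries from min to max
  val buckets = Array.tabulate(n + 1)(i => min + i * width)
  val counts = Array.fill(n)(0L)
  xs.foreach { x =>
    // the last bucket is closed on both ends, as in Spark
    val i = math.min(((x - min) / width).toInt, n - 1)
    counts(i) += 1
  }
  (buckets, counts)
}

val (buckets, counts) = histogram(Seq(1.0, 2.0, 2.5, 3.0, 9.0), 4)
// buckets: boundaries 1.0, 3.0, 5.0, 7.0, 9.0
// counts: elements per bucket
```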
Not exactly better, but an alternative way is to convert the RDD to a DataFrame and use the histogram_numeric UDF.
Example data:
import scala.util.Random
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{callUDF, lit, col}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
Random.setSeed(1)
val ncol = 5
val rdd = sc.parallelize((1 to 1000).map(
  _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble))
))
val schema = StructType(
  (1 to ncol).map(i => StructField(s"x$i", DoubleType, false)))
val df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("df")
Query:
val nBuckets = 3
val columns = df.columns.map(
  c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c))
val histograms = df.select(columns: _*)
histograms.printSchema
// root
// |-- x1: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x2: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x4: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
// |-- x5: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- x: double (nullable = true)
// | | |-- y: double (nullable = true)
histograms.select("x1").collect()
// Array([WrappedArray([0.16874313309969038,334.0],
// [0.513382068667877,345.0], [0.8421388886903808,321.0])])
The (Scala API) transformation countByValue ought to do what you want.
For instance, to generate histogram data for the first column in your RDD:
val histCol1 = rdd.map(record => record.col_1).countByValue()
In the expression above, record refers to a data row in the RDD, an instance of a case class that has a field col_1.
histCol1 will be a hash table (a Scala Map) in which the keys are the unique values in column 1 (col_1) and the values are the frequencies of each unique value.
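Note that countByValue counts exact distinct values rather than binning a numeric range, so it suits discrete or categorical columns better than continuous doubles. A plain-Scala illustration of its semantics (the sample values are made up):

```scala
// countByValue semantics in plain Scala: group equal values and
// count them, yielding a Map from value to frequency.
val col1 = Seq(1.0, 2.0, 2.0, 3.0, 3.0, 3.0)
val histCol1 = col1.groupBy(identity).map { case (v, vs) => v -> vs.size.toLong }
// Map(1.0 -> 1, 2.0 -> 2, 3.0 -> 3)
```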