I am trying to reshape my dataframe from long to wide using the Spark DataFrame API. The data set is a collection of questions and answers from a students' questionnaire. It is a huge data set, and Q (question) and A (answer) each range roughly from 1 to 50000. I would like to collect all possible pairs of Q*A and use them to build columns. If a student answered 1 to question 1, we assign the value 1 to column QA_1_1; otherwise we give it a 0. The data set has been de-duplicated on S_ID, Q, A.
In R, I can simply use dcast from the reshape2 library, but I don't know how to do it in Spark. I found a solution for pivoting at the link below, but it requires a fixed number of distinct Q*A pairs. http://rajasoftware.net/index.php/database/91446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe
I also tried concatenating Q and A with a user-defined function and then applying crosstab. However, I got the error below from the console, even though so far I have only tested my code on a sample data file:
The maximum limit of 1e6 pairs have been collected, which may not be all of the pairs.
Please try reducing the amount of distinct items in your columns.
Original Data:
S_ID, Q, A
1, 1, 1
1, 2, 2
1, 3, 3
2, 1, 1
2, 2, 3
2, 3, 4
2, 4, 5
=> After long-to-wide transformation:
S_ID, QA_1_1, QA_2_2, QA_3_3, QA_2_3, QA_3_4, QA_4_5
1, 1, 1, 1, 0, 0, 0
2, 1, 0, 0, 1, 1, 1
R code:
library(dplyr); library(reshape2);
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number()==1) %>% mutate(temp=1)
df1 %>% dcast(S_ID ~ Q + A, value.var="temp", fill=0)
Spark code:
val fnConcatenate = udf((x: String, y: String) => "QA_" + x + "_" + y)
val df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
val df2 = df1.stat.crosstab("S_ID", "QA")
Any thoughts would be appreciated.
What you are trying to do here is faulty by design for two reasons:
1. You replace sparse data with a dense representation, which is expensive in both memory and computation and is almost never a good idea with a huge data set.
2. Spark DataFrames are essentially wrappers around RDD[Row]. It means that the larger the row, the less you can place on a single partition, and in consequence operations like aggregations are much more expensive and require more network traffic.
Wide tables are useful when you have proper columnar storage, where you can implement things like efficient compression or aggregations. From a practical perspective, almost everything you can do with a wide table can be done with a long one using group / window functions.
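As a rough sketch of that last point (assuming the same long df with columns S_ID, Q, A as in the sample data; the output column names n_answers and QA_2_3 are just illustrative), the information carried by any single wide column can be recovered directly from the long format with a plain aggregation:
import org.apache.spark.sql.functions.{count, max, when}
import sqlContext.implicits._

// Number of answered (Q, A) pairs per student
val perStudent = df.groupBy($"S_ID").agg(count($"Q").alias("n_answers"))

// Equivalent of the single wide column QA_2_3:
// 1 if the student gave answer 3 to question 2, otherwise 0
val qa23 = df.groupBy($"S_ID")
  .agg(max(when($"Q" === 2 && $"A" === 3, 1).otherwise(0)).alias("QA_2_3"))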
One thing you can try is to use a sparse vector to create a wide-like format:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._

// Build a single key per (question, answer) pair
df.registerTempTable("df")
val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")

// Map every distinct Q/A pair to a numeric index
val indexer = new StringIndexer()
  .setInputCol("qa")
  .setOutputCol("idx")
  .fit(dfComb)

val indexed = indexer.transform(dfComb)

// Vector size = number of distinct Q/A pairs
val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1

// One sparse vector per student, with 1.0 at the index of every answered Q/A pair
val wideLikeDF = indexed
  .select($"s_id", $"idx")
  .rdd
  .map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
  .groupByKey // This assumes no duplicates
  .mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
  .toDF("id", "qaVec")
The cool part here is that you can easily convert it to an IndexedRowMatrix and, for example, compute an SVD:
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val mat = new IndexedRowMatrix(wideLikeDF.map{
  // Here we assume that s_id can be mapped directly to Long
  // If not it has to be indexed
  case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})
val svd = mat.computeSVD(3)
or to a RowMatrix and get column statistics or compute principal components:
val colStats = mat.toRowMatrix.computeColumnSummaryStatistics()
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)
Edit:
In Spark 1.6.0+ you can use the pivot function.
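For reference, a minimal sketch of what that could look like on the example data (assumptions: the same long df with columns S_ID, Q, A, and the helper column names QA and temp chosen here just for illustration):
import org.apache.spark.sql.functions.{concat, first, lit}
import sqlContext.implicits._

val pivoted = df
  .withColumn("QA", concat(lit("QA_"), $"Q".cast("string"), lit("_"), $"A".cast("string")))
  .withColumn("temp", lit(1))
  .groupBy("S_ID")
  .pivot("QA")          // available on grouped data since Spark 1.6.0
  .agg(first($"temp"))
  .na.fill(0)
Note that without an explicit list of pivot values Spark first has to compute the distinct values of QA, so the scalability concerns described above still apply when Q*A has a very large number of distinct pairs.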