Reshape Spark DataFrame from Long to Wide On Large Data Sets

I am trying to reshape my dataframe from long to wide using the Spark DataFrame API. The data set is a collection of questions and answers from a student questionnaire. It is a huge data set, and Q (question) and A (answer) each range from approximately 1 to 50,000. I would like to collect all the possible Q*A pairs and use them to build columns. If a student answered 1 to question 1, we assign the value 1 to column 1_1; otherwise, we give it a 0. The data set has been de-duplicated on S_ID, Q, A.

In R, I can simply use dcast from the reshape2 library, but I don't know how to do this with Spark. I have found a solution for pivoting at the link below, but it requires a fixed number of distinct Q*A pairs. http://rajasoftware.net/index.php/database/91446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe

I also tried concatenating Q and A with a user-defined function and then applying crosstab. However, I got the error below from the console, even though so far I have only tested my code on a sample data file:

The maximum limit of 1e6 pairs have been collected, which may not be all of the pairs.
Please try reducing the amount of distinct items in your columns.

Original Data:

S_ID, Q, A
1, 1, 1
1, 2, 2
1, 3, 3
2, 1, 1
2, 2, 3
2, 3, 4
2, 4, 5

=> After long-to-wide transformation:

S_ID, QA_1_1, QA_2_2, QA_3_3, QA_2_3, QA_3_4, QA_4_5
1, 1, 1, 1, 0, 0, 0
2, 1, 0, 0, 1, 1, 1

R code.  
library(dplyr); library(reshape2);  
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number()==1) %>% mutate(temp=1)  
df1 %>% dcast(S_ID ~ Q + A, value.var="temp", fill=0)  

Spark code.
val fnConcatenate = udf((x: String, y: String) => "QA_" + x + "_" + y)
val df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
val df2 = df1.stat.crosstab("S_ID", "QA")

Any thoughts would be appreciated.


1 Answer

What you are trying to do here is faulty by design for two reasons:

  1. You replace a sparse data set with a dense one. This is expensive both in terms of memory requirements and computation, and it is almost never a good idea when you have a large data set.
  2. You limit your ability to process data locally. Simplifying things a little, Spark DataFrames are just wrappers around RDD[Row]. That means the larger the row, the fewer rows you can place on a single partition, and as a consequence operations like aggregations are much more expensive and require more network traffic.

Wide tables are useful when you have proper columnar storage, where you can implement things like efficient compression or aggregations. From a practical perspective, almost everything you can do with a wide table can be done with a long one using group / window functions, as in the sketch below.
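
For example (a minimal sketch, not part of the original answer; it assumes the long df(S_ID, Q, A) from the question, and the output column names are purely illustrative), an answer count per student and a flag for a single Q/A pair can be computed directly on the long data:

import sqlContext.implicits._
import org.apache.spark.sql.functions.{count, lit, max, when}

val perStudent = df
  .groupBy("S_ID")
  .agg(
    // number of answers given by each student
    count(lit(1)).alias("n_answers"),
    // 1 if the student answered 1 to question 1, 0 otherwise
    max(when($"Q" === 1 && $"A" === 1, 1).otherwise(0)).alias("QA_1_1")
  )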

One thing you can try is to use a sparse vector to create a wide-like format:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._

df.registerTempTable("df")
val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")

val indexer = new StringIndexer()
  .setInputCol("qa")
  .setOutputCol("idx")
  .fit(dfComb)

val indexed = indexer.transform(dfComb)

val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1

val wideLikeDF = indexed
  .select($"s_id", $"idx")
  .rdd
  .map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
  .groupByKey // This assumes no duplicates
  .mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
  .toDF("id", "qaVec")

The cool part here is that you can easily convert it to an IndexedRowMatrix and, for example, compute an SVD:

import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val mat = new IndexedRowMatrix(wideLikeDF.map{
  // Here we assume that s_id can be mapped directly to Long
  // If not it has to be indexed
  case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})

val svd = mat.computeSVD(3)

or to a RowMatrix and get column statistics or compute principal components:

val colStats = mat.toRowMatrix.computeColumnSummaryStatistics
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)

Edit:

In Spark 1.6.0+ you can use the pivot function directly.
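
A minimal sketch of that approach (not part of the original answer; it reuses the concatenated QA idea from the question, and the exact aggregation is illustrative):

import sqlContext.implicits._
import org.apache.spark.sql.functions.{concat, first, lit}

val wide = df.distinct
  .withColumn("QA", concat(lit("QA_"), $"Q".cast("string"), lit("_"), $"A".cast("string")))
  .withColumn("flag", lit(1))
  .groupBy("S_ID")
  .pivot("QA")          // optionally pass the list of expected QA values to skip the distinct scan
  .agg(first("flag"))   // 1 where the pair occurred, null otherwise
  .na.fill(0)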
