Spark: Transpose DataFrame Without Aggregating

I have looked at a number of questions online, but they don't seem to do what I'm trying to achieve.

I'm using Apache Spark 2.0.2 with Scala.

I have a dataframe:

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+

which I want to transpose to

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+

I've tried using pivot() but I couldn't get to the right answer. I ended up looping through my val{x} columns, and pivoting each as per below, but this is proving to be very slow.

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed("val1", "vals")

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+

I then use union() to append the result of each val{x} iteration to my first dataframe.

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+

Is there a more efficient way to transpose when I do not want to aggregate the data?

Thanks :)

nevi_me asked Nov 30 '16 15:11



1 Answer

Unfortunately, there is no case where both of the following hold:

  • The amount of data justifies using a Spark DataFrame.
  • Transposing the data is feasible.

You have to remember that a DataFrame, as implemented in Spark, is a distributed collection of rows, and each row is stored and processed on a single node.

You could express transposition on a DataFrame as pivot:

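import org.apache.spark.sql.functions._

// Turn every value column into a (k, v) struct, then explode to one row per cell.
val kv = explode(array(df.columns.tail.map { 
  c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*))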

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")
  .agg(first($"v"))
  .orderBy($"k")
  .withColumnRenamed("k", "vals")

but this is merely toy code with no practical application. In practice it is no better than collecting the data to the driver:

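import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Collect to the driver, transpose locally, and split off the segment_id row as the header.
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
  }
}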

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)

For a DataFrame defined as:

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

both would give you the desired result:

+----+---+---+---+---+
|vals|  1|  2|  3|  4|
+----+---+---+---+---+
|val1|100|  0|  0|  0|
|val2|  0| 50|  0|  0|
|val3|  0|  0|  0|  0|
|val4|  0|  0|  0|  0|
|val5|  0| 20|  0|  0|
|val6|  0|  0|  0|  0|
+----+---+---+---+---+
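
If the pivot-based version is used anyway, one possible refinement is to pass the pivot values explicitly. When no values are given, Spark first has to compute the distinct values of the pivot column itself. The following is only a sketch: it assumes a local SparkSession and that the segment ids are known in advance (the name `segmentIds` is illustrative and not part of the original code).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("pivot-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

// Assumption: the ids are known up front, so Spark does not need to look them up.
val segmentIds = Seq(1, 2, 3, 4)

// One (k, v) struct per value column, exploded to one row per cell.
val kv = explode(array(df.columns.tail.map { c =>
  struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

val transposed = df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id", segmentIds) // explicit values instead of discovery
  .agg(first($"v"))                // each (k, segment_id) pair holds exactly one value
  .orderBy($"k")
  .withColumnRenamed("k", "vals")

transposed.show()
```

Any segment id missing from `segmentIds` would be dropped silently from the result, so the list has to be complete.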

That being said, if you need efficient transposition on a distributed data structure, you'll have to look elsewhere. There are a number of structures, including the core CoordinateMatrix and BlockMatrix, which can distribute data across both dimensions and can be transposed.
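
As a rough illustration of the CoordinateMatrix route, the sketch below turns the example DataFrame into matrix entries and transposes it. It assumes a local SparkSession, that segment_id values are dense and 1-based (so `segment_id - 1` can serve as the row index), and that all values are integers. A matrix carries only numeric indices, so the val1..val6 labels would have to be tracked separately.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Assumption: a local session; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("matrix-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

// One MatrixEntry(rowIndex, colIndex, value) per cell.
// Row index: segment_id - 1 (assumed dense and 1-based).
// Column index: position of the value column (val1 -> 0, ..., val6 -> 5).
val entries = df.rdd.flatMap { row =>
  val i = row.getInt(0).toLong - 1
  (1 until row.length).map { j =>
    MatrixEntry(i, (j - 1).toLong, row.getInt(j).toDouble)
  }
}

val matrix = new CoordinateMatrix(entries) // 4 x 6
val transposed = matrix.transpose()        // 6 x 4, still distributed

// Only for inspecting this tiny example; do not do this on large data.
println(transposed.toBlockMatrix().toLocalMatrix())
```

For larger inputs, `toBlockMatrix()` gives a BlockMatrix that can also be transposed and multiplied without collecting anything to the driver.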

zero323 answered Oct 06 '22 23:10
